operators/take.js

  1. import { Observable } from '../Observable';
  2. import { passThroughNext } from './passThroughNext';
  3. /**
  4. * Takes a number of values that satisfy the `filterCallback` then completes
  5. *
  6. * @memberof operators
  7. *
  8. * @param {Observable} source$
  9. * @param {Number} amount
  10. * @param {Function} [filterCallback]
  11. * @returns {Observable}
  12. */
  13. export const take = function (source$, amount, filterCallback = () => true) {
  14. return new Observable (function (observer) {
  15. const taken = [];
  16. const subscription = passThroughNext(source$, function ({ next, complete }, value) {
  17. const isComplete = taken.length === amount;
  18. if (!isComplete && filterCallback(value)) {
  19. taken.push(value);
  20. next(value);
  21. }
  22. if (isComplete) {
  23. complete();
  24. }
  25. }).subscribe(observer);
  26. return () => subscription.unsubscribe();
  27. });
  28. };
  29. Observable.take = take;
  30. Observable.prototype.take = function (amount, filterCallback) {
  31. return take(this, amount, filterCallback);
  32. };