diff --git a/src/operator/takeLast.ts b/src/operator/takeLast.ts index 89490ef808..0af02fb2ec 100644 --- a/src/operator/takeLast.ts +++ b/src/operator/takeLast.ts @@ -1,9 +1,6 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; -import { EmptyObservable } from '../observable/EmptyObservable'; + import { Observable } from '../Observable'; -import { TeardownLogic } from '../Subscription'; +import { takeLast as higherOrderTakeLast } from '../operators'; /** * Emits only the last `count` values emitted by the source Observable. @@ -42,65 +39,5 @@ import { TeardownLogic } from '../Subscription'; * @owner Observable */ export function takeLast(this: Observable, count: number): Observable { - if (count === 0) { - return new EmptyObservable(); - } else { - return this.lift(new TakeLastOperator(count)); - } -} - -class TakeLastOperator implements Operator { - constructor(private total: number) { - if (this.total < 0) { - throw new ArgumentOutOfRangeError; - } - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new TakeLastSubscriber(subscriber, this.total)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class TakeLastSubscriber extends Subscriber { - private ring: Array = new Array(); - private count: number = 0; - - constructor(destination: Subscriber, private total: number) { - super(destination); - } - - protected _next(value: T): void { - const ring = this.ring; - const total = this.total; - const count = this.count++; - - if (ring.length < total) { - ring.push(value); - } else { - const index = count % total; - ring[index] = value; - } - } - - protected _complete(): void { - const destination = this.destination; - let count = this.count; - - if (count > 0) { - const total = this.count >= this.total ? this.total : this.count; - const ring = this.ring; - - for (let i = 0; i < total; i++) { - const idx = (count++) % total; - destination.next(ring[idx]); - } - } - - destination.complete(); - } + return higherOrderTakeLast(count)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index bfb54ca5a3..245d6ff706 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -4,3 +4,4 @@ export { map } from './map'; export { mergeMap } from './mergeMap'; export { scan } from './scan'; export { switchMap } from './switchMap'; +export { takeLast } from './takeLast'; diff --git a/src/operators/takeLast.ts b/src/operators/takeLast.ts new file mode 100644 index 0000000000..e6e44d9cef --- /dev/null +++ b/src/operators/takeLast.ts @@ -0,0 +1,109 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; +import { EmptyObservable } from '../observable/EmptyObservable'; +import { Observable } from '../Observable'; +import { TeardownLogic } from '../Subscription'; +import { OperatorFunction } from '../interfaces'; + +/** + * Emits only the last `count` values emitted by the source Observable. + * + * Remembers the latest `count` values, then emits those + * only when the source completes. + * + * + * + * `takeLast` returns an Observable that emits at most the last `count` values + * emitted by the source Observable. If the source emits fewer than `count` + * values then all of its values are emitted. This operator must wait until the + * `complete` notification emission from the source in order to emit the `next` + * values on the output Observable, because otherwise it is impossible to know + * whether or not more values will be emitted on the source. For this reason, + * all values are emitted synchronously, followed by the complete notification. + * + * @example Take the last 3 values of an Observable with many values + * var many = Rx.Observable.range(1, 100); + * var lastThree = many.takeLast(3); + * lastThree.subscribe(x => console.log(x)); + * + * @see {@link take} + * @see {@link takeUntil} + * @see {@link takeWhile} + * @see {@link skip} + * + * @throws {ArgumentOutOfRangeError} When using `takeLast(i)`, it delivers an + * ArgumentOutOrRangeError to the Observer's `error` callback if `i < 0`. + * + * @param {number} count The maximum number of values to emit from the end of + * the sequence of values emitted by the source Observable. + * @return {Observable} An Observable that emits at most the last count + * values emitted by the source Observable. + * @method takeLast + * @owner Observable + */ +export function takeLast(count: number): OperatorFunction { + return function takeLastOperatorFunction(source: Observable): Observable { + if (count === 0) { + return new EmptyObservable(); + } else { + return source.lift(new TakeLastOperator(count)); + } + }; +} + +class TakeLastOperator implements Operator { + constructor(private total: number) { + if (this.total < 0) { + throw new ArgumentOutOfRangeError; + } + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new TakeLastSubscriber(subscriber, this.total)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class TakeLastSubscriber extends Subscriber { + private ring: Array = new Array(); + private count: number = 0; + + constructor(destination: Subscriber, private total: number) { + super(destination); + } + + protected _next(value: T): void { + const ring = this.ring; + const total = this.total; + const count = this.count++; + + if (ring.length < total) { + ring.push(value); + } else { + const index = count % total; + ring[index] = value; + } + } + + protected _complete(): void { + const destination = this.destination; + let count = this.count; + + if (count > 0) { + const total = this.count >= this.total ? this.total : this.count; + const ring = this.ring; + + for (let i = 0; i < total; i++) { + const idx = (count++) % total; + destination.next(ring[idx]); + } + } + + destination.complete(); + } +}