diff --git a/src/operator/takeUntil.ts b/src/operator/takeUntil.ts index 070d558433..0019265178 100644 --- a/src/operator/takeUntil.ts +++ b/src/operator/takeUntil.ts @@ -1,11 +1,5 @@ -import { Operator } from '../Operator'; import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { TeardownLogic } from '../Subscription'; - -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { takeUntil as higherOrder } from '../operators/takeUntil'; /** * Emits the values emitted by the source Observable until a `notifier` @@ -41,38 +35,5 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @owner Observable */ export function takeUntil(this: Observable, notifier: Observable): Observable { - return this.lift(new TakeUntilOperator(notifier)); -} - -class TakeUntilOperator implements Operator { - constructor(private notifier: Observable) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new TakeUntilSubscriber(subscriber, this.notifier)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class TakeUntilSubscriber extends OuterSubscriber { - - constructor(destination: Subscriber, - private notifier: Observable) { - super(destination); - this.add(subscribeToResult(this, notifier)); - } - - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.complete(); - } - - notifyComplete(): void { - // noop - } + return higherOrder(notifier)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 174e493419..76f5dcc37a 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -71,6 +71,7 @@ export { switchMap } from './switchMap'; export { switchMapTo } from './switchMapTo'; export { take } from './take'; export { takeLast } from './takeLast'; +export { takeUntil } from './takeUntil'; export { tap } from './tap'; export { timestamp } from './timestamp'; export { toArray } from './toArray'; diff --git a/src/operators/takeUntil.ts b/src/operators/takeUntil.ts new file mode 100644 index 0000000000..ca89686ee4 --- /dev/null +++ b/src/operators/takeUntil.ts @@ -0,0 +1,80 @@ +import { Operator } from '../Operator'; +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { TeardownLogic } from '../Subscription'; + +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; + +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Emits the values emitted by the source Observable until a `notifier` + * Observable emits a value. + * + * Lets values pass until a second Observable, + * `notifier`, emits something. Then, it completes. + * + * + * + * `takeUntil` subscribes and begins mirroring the source Observable. It also + * monitors a second Observable, `notifier` that you provide. If the `notifier` + * emits a value or a complete notification, the output Observable stops + * mirroring the source Observable and completes. + * + * @example Tick every second until the first click happens + * var interval = Rx.Observable.interval(1000); + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = interval.takeUntil(clicks); + * result.subscribe(x => console.log(x)); + * + * @see {@link take} + * @see {@link takeLast} + * @see {@link takeWhile} + * @see {@link skip} + * + * @param {Observable} notifier The Observable whose first emitted value will + * cause the output Observable of `takeUntil` to stop emitting values from the + * source Observable. + * @return {Observable} An Observable that emits the values from the source + * Observable until such time as `notifier` emits its first value. + * @method takeUntil + * @owner Observable + */ +export function takeUntil(notifier: Observable): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(new TakeUntilOperator(notifier)); +} + +class TakeUntilOperator implements Operator { + constructor(private notifier: Observable) { + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new TakeUntilSubscriber(subscriber, this.notifier)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class TakeUntilSubscriber extends OuterSubscriber { + + constructor(destination: Subscriber, + private notifier: Observable) { + super(destination); + this.add(subscribeToResult(this, notifier)); + } + + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.complete(); + } + + notifyComplete(): void { + // noop + } +}