diff --git a/src/operator/window.ts b/src/operator/window.ts index 527fc3dc3f..5f1270818d 100644 --- a/src/operator/window.ts +++ b/src/operator/window.ts @@ -1,11 +1,6 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { Observable } from '../Observable'; -import { Subject } from '../Subject'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { Observable } from '../Observable'; +import { window as higherOrder } from '../operators'; /** * Branch out the source Observable values as a nested Observable whenever @@ -44,77 +39,5 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @owner Observable */ export function window(this: Observable, windowBoundaries: Observable): Observable> { - return this.lift(new WindowOperator(windowBoundaries)); -} - -class WindowOperator implements Operator> { - - constructor(private windowBoundaries: Observable) { - } - - call(subscriber: Subscriber>, source: any): any { - const windowSubscriber = new WindowSubscriber(subscriber); - const sourceSubscription = source.subscribe(windowSubscriber); - if (!sourceSubscription.closed) { - windowSubscriber.add(subscribeToResult(windowSubscriber, this.windowBoundaries)); - } - return sourceSubscription; - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class WindowSubscriber extends OuterSubscriber { - - private window: Subject = new Subject(); - - constructor(destination: Subscriber>) { - super(destination); - destination.next(this.window); - } - - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.openWindow(); - } - - notifyError(error: any, innerSub: InnerSubscriber): void { - this._error(error); - } - - notifyComplete(innerSub: InnerSubscriber): void { - this._complete(); - } - - protected _next(value: T): void { - this.window.next(value); - } - - protected _error(err: any): void { - this.window.error(err); - this.destination.error(err); - } - - protected _complete(): void { - this.window.complete(); - this.destination.complete(); - } - - protected _unsubscribe() { - this.window = null; - } - - private openWindow(): void { - const prevWindow = this.window; - if (prevWindow) { - prevWindow.complete(); - } - const destination = this.destination; - const newWindow = this.window = new Subject(); - destination.next(newWindow); - } + return higherOrder(windowBoundaries)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index d5dd9b1881..8ddcbae9a0 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -22,4 +22,5 @@ export { switchAll } from './switchAll'; export { switchMap } from './switchMap'; export { takeLast } from './takeLast'; export { tap } from './tap'; +export { window } from './window'; export { windowToggle } from './windowToggle'; diff --git a/src/operators/window.ts b/src/operators/window.ts new file mode 100644 index 0000000000..54556694a9 --- /dev/null +++ b/src/operators/window.ts @@ -0,0 +1,122 @@ +import { Observable } from '../Observable'; +import { OperatorFunction } from '../interfaces'; +import { Subject } from '../Subject'; +import { Subscriber } from '../Subscriber'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { Operator } from '../Operator'; + +/** + * Branch out the source Observable values as a nested Observable whenever + * `windowBoundaries` emits. + * + * It's like {@link buffer}, but emits a nested Observable + * instead of an array. + * + * + * + * Returns an Observable that emits windows of items it collects from the source + * Observable. The output Observable emits connected, non-overlapping + * windows. It emits the current window and opens a new one whenever the + * Observable `windowBoundaries` emits an item. Because each window is an + * Observable, the output is a higher-order Observable. + * + * @example In every window of 1 second each, emit at most 2 click events + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var interval = Rx.Observable.interval(1000); + * var result = clicks.window(interval) + * .map(win => win.take(2)) // each window has at most 2 emissions + * .mergeAll(); // flatten the Observable-of-Observables + * result.subscribe(x => console.log(x)); + * + * @see {@link windowCount} + * @see {@link windowTime} + * @see {@link windowToggle} + * @see {@link windowWhen} + * @see {@link buffer} + * + * @param {Observable} windowBoundaries An Observable that completes the + * previous window and starts a new window. + * @return {Observable>} An Observable of windows, which are + * Observables emitting values of the source Observable. + * @method window + * @owner Observable + */ +export function window(windowBoundaries: Observable): OperatorFunction> { + return function windowOperatorFunction(source: Observable) { + return source.lift(new WindowOperator(windowBoundaries)); + }; +} + +class WindowOperator implements Operator> { + + constructor(private windowBoundaries: Observable) { + } + + call(subscriber: Subscriber>, source: any): any { + const windowSubscriber = new WindowSubscriber(subscriber); + const sourceSubscription = source.subscribe(windowSubscriber); + if (!sourceSubscription.closed) { + windowSubscriber.add(subscribeToResult(windowSubscriber, this.windowBoundaries)); + } + return sourceSubscription; + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class WindowSubscriber extends OuterSubscriber { + + private window: Subject = new Subject(); + + constructor(destination: Subscriber>) { + super(destination); + destination.next(this.window); + } + + notifyNext(outerValue: T, innerValue: any, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.openWindow(); + } + + notifyError(error: any, innerSub: InnerSubscriber): void { + this._error(error); + } + + notifyComplete(innerSub: InnerSubscriber): void { + this._complete(); + } + + protected _next(value: T): void { + this.window.next(value); + } + + protected _error(err: any): void { + this.window.error(err); + this.destination.error(err); + } + + protected _complete(): void { + this.window.complete(); + this.destination.complete(); + } + + protected _unsubscribe() { + this.window = null; + } + + private openWindow(): void { + const prevWindow = this.window; + if (prevWindow) { + prevWindow.complete(); + } + const destination = this.destination; + const newWindow = this.window = new Subject(); + destination.next(newWindow); + } +}