From 79e908492edb57f58bce232e84ba078ec057f7bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Gabriel=20Lima?= Date: Tue, 1 Dec 2015 18:51:23 -0300 Subject: [PATCH] fix(expand): accept scheduler parameter Also moves the handling of the default value for optional parameters to the expand function instead of the operator's ctor. Closes #841. --- spec/operators/expand-spec.js | 26 +++++++++++++++++++++++++- src/CoreOperators.ts | 2 +- src/Observable.ts | 2 +- src/operator/expand-support.ts | 34 ++++++++++++++++++++++++---------- src/operator/expand.ts | 8 ++++++-- 5 files changed, 57 insertions(+), 15 deletions(-) diff --git a/spec/operators/expand-spec.js b/spec/operators/expand-spec.js index ec7eed18dc..7496fbc2af 100644 --- a/spec/operators/expand-spec.js +++ b/spec/operators/expand-spec.js @@ -323,4 +323,28 @@ describe('Observable.prototype.expand()', function () { done(); }); }); -}); \ No newline at end of file + + it('should work when passing undefined for the optional arguments', function () { + var values = { + a: 1, + b: 1 + 1, // a + a, + c: 2 + 2, // b + b, + d: 4 + 4, // c + c, + e: 8 + 8, // d + d + }; + var e1 = hot('(a|)', values); + var e1subs = '^ ! '; + var e2shape = '---(z|) '; + var expected = 'a--b--c--d--(e|)'; + + var result = e1.expand(function (x) { + if (x === 16) { + return Observable.empty(); + } + return cold(e2shape, { z: x + x }); + }, undefined, undefined); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); +}); diff --git a/src/CoreOperators.ts b/src/CoreOperators.ts index f5cdcf8dac..7de90bcf65 100644 --- a/src/CoreOperators.ts +++ b/src/CoreOperators.ts @@ -26,7 +26,7 @@ export interface CoreOperators { delay?: (delay: number, scheduler?: Scheduler) => Observable; distinctUntilChanged?: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable; do?: (next?: (x: T) => void, error?: (e: any) => void, complete?: () => void) => Observable; - expand?: (project: (x: T, ix: number) => Observable) => Observable; + expand?: (project: (x: T, ix: number) => Observable, concurrent: number, scheduler: Scheduler) => Observable; filter?: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable; finally?: (ensure: () => void, thisArg?: any) => Observable; first?: (predicate?: (value: T, index: number, source: Observable) => boolean, diff --git a/src/Observable.ts b/src/Observable.ts index 2e0ac970d0..8058e7ca3d 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -185,7 +185,7 @@ export class Observable implements CoreOperators { delay: (delay: number, scheduler?: Scheduler) => Observable; distinctUntilChanged: (compare?: (x: T, y: T) => boolean, thisArg?: any) => Observable; do: (next?: (x: T) => void, error?: (e: any) => void, complete?: () => void) => Observable; - expand: (project: (x: T, ix: number) => Observable) => Observable; + expand: (project: (x: T, ix: number) => Observable, concurrent: number, scheduler: Scheduler) => Observable; filter: (predicate: (x: T) => boolean, ix?: number, thisArg?: any) => Observable; finally: (ensure: () => void, thisArg?: any) => Observable; first: (predicate?: (value: T, index: number, source: Observable) => boolean, diff --git a/src/operator/expand-support.ts b/src/operator/expand-support.ts index a5e3714228..b2716d9d83 100644 --- a/src/operator/expand-support.ts +++ b/src/operator/expand-support.ts @@ -1,5 +1,6 @@ import {Operator} from '../Operator'; import {Observable} from '../Observable'; +import {Scheduler} from '../Scheduler'; import {Subscriber} from '../Subscriber'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; @@ -8,12 +9,13 @@ import {InnerSubscriber} from '../InnerSubscriber'; import {subscribeToResult} from '../util/subscribeToResult'; export class ExpandOperator implements Operator { - constructor(private project: (value: T, index: number) => Observable, - private concurrent: number = Number.POSITIVE_INFINITY) { + constructor(private project: (value: T, index: number) => Observable, + private concurrent: number, + private scheduler: Scheduler) { } call(subscriber: Subscriber): Subscriber { - return new ExpandSubscriber(subscriber, this.project, this.concurrent); + return new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler); } } @@ -25,13 +27,18 @@ export class ExpandSubscriber extends OuterSubscriber { constructor(destination: Subscriber, private project: (value: T, index: number) => Observable, - private concurrent: number = Number.POSITIVE_INFINITY) { + private concurrent: number, + private scheduler: Scheduler) { super(destination); if (concurrent < Number.POSITIVE_INFINITY) { this.buffer = []; } } + private static dispatch({subscriber, result, value, index}): void { + subscriber.subscribeToProjection(result, value, index); + } + _next(value: any): void { const destination = this.destination; @@ -46,19 +53,26 @@ export class ExpandSubscriber extends OuterSubscriber { let result = tryCatch(this.project)(value, index); if (result === errorObject) { destination.error(result.e); + } else if (!this.scheduler) { + this.subscribeToProjection(result, value, index); } else { - if (result._isScalar) { - this._next(result.value); - } else { - this.active++; - this.add(subscribeToResult(this, result, value, index)); - } + const state = { subscriber: this, result, value, index }; + this.add(this.scheduler.schedule(ExpandSubscriber.dispatch, 0, state)); } } else { this.buffer.push(value); } } + private subscribeToProjection(result, value: T, index: number): void { + if (result._isScalar) { + this._next(result.value); + } else { + this.active++; + this.add(subscribeToResult(this, result, value, index)); + } + } + _complete(): void { this.hasCompleted = true; if (this.hasCompleted && this.active === 0) { diff --git a/src/operator/expand.ts b/src/operator/expand.ts index 296f1e349c..e54ce1e052 100644 --- a/src/operator/expand.ts +++ b/src/operator/expand.ts @@ -1,7 +1,11 @@ import {Observable} from '../Observable'; +import {Scheduler} from '../Scheduler'; import {ExpandOperator} from './expand-support'; export function expand(project: (value: T, index: number) => Observable, - concurrent: number = Number.POSITIVE_INFINITY): Observable { - return this.lift(new ExpandOperator(project, concurrent)); + concurrent: number = Number.POSITIVE_INFINITY, + scheduler: Scheduler = undefined): Observable { + concurrent = (concurrent || 0) < 1 ? Number.POSITIVE_INFINITY : concurrent; + + return this.lift(new ExpandOperator(project, concurrent, scheduler)); }