From 81117b7b00f069a2840e00b21fbb52e8eb1d1c06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Gabriel=20Lima?= Date: Tue, 1 Dec 2015 18:51:23 -0300 Subject: [PATCH] fix(expand): accept scheduler parameter --- src/operators/expand-support.ts | 52 +++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/src/operators/expand-support.ts b/src/operators/expand-support.ts index a5e3714228a..9d9f9e15e94 100644 --- a/src/operators/expand-support.ts +++ b/src/operators/expand-support.ts @@ -1,6 +1,9 @@ import {Operator} from '../Operator'; import {Observable} from '../Observable'; +import {Scheduler} from '../Scheduler'; import {Subscriber} from '../Subscriber'; +import {isNumeric} from '../util/isNumeric'; +import {isScheduler} from '../util/isScheduler'; import {tryCatch} from '../util/tryCatch'; import {errorObject} from '../util/errorObject'; import {OuterSubscriber} from '../OuterSubscriber'; @@ -9,11 +12,12 @@ import {subscribeToResult} from '../util/subscribeToResult'; export class ExpandOperator implements Operator { constructor(private project: (value: T, index: number) => Observable, - private concurrent: number = Number.POSITIVE_INFINITY) { + private concurrent?: number | Scheduler, + private scheduler?: Scheduler) { } call(subscriber: Subscriber): Subscriber { - return new ExpandSubscriber(subscriber, this.project, this.concurrent); + return new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler); } } @@ -22,12 +26,25 @@ export class ExpandSubscriber extends OuterSubscriber { private active: number = 0; private hasCompleted: boolean = false; private buffer: any[]; + private _concurrent: number = Number.POSITIVE_INFINITY; constructor(destination: Subscriber, private project: (value: T, index: number) => Observable, - private concurrent: number = Number.POSITIVE_INFINITY) { + private concurrent?: number | Scheduler, + private scheduler?: Scheduler) { super(destination); - if (concurrent < Number.POSITIVE_INFINITY) { + + if (isNumeric(concurrent)) { + this._concurrent = Number(concurrent) < 1 && 1 || Number(concurrent); + } else if (isScheduler(concurrent)) { + scheduler = concurrent; + } + if (!isScheduler(scheduler)) { + scheduler = undefined; + } + this.scheduler = scheduler; + + if (this._concurrent < Number.POSITIVE_INFINITY) { this.buffer = []; } } @@ -41,24 +58,31 @@ export class ExpandSubscriber extends OuterSubscriber { } const index = this.index++; - if (this.active < this.concurrent) { + if (this.active < this._concurrent) { destination.next(value); let result = tryCatch(this.project)(value, index); if (result === errorObject) { destination.error(result.e); + } else if (!this.scheduler) { + this._innerSub(result, value, index); } else { - if (result._isScalar) { - this._next(result.value); - } else { - this.active++; - this.add(subscribeToResult(this, result, value, index)); - } + const state = { subscriber: this, result, value, index }; + this.add(this.scheduler.schedule(disptch, 0, state)); } } else { this.buffer.push(value); } } + _innerSub(result: any, value: any, index: number): void { + if (result._isScalar) { + this._next(result.value); + } else { + this.active++; + this.add(subscribeToResult(this, result, value, index)); + } + } + _complete(): void { this.hasCompleted = true; if (this.hasCompleted && this.active === 0) { @@ -81,4 +105,8 @@ export class ExpandSubscriber extends OuterSubscriber { notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void { this._next(innerValue); } -} \ No newline at end of file +} + +function disptch({ subscriber, result, value, index }) { + subscriber._innerSub(result, value, index); +}