Skip to content

Commit

Permalink
fix(expand): accept scheduler parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
luisgabriel committed Dec 1, 2015
1 parent f123887 commit 81117b7
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions src/operators/expand-support.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import {Operator} from '../Operator';
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {Subscriber} from '../Subscriber';
import {isNumeric} from '../util/isNumeric';
import {isScheduler} from '../util/isScheduler';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import {OuterSubscriber} from '../OuterSubscriber';
Expand All @@ -9,11 +12,12 @@ import {subscribeToResult} from '../util/subscribeToResult';

export class ExpandOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => Observable<any>,
private concurrent: number = Number.POSITIVE_INFINITY) {
private concurrent?: number | Scheduler,
private scheduler?: Scheduler) {
}

call(subscriber: Subscriber<R>): Subscriber<T> {
return new ExpandSubscriber(subscriber, this.project, this.concurrent);
return new ExpandSubscriber(subscriber, this.project, this.concurrent, this.scheduler);
}
}

Expand All @@ -22,12 +26,25 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
private active: number = 0;
private hasCompleted: boolean = false;
private buffer: any[];
private _concurrent: number = Number.POSITIVE_INFINITY;

constructor(destination: Subscriber<R>,
private project: (value: T, index: number) => Observable<R>,
private concurrent: number = Number.POSITIVE_INFINITY) {
private concurrent?: number | Scheduler,
private scheduler?: Scheduler) {
super(destination);
if (concurrent < Number.POSITIVE_INFINITY) {

if (isNumeric(concurrent)) {
this._concurrent = Number(concurrent) < 1 && 1 || Number(concurrent);
} else if (isScheduler(concurrent)) {
scheduler = <Scheduler> concurrent;
}
if (!isScheduler(scheduler)) {
scheduler = undefined;
}
this.scheduler = scheduler;

if (this._concurrent < Number.POSITIVE_INFINITY) {
this.buffer = [];
}
}
Expand All @@ -41,24 +58,31 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
}

const index = this.index++;
if (this.active < this.concurrent) {
if (this.active < this._concurrent) {
destination.next(value);
let result = tryCatch(this.project)(value, index);
if (result === errorObject) {
destination.error(result.e);
} else if (!this.scheduler) {
this._innerSub(result, value, index);
} else {
if (result._isScalar) {
this._next(result.value);
} else {
this.active++;
this.add(subscribeToResult<T, R>(this, result, value, index));
}
const state = { subscriber: this, result, value, index };
this.add(this.scheduler.schedule(disptch, 0, state));
}
} else {
this.buffer.push(value);
}
}

_innerSub(result: any, value: any, index: number): void {
if (result._isScalar) {
this._next(result.value);
} else {
this.active++;
this.add(subscribeToResult<T, R>(this, result, value, index));
}
}

_complete(): void {
this.hasCompleted = true;
if (this.hasCompleted && this.active === 0) {
Expand All @@ -81,4 +105,8 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
notifyNext(outerValue: T, innerValue: R, outerIndex: number, innerIndex: number): void {
this._next(innerValue);
}
}
}

function disptch({ subscriber, result, value, index }) {
subscriber._innerSub(result, value, index);
}

0 comments on commit 81117b7

Please sign in to comment.