Skip to content

Commit

Permalink
feat(mergeScan): support concurrency parameter for mergeScan
Browse files Browse the repository at this point in the history
- expose concurrency parameter to interface of mergeScan
- expand test coverage to test concurrency works

closes ReactiveX#868
  • Loading branch information
kwonoj committed Dec 4, 2015
1 parent d2e6318 commit fe0eb37
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 6 deletions.
148 changes: 147 additions & 1 deletion spec/operators/mergeScan-spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* globals describe, it, expect, expectObservable, expectSubscriptions, hot, cold */
/* globals describe, it, expect, rxTestScheduler, expectObservable, expectSubscriptions, hot, cold */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

Expand Down Expand Up @@ -225,4 +225,150 @@ describe('Observable.prototype.mergeScan()', function () {
expectObservable(source, sub).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(sub);
});

it('should mergescan projects cold Observable with single concurrency', function () {
var e1 = hot('--a--b--c--|');
var e1subs = '^ !';

var inner = [
cold( '--d--e--f--| '),
cold( '--g--h--i--| '),
cold( '--j--k--l--|')
];

var xsubs = ' ^ !';
var ysubs = ' ^ !';
var zsubs = ' ^ !';

var expected = '--x-d--e--f--f-g--h--i--i-j--k--l--|';

var index = 0;
var source = e1.mergeScan(function (acc, x) {
var value = inner[index++];
return value.startWith(acc);
}, 'x', 1);

expectObservable(source).toBe(expected);

expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
});

it('should emit accumulator if inner completes without value', function () {
var e1 = hot('--a--^--b--c--d--e--f--g--|');
var e1subs = '^ !';
var expected = '---------------------(x|)';

var source = e1.mergeScan(function (acc, x) {
return Observable.empty();
}, ['1']);

expectObservable(source).toBe(expected, {x: ['1']});
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should emit accumulator if inner completes without value after source completes', function () {
var e1 = hot('--a--^--b--c--d--e--f--g--|');
var e1subs = '^ !';
var expected = '-----------------------(x|)';

var source = e1.mergeScan(function (acc, x) {
return Observable.empty().delay(50, rxTestScheduler);
}, ['1']);

expectObservable(source).toBe(expected, {x: ['1']});
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should mergescan projects hot Observable with single concurrency', function () {
var e1 = hot('---a---b---c---|');
var e1subs = '^ !';

var inner = [
hot( '--d--e--f--|'),
hot( '----g----h----i----|'),
hot( '------j------k-------l------|')
];

var xsubs = ' ^ !';
var ysubs = ' ^ !';
var zsubs = ' ^ !';

var expected = '---x-e--f--f--i----i-l------|';

var index = 0;
var source = e1.mergeScan(function (acc, x) {
var value = inner[index++];
return value.startWith(acc);
}, 'x', 1);

expectObservable(source).toBe(expected);

expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
});

it('should mergescan projects cold Observable with dual concurrency', function () {
var e1 = hot('----a----b----c----|');
var e1subs = '^ !';

var inner = [
cold( '---d---e---f---| '),
cold( '---g---h---i---| '),
cold( '---j---k---l---|')
];

var xsubs = ' ^ !';
var ysubs = ' ^ !';
var zsubs = ' ^ !';

var expected = '----x--d-d-eg--fh--hi-j---k---l---|';

var index = 0;
var source = e1.mergeScan(function (acc, x) {
var value = inner[index++];
return value.startWith(acc);
}, 'x', 2);

expectObservable(source).toBe(expected);

expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
});

it('should mergescan projects hot Observable with dual concurrency', function () {
var e1 = hot('---a---b---c---|');
var e1subs = '^ !';

var inner = [
hot( '--d--e--f--|'),
hot( '----g----h----i----|'),
hot( '------j------k-------l------|')
];

var xsubs = ' ^ !';
var ysubs = ' ^ !';
var zsubs = ' ^ !';

var expected = '---x-e-efh-h-ki------l------|';

var index = 0;
var source = e1.mergeScan(function (acc, x) {
var value = inner[index++];
return value.startWith(acc);
}, 'x', 2);

expectObservable(source).toBe(expected);

expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(inner[0].subscriptions).toBe(xsubs);
expectSubscriptions(inner[1].subscriptions).toBe(ysubs);
expectSubscriptions(inner[2].subscriptions).toBe(zsubs);
});
});
2 changes: 1 addition & 1 deletion src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export interface KitchenSinkOperators<T> extends CoreOperators<T> {
max?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
min?: <T, R>(comparer?: (x: R, y: T) => R) => Observable<R>;
timeInterval?: <T>(scheduler?: IScheduler) => Observable<T>;
mergeScan?: <T, R>(project: (acc: R, x: T) => Observable<R>, seed: R) => Observable<R>;
mergeScan?: <T, R>(project: (acc: R, x: T) => Observable<R>, seed: R, concurrent?: number) => Observable<R>;
switchFirst?: () => Observable<T>;
switchMapFirst?: <R>(project: ((x: T, ix: number) => Observable<any>),
projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
Expand Down
10 changes: 6 additions & 4 deletions src/operator/extended/mergeScan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import {errorObject} from '../../util/errorObject';
import {subscribeToResult} from '../../util/subscribeToResult';
import {OuterSubscriber} from '../../OuterSubscriber';

export function mergeScan<T, R>(project: (acc: R, x: T) => Observable<R>, seed: R) {
return this.lift(new MergeScanOperator(project, seed));
export function mergeScan<T, R>(project: (acc: R, x: T) => Observable<R>,
seed: R,
concurrent: number = Number.POSITIVE_INFINITY) {
return this.lift(new MergeScanOperator(project, seed, concurrent));
}

export class MergeScanOperator<T, R> implements Operator<T, R> {
constructor(private project: (acc: R, x: T) => Observable<R>,
private seed: R,
private concurrent: number = Number.POSITIVE_INFINITY) {
private concurrent: number) {
}

call(subscriber: Subscriber<R>): Subscriber<T> {
Expand All @@ -34,7 +36,7 @@ export class MergeScanSubscriber<T, R> extends OuterSubscriber<T, R> {
constructor(destination: Subscriber<R>,
private project: (acc: R, x: T) => Observable<R>,
private acc: R,
private concurrent: number = Number.POSITIVE_INFINITY) {
private concurrent: number) {
super(destination);
}

Expand Down

0 comments on commit fe0eb37

Please sign in to comment.