Skip to content

Commit

Permalink
fix(mergeMap): allow concurrent to be set as the second argument for …
Browse files Browse the repository at this point in the history
…mergeMap and mergeMapTo

This behavior is similar to how merge works
  • Loading branch information
david-driscoll authored and kwonoj committed Mar 22, 2016
1 parent fa43a22 commit c003468
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 4 deletions.
97 changes: 96 additions & 1 deletion spec/operators/mergeMap-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, expectObservable, expectSubscriptions};
import {DoneSignature} from '../helpers/test-helper';
import {DoneSignature, type} from '../helpers/test-helper';

const Observable = Rx.Observable;

Expand Down Expand Up @@ -352,6 +352,88 @@ describe('Observable.prototype.mergeMap', () => {
expectSubscriptions(hotC.subscriptions).toBe(csubs);
});

it('should mergeMap to many cold Observable, with parameter concurrency=1, without resultSelector', () => {
const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'};
const e1 = hot('-a-------b-------c---| ');
const e1subs = '^ !';
const inner = cold('----i---j---k---l---| ', values);
const innersubs = [' ^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-----i---j---k---l-------i---j---k---l-------i---j---k---l---|';

function project() { return inner; }
const result = e1.mergeMap(project, 1);

expectObservable(result).toBe(expected, values);
expectSubscriptions(inner.subscriptions).toBe(innersubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should mergeMap to many cold Observable, with parameter concurrency=2, without resultSelector', () => {
const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'};
const e1 = hot('-a-------b-------c---| ');
const e1subs = '^ !';
const inner = cold('----i---j---k---l---| ', values);
const innersubs = [' ^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-----i---j---(ki)(lj)k---(li)j---k---l---|';

function project() { return inner; }
const result = e1.mergeMap(project, 2);

expectObservable(result).toBe(expected, values);
expectSubscriptions(inner.subscriptions).toBe(innersubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should mergeMap to many hot Observable, with parameter concurrency=1, without resultSelector', () => {
const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'};
const e1 = hot('-a-------b-------c---| ');
const e1subs = '^ !';
const hotA = hot('x----i---j---k---l---| ', values);
const hotB = hot('-x-x-xxxx-x-x-xxxxx-x----i---j---k---l---| ', values);
const hotC = hot('x-xxxx---x-x-x-x-x-xx--x--x-x--x--xxxx-x-----i---j---k---l---|', values);
const asubs = ' ^ ! ';
const bsubs = ' ^ ! ';
const csubs = ' ^ !';
const expected = '-----i---j---k---l-------i---j---k---l-------i---j---k---l---|';
const inners = { a: hotA, b: hotB, c: hotC };

function project(x) { return inners[x]; }
const result = e1.mergeMap(project, 1);

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(hotA.subscriptions).toBe(asubs);
expectSubscriptions(hotB.subscriptions).toBe(bsubs);
expectSubscriptions(hotC.subscriptions).toBe(csubs);
});

it('should mergeMap to many hot Observable, with parameter concurrency=2, without resultSelector', () => {
const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'};
const e1 = hot('-a-------b-------c---| ');
const e1subs = '^ !';
const hotA = hot('x----i---j---k---l---| ', values);
const hotB = hot('-x-x-xxxx----i---j---k---l---| ', values);
const hotC = hot('x-xxxx---x-x-x-x-x-xx----i---j---k---l---|', values);
const asubs = ' ^ ! ';
const bsubs = ' ^ ! ';
const csubs = ' ^ !';
const expected = '-----i---j---(ki)(lj)k---(li)j---k---l---|';
const inners = { a: hotA, b: hotB, c: hotC };

function project(x) { return inners[x]; }
const result = e1.mergeMap(project, 2);

expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(hotA.subscriptions).toBe(asubs);
expectSubscriptions(hotB.subscriptions).toBe(bsubs);
expectSubscriptions(hotC.subscriptions).toBe(csubs);
});

it('should mergeMap many complex, where all inners are finite', () => {
const a = cold( '-#' );
const b = cold( '-#' );
Expand Down Expand Up @@ -663,4 +745,17 @@ describe('Observable.prototype.mergeMap', () => {

expect(completed).toBe(true);
});

it('should support type signatures', () => {
type(() => {
let o: Rx.Observable<number>;

/* tslint:disable:no-unused-variable */
let a1: Rx.Observable<string> = o.mergeMap(x => x.toString());
let a2: Rx.Observable<string> = o.mergeMap(x => x.toString(), 3);
let a3: Rx.Observable<{ o: number; i: string; }> = o.mergeMap(x => x.toString(), (o, i) => ({ o, i }));
let a4: Rx.Observable<{ o: number; i: string; }> = o.mergeMap(x => x.toString(), (o, i) => ({ o, i }), 3);
/* tslint:enable:no-unused-variable */
});
});
});
50 changes: 49 additions & 1 deletion spec/operators/mergeMapTo-spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, expectObservable, expectSubscriptions};
import {DoneSignature} from '../helpers/test-helper';
import {DoneSignature, type} from '../helpers/test-helper';

const Observable = Rx.Observable;

Expand Down Expand Up @@ -284,6 +284,40 @@ describe('Observable.prototype.mergeMapTo', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should mergeMapTo many cold Observable, with parameter concurrency=1, without resultSelector', () => {
const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'};
const e1 = hot('-a-------b-------c---| ');
const e1subs = '^ !';
const inner = cold('----i---j---k---l---| ', values);
const innersubs = [' ^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-----i---j---k---l-------i---j---k---l-------i---j---k---l---|';

const result = e1.mergeMapTo(inner, 1);

expectObservable(result).toBe(expected, values);
expectSubscriptions(inner.subscriptions).toBe(innersubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should mergeMap to many cold Observable, with parameter concurrency=2, without resultSelector', () => {
const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'};
const e1 = hot('-a-------b-------c---| ');
const e1subs = '^ !';
const inner = cold('----i---j---k---l---| ', values);
const innersubs = [' ^ ! ',
' ^ ! ',
' ^ !'];
const expected = '-----i---j---(ki)(lj)k---(li)j---k---l---|';

const result = e1.mergeMapTo(inner, 2);

expectObservable(result).toBe(expected, values);
expectSubscriptions(inner.subscriptions).toBe(innersubs);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should mergeMapTo many outer to arrays', () => {
const e1 = hot('2-----4--------3--------2-------|');
const e1subs = '^ !';
Expand Down Expand Up @@ -402,4 +436,18 @@ describe('Observable.prototype.mergeMapTo', () => {

expect(completed).toBe(true);
});

it('should support type signatures', () => {
type(() => {
let o: Rx.Observable<number>;
let m: Rx.Observable<string>;

/* tslint:disable:no-unused-variable */
let a1: Rx.Observable<string> = o.mergeMapTo(m);
let a2: Rx.Observable<string> = o.mergeMapTo(m, 3);
let a3: Rx.Observable<{ o: number; i: string; }> = o.mergeMapTo(m, (o, i) => ({ o, i }));
let a4: Rx.Observable<{ o: number; i: string; }> = o.mergeMapTo(m, (o, i) => ({ o, i }), 3);
/* tslint:enable:no-unused-variable */
});
});
});
6 changes: 5 additions & 1 deletion src/operator/mergeMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ import {InnerSubscriber} from '../InnerSubscriber';
* @owner Observable
*/
export function mergeMap<T, I, R>(project: (value: T, index: number) => ObservableInput<I>,
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R | number,
resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number,
concurrent: number = Number.POSITIVE_INFINITY): Observable<R> {
if (typeof resultSelector === 'number') {
concurrent = <number>resultSelector;
resultSelector = null;
}
return this.lift(new MergeMapOperator(project, <any>resultSelector, concurrent));
}

Expand Down
6 changes: 5 additions & 1 deletion src/operator/mergeMapTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ import {subscribeToResult} from '../util/subscribeToResult';
* @owner Observable
*/
export function mergeMapTo<T, I, R>(observable: Observable<I>,
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R | number,
resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number,
concurrent: number = Number.POSITIVE_INFINITY): Observable<R> {
if (typeof resultSelector === 'number') {
concurrent = <number>resultSelector;
resultSelector = null;
}
return this.lift(new MergeMapToOperator(observable, <any>resultSelector, concurrent));
}

Expand Down

0 comments on commit c003468

Please sign in to comment.