From c003468a702334c6310be284b5ea15a83f937463 Mon Sep 17 00:00:00 2001 From: David Driscoll Date: Thu, 10 Mar 2016 12:00:45 -0500 Subject: [PATCH] fix(mergeMap): allow concurrent to be set as the second argument for mergeMap and mergeMapTo This behavior is similar to how merge works --- spec/operators/mergeMap-spec.ts | 97 ++++++++++++++++++++++++++++++- spec/operators/mergeMapTo-spec.ts | 50 +++++++++++++++- src/operator/mergeMap.ts | 6 +- src/operator/mergeMapTo.ts | 6 +- 4 files changed, 155 insertions(+), 4 deletions(-) diff --git a/spec/operators/mergeMap-spec.ts b/spec/operators/mergeMap-spec.ts index 80a28b803d..7f2ec77cde 100644 --- a/spec/operators/mergeMap-spec.ts +++ b/spec/operators/mergeMap-spec.ts @@ -1,6 +1,6 @@ import * as Rx from '../../dist/cjs/Rx'; declare const {hot, cold, expectObservable, expectSubscriptions}; -import {DoneSignature} from '../helpers/test-helper'; +import {DoneSignature, type} from '../helpers/test-helper'; const Observable = Rx.Observable; @@ -352,6 +352,88 @@ describe('Observable.prototype.mergeMap', () => { expectSubscriptions(hotC.subscriptions).toBe(csubs); }); + it('should mergeMap to many cold Observable, with parameter concurrency=1, without resultSelector', () => { + const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; + const e1 = hot('-a-------b-------c---| '); + const e1subs = '^ !'; + const inner = cold('----i---j---k---l---| ', values); + const innersubs = [' ^ ! ', + ' ^ ! ', + ' ^ !']; + const expected = '-----i---j---k---l-------i---j---k---l-------i---j---k---l---|'; + + function project() { return inner; } + const result = e1.mergeMap(project, 1); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(inner.subscriptions).toBe(innersubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should mergeMap to many cold Observable, with parameter concurrency=2, without resultSelector', () => { + const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; + const e1 = hot('-a-------b-------c---| '); + const e1subs = '^ !'; + const inner = cold('----i---j---k---l---| ', values); + const innersubs = [' ^ ! ', + ' ^ ! ', + ' ^ !']; + const expected = '-----i---j---(ki)(lj)k---(li)j---k---l---|'; + + function project() { return inner; } + const result = e1.mergeMap(project, 2); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(inner.subscriptions).toBe(innersubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should mergeMap to many hot Observable, with parameter concurrency=1, without resultSelector', () => { + const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; + const e1 = hot('-a-------b-------c---| '); + const e1subs = '^ !'; + const hotA = hot('x----i---j---k---l---| ', values); + const hotB = hot('-x-x-xxxx-x-x-xxxxx-x----i---j---k---l---| ', values); + const hotC = hot('x-xxxx---x-x-x-x-x-xx--x--x-x--x--xxxx-x-----i---j---k---l---|', values); + const asubs = ' ^ ! '; + const bsubs = ' ^ ! '; + const csubs = ' ^ !'; + const expected = '-----i---j---k---l-------i---j---k---l-------i---j---k---l---|'; + const inners = { a: hotA, b: hotB, c: hotC }; + + function project(x) { return inners[x]; } + const result = e1.mergeMap(project, 1); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(hotA.subscriptions).toBe(asubs); + expectSubscriptions(hotB.subscriptions).toBe(bsubs); + expectSubscriptions(hotC.subscriptions).toBe(csubs); + }); + + it('should mergeMap to many hot Observable, with parameter concurrency=2, without resultSelector', () => { + const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; + const e1 = hot('-a-------b-------c---| '); + const e1subs = '^ !'; + const hotA = hot('x----i---j---k---l---| ', values); + const hotB = hot('-x-x-xxxx----i---j---k---l---| ', values); + const hotC = hot('x-xxxx---x-x-x-x-x-xx----i---j---k---l---|', values); + const asubs = ' ^ ! '; + const bsubs = ' ^ ! '; + const csubs = ' ^ !'; + const expected = '-----i---j---(ki)(lj)k---(li)j---k---l---|'; + const inners = { a: hotA, b: hotB, c: hotC }; + + function project(x) { return inners[x]; } + const result = e1.mergeMap(project, 2); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(hotA.subscriptions).toBe(asubs); + expectSubscriptions(hotB.subscriptions).toBe(bsubs); + expectSubscriptions(hotC.subscriptions).toBe(csubs); + }); + it('should mergeMap many complex, where all inners are finite', () => { const a = cold( '-#' ); const b = cold( '-#' ); @@ -663,4 +745,17 @@ describe('Observable.prototype.mergeMap', () => { expect(completed).toBe(true); }); + + it('should support type signatures', () => { + type(() => { + let o: Rx.Observable; + + /* tslint:disable:no-unused-variable */ + let a1: Rx.Observable = o.mergeMap(x => x.toString()); + let a2: Rx.Observable = o.mergeMap(x => x.toString(), 3); + let a3: Rx.Observable<{ o: number; i: string; }> = o.mergeMap(x => x.toString(), (o, i) => ({ o, i })); + let a4: Rx.Observable<{ o: number; i: string; }> = o.mergeMap(x => x.toString(), (o, i) => ({ o, i }), 3); + /* tslint:enable:no-unused-variable */ + }); + }); }); diff --git a/spec/operators/mergeMapTo-spec.ts b/spec/operators/mergeMapTo-spec.ts index fc2ba066c4..b931833249 100644 --- a/spec/operators/mergeMapTo-spec.ts +++ b/spec/operators/mergeMapTo-spec.ts @@ -1,6 +1,6 @@ import * as Rx from '../../dist/cjs/Rx'; declare const {hot, cold, expectObservable, expectSubscriptions}; -import {DoneSignature} from '../helpers/test-helper'; +import {DoneSignature, type} from '../helpers/test-helper'; const Observable = Rx.Observable; @@ -284,6 +284,40 @@ describe('Observable.prototype.mergeMapTo', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + it('should mergeMapTo many cold Observable, with parameter concurrency=1, without resultSelector', () => { + const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; + const e1 = hot('-a-------b-------c---| '); + const e1subs = '^ !'; + const inner = cold('----i---j---k---l---| ', values); + const innersubs = [' ^ ! ', + ' ^ ! ', + ' ^ !']; + const expected = '-----i---j---k---l-------i---j---k---l-------i---j---k---l---|'; + + const result = e1.mergeMapTo(inner, 1); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(inner.subscriptions).toBe(innersubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should mergeMap to many cold Observable, with parameter concurrency=2, without resultSelector', () => { + const values = {i: 'foo', j: 'bar', k: 'baz', l: 'qux'}; + const e1 = hot('-a-------b-------c---| '); + const e1subs = '^ !'; + const inner = cold('----i---j---k---l---| ', values); + const innersubs = [' ^ ! ', + ' ^ ! ', + ' ^ !']; + const expected = '-----i---j---(ki)(lj)k---(li)j---k---l---|'; + + const result = e1.mergeMapTo(inner, 2); + + expectObservable(result).toBe(expected, values); + expectSubscriptions(inner.subscriptions).toBe(innersubs); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + it('should mergeMapTo many outer to arrays', () => { const e1 = hot('2-----4--------3--------2-------|'); const e1subs = '^ !'; @@ -402,4 +436,18 @@ describe('Observable.prototype.mergeMapTo', () => { expect(completed).toBe(true); }); + + it('should support type signatures', () => { + type(() => { + let o: Rx.Observable; + let m: Rx.Observable; + + /* tslint:disable:no-unused-variable */ + let a1: Rx.Observable = o.mergeMapTo(m); + let a2: Rx.Observable = o.mergeMapTo(m, 3); + let a3: Rx.Observable<{ o: number; i: string; }> = o.mergeMapTo(m, (o, i) => ({ o, i })); + let a4: Rx.Observable<{ o: number; i: string; }> = o.mergeMapTo(m, (o, i) => ({ o, i }), 3); + /* tslint:enable:no-unused-variable */ + }); + }); }); diff --git a/src/operator/mergeMap.ts b/src/operator/mergeMap.ts index 68fec28e5a..a6d62f082f 100644 --- a/src/operator/mergeMap.ts +++ b/src/operator/mergeMap.ts @@ -20,8 +20,12 @@ import {InnerSubscriber} from '../InnerSubscriber'; * @owner Observable */ export function mergeMap(project: (value: T, index: number) => ObservableInput, - resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R | number, + resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number, concurrent: number = Number.POSITIVE_INFINITY): Observable { + if (typeof resultSelector === 'number') { + concurrent = resultSelector; + resultSelector = null; + } return this.lift(new MergeMapOperator(project, resultSelector, concurrent)); } diff --git a/src/operator/mergeMapTo.ts b/src/operator/mergeMapTo.ts index 4f5edd79cb..d7d302bac2 100644 --- a/src/operator/mergeMapTo.ts +++ b/src/operator/mergeMapTo.ts @@ -16,8 +16,12 @@ import {subscribeToResult} from '../util/subscribeToResult'; * @owner Observable */ export function mergeMapTo(observable: Observable, - resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R | number, + resultSelector?: ((outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) | number, concurrent: number = Number.POSITIVE_INFINITY): Observable { + if (typeof resultSelector === 'number') { + concurrent = resultSelector; + resultSelector = null; + } return this.lift(new MergeMapToOperator(observable, resultSelector, concurrent)); }