Skip to content

Commit

Permalink
feat(combineLatest): accept array of observable as parameter
Browse files Browse the repository at this point in the history
closes #594
  • Loading branch information
kwonoj committed Nov 20, 2015
1 parent 3f85538 commit 2edd92c
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 12 deletions.
14 changes: 13 additions & 1 deletion spec/observables/combineLatest-spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* globals describe, it, expect, hot, cold, expectObservable */
/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;
Expand Down Expand Up @@ -29,6 +29,18 @@ describe('Observable.combineLatest', function () {
});
});

it('should accept array of observables', function () {
var firstSource = hot('----a----b----c----|');
var secondSource = hot('--d--e--f--g--|');
var expected = '----uv--wx-y--z----|';

var combined = Observable.combineLatest([firstSource, secondSource], function (a, b) {
return '' + a + b;
});

expectObservable(combined).toBe(expected, {u: 'ad', v: 'ae', w: 'af', x: 'bf', y: 'bg', z: 'cg'});
});

it('should work with two nevers', function () {
var e1 = cold( '-');
var e1subs = '^';
Expand Down
19 changes: 18 additions & 1 deletion spec/operators/combineLatest-spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* globals describe, it, expect, hot, cold, expectObservable */
/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;
Expand Down Expand Up @@ -145,6 +145,23 @@ describe('Observable.prototype.combineLatest', function () {
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should accept array of observables', function () {
var e1 = hot('--a--^--b--c--|');
var e1subs = '^ !';
var e2 = hot('---e-^---f--g--|');
var e2subs = '^ !';
var e3 = hot('---h-^----i--j-|');
var e3subs = '^ !';
var expected = '-----wxyz-|';

var result = e1.combineLatest([e2, e3], function (x, y, z) { return x + y + z; });

expectObservable(result).toBe(expected, { w: 'bfi', x: 'cfi', y: 'cgi', z: 'cgj' });
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
expectSubscriptions(e3.subscriptions).toBe(e3subs);
});

it('should work with empty and error', function () {
var e1 = hot('----------|'); //empty
var e1subs = '^ !';
Expand Down
9 changes: 7 additions & 2 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ export class Observable<T> implements CoreOperators<T> {
}

// static method stubs
static combineLatest: <T>(...observables: Array<Observable<any> | ((...values: Array<any>) => T) | Scheduler>) => Observable<T>;
static combineLatest: <T>(...observables: Array<Observable<any> |
Array<Observable<any>> |
((...values: Array<any>) => T) |
Scheduler>) => Observable<T>;
static concat: <T>(...observables: Array<Observable<any> | Scheduler>) => Observable<T>;
static defer: <T>(observableFactory: () => Observable<T>) => Observable<T>;
static empty: <T>(scheduler?: Scheduler) => Observable<T>;
Expand Down Expand Up @@ -166,7 +169,9 @@ export class Observable<T> implements CoreOperators<T> {
bufferWhen: (closingSelector: () => Observable<any>) => Observable<T[]>;
catch: (selector: (err: any, source: Observable<T>, caught: Observable<any>) => Observable<any>) => Observable<T>;
combineAll: <R>(project?: (...values: Array<any>) => R) => Observable<R>;
combineLatest: <R>(...observables: Array<Observable<any> | ((...values: Array<any>) => R)>) => Observable<R>;
combineLatest: <R>(...observables: Array<Observable<any> |
Array<Observable<any>> |
((...values: Array<any>) => R)>) => Observable<R>;
concat: <R>(...observables: (Observable<any> | Scheduler)[]) => Observable<R>;
concatAll: () => Observable<any>;
concatMap: <R>(project: ((x: T, ix: number) => Observable<any>), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable<R>;
Expand Down
19 changes: 15 additions & 4 deletions src/operators/combineLatest-static.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {ArrayObservable} from '../observables/ArrayObservable';
import {CombineLatestOperator} from './combineLatest-support';
import {Scheduler} from '../Scheduler';
import {isScheduler} from '../util/isScheduler';
import {isArray} from '../util/isArray';

/**
* Combines the values from observables passed as arguments. This is done by subscribing
Expand All @@ -14,15 +15,25 @@ import {isScheduler} from '../util/isScheduler';
* @returns {Observable} an observable of other projected values from the most recent values from each observable, or an array of each of
* the most recent values from each observable.
*/
export function combineLatest<R>(...observables: Array<Observable<any> | ((...values: Array<any>) => R) | Scheduler>): Observable<R> {
let project, scheduler;
export function combineLatest<R>(...observables: Array<Observable<any> |
Array<Observable<any>> |
((...values: Array<any>) => R) |
Scheduler>): Observable<R> {
let project: (...values: Array<any>) => R = null;
let scheduler: Scheduler = null;

if (isScheduler(observables[observables.length - 1])) {
scheduler = observables.pop();
scheduler = <Scheduler>observables.pop();
}

if (typeof observables[observables.length - 1] === 'function') {
project = observables.pop();
project = <(...values: Array<any>) => R>observables.pop();
}

// if the first and only other argument besides the resultSelector is an array
// assume it's been called with `combineLatest([obs1, obs2, obs3], project)`
if (observables.length === 1 && isArray(observables[0])) {
observables = <Array<Observable<any>>>observables[0];
}

return new ArrayObservable(observables, scheduler).lift(new CombineLatestOperator(project));
Expand Down
19 changes: 15 additions & 4 deletions src/operators/combineLatest.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Observable} from '../Observable';
import {ArrayObservable} from '../observables/ArrayObservable';
import {CombineLatestOperator} from './combineLatest-support';
import {isArray} from '../util/isArray';

/**
* Combines the values from this observable with values from observables passed as arguments. This is done by subscribing
Expand All @@ -12,11 +13,21 @@ import {CombineLatestOperator} from './combineLatest-support';
* @returns {Observable} an observable of other projected values from the most recent values from each observable, or an array of each of
* the most recent values from each observable.
*/
export function combineLatest<R>(...observables: Array<Observable<any> | ((...values: Array<any>) => R)>): Observable<R> {
observables.unshift(this);
let project;
export function combineLatest<R>(...observables: Array<Observable<any> |
Array<Observable<any>> |
((...values: Array<any>) => R)>): Observable<R> {
let project: (...values: Array<any>) => R = null;
if (typeof observables[observables.length - 1] === 'function') {
project = observables.pop();
project = <(...values: Array<any>) => R>observables.pop();
}

// if the first and only other argument besides the resultSelector is an array
// assume it's been called with `combineLatest([obs1, obs2, obs3], project)`
if (observables.length === 1 && isArray(observables[0])) {
observables = <Array<Observable<any>>>observables[0];
}

observables.unshift(this);

return new ArrayObservable(observables).lift(new CombineLatestOperator(project));
}

0 comments on commit 2edd92c

Please sign in to comment.