Skip to content

Commit

Permalink
feat(operator): add switchFirst and switchMapFirst
Browse files Browse the repository at this point in the history
  • Loading branch information
jinroh committed Nov 10, 2015
1 parent 9956b39 commit 71e3dd1
Show file tree
Hide file tree
Showing 9 changed files with 683 additions and 0 deletions.
28 changes: 28 additions & 0 deletions perf/micro/current-thread-scheduler/operators/switchFirst.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

var source = Array.apply(null, { length: 25 });

module.exports = function (suite) {
var oldMergeAllWithCurrentThreadScheduler = RxOld.Observable.fromArray(
source.map(function () { return RxOld.Observable.range(0, 25, RxOld.Scheduler.currentThread); }),
RxOld.Scheduler.currentThread
)
.switchFirst();
var newMergeAllWithCurrentThreadScheduler = RxNew.Observable.fromArray(
source.map(function () { return RxNew.Observable.range(0, 25, RxNew.Scheduler.immediate); }),
RxNew.Scheduler.immediate
)
.switchFirst();

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old switchFirst with current thread scheduler', function () {
oldMergeAllWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
})
.add('new switchFirst with current thread scheduler', function () {
newMergeAllWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
});
};
24 changes: 24 additions & 0 deletions perf/micro/current-thread-scheduler/operators/switchMapFirst.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldMergeMapWithCurrentThreadScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.currentThread)
.flatMapFirst(function (x) {
return RxOld.Observable.range(x, 25, RxOld.Scheduler.currentThread);
});
var newMergeMapWithCurrentThreadScheduler = RxNew.Observable.range(0, 25, RxNew.Scheduler.immediate)
.switchMapFirst(function (x) {
return RxNew.Observable.range(x, 25, RxNew.Scheduler.immediate);
});

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old switchMapFirst with current thread scheduler', function () {
oldMergeMapWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
})
.add('new switchMapFirst with current thread scheduler', function () {
newMergeMapWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
});
};
27 changes: 27 additions & 0 deletions perf/micro/immediate-scheduler/operators/switchFirst.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

var source = Array.apply(null, { length: 25 });

module.exports = function (suite) {
var oldMergeAllWithImmediateScheduler = RxOld.Observable.fromArray(
source.map(function () { return RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate); }),
RxOld.Scheduler.immediate
)
.switchFirst();
var newMergeAllWithImmediateScheduler = RxNew.Observable.fromArray(
source.map(function () { return RxNew.Observable.range(0, 25); })
)
.switchFirst();

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old switchFirst with immediate scheduler', function () {
oldMergeAllWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new switchFirst with immediate scheduler', function () {
newMergeAllWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
24 changes: 24 additions & 0 deletions perf/micro/immediate-scheduler/operators/switchMapFirst.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldMergeMapWithImmediateScheduler = RxOld.Observable.range(0, 25, RxOld.Scheduler.immediate)
.flatMapFirst(function (x) {
return RxOld.Observable.range(x, 25, RxOld.Scheduler.immediate);
});
var newMergeMapWithImmediateScheduler = RxNew.Observable.range(0, 25)
.switchMapFirst(function (x) {
return RxNew.Observable.range(x, 25);
});

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old switchMapFirst with immediate scheduler', function () {
oldMergeMapWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new switchMapFirst with immediate scheduler', function () {
newMergeMapWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
163 changes: 163 additions & 0 deletions spec/operators/switchFirst-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/* globals describe, it, expect, hot, cold, expectObservable, expectSubscriptions */
var Rx = require('../../dist/cjs/Rx');
var Promise = require('promise');

var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;

describe('Observable.prototype.switchFirst()', function () {
it('should switch to first immediately-scheduled inner Observable', function () {
var e1 = cold( '(ab|)');
var e1subs = '(^!)';
var e2 = cold( '(cd|)');
var e2subs = [];
var expected = '(ab|)';
expectObservable(Observable.of(e1, e2).switchFirst()).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should handle throw', function () {
var e1 = Observable.throw('damn');
var expected = '#';
expectObservable(e1.switchFirst()).toBe(expected, null, 'damn');
});

it('should handle empty', function () {
var e1 = Observable.empty();
var expected = '|';
expectObservable(e1.switchFirst()).toBe(expected);
});

it('should handle never', function () {
var e1 = Observable.never();
var expected = '-';
expectObservable(e1.switchFirst()).toBe(expected);
});

it('should handle a hot observable of observables', function () {
var x = cold( '--a---b---c--| ');
var xsubs = ' ^ ! ';
var y = cold( '---d--e---f---| ');
var ysubs = [];
var z = cold( '---g--h---i---|');
var zsubs = ' ^ !';
var e1 = hot( '------x-------y-----z-------------|', { x: x, y: y, z: z });
var expected = '--------a---b---c------g--h---i---|';
expectObservable(e1.switchFirst()).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
expectSubscriptions(z.subscriptions).toBe(zsubs);
});

it('should handle a hot observable of observables, outer is unsubscribed early', function () {
var x = cold( '--a---b---c--| ');
var xsubs = ' ^ ! ';
var y = cold( '---d--e---f---|');
var ysubs = [];
var e1 = hot( '------x-------y------| ', { x: x, y: y });
var unsub = ' ! ';
var expected = '--------a---b--- ';
expectObservable(e1.switchFirst(), unsub).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should handle a hot observable of observables, inner never completes', function () {
var x = cold( '--a---b--| ');
var xsubs = ' ^ ! ';
var y = cold( '-d---e- ');
var ysubs = [];
var z = cold( '---f--g---h--');
var zsubs = ' ^ ';
var e1 = hot( '---x---y------z----------| ', { x: x, y: y, z: z });
var expected = '-----a---b-------f--g---h--';
expectObservable(e1.switchFirst()).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
expectSubscriptions(z.subscriptions).toBe(zsubs);
});

it('should handle a synchronous switch and stay on the first inner observable', function () {
var x = cold( '--a---b---c--| ');
var xsubs = ' ^ ! ';
var y = cold( '---d--e---f---| ');
var ysubs = [];
var e1 = hot( '------(xy)------------|', { x: x, y: y });
var expected = '--------a---b---c-----|';
expectObservable(e1.switchFirst()).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should handle a hot observable of observables, one inner throws', function () {
var x = cold( '--a---# ');
var xsubs = ' ^ ! ';
var y = cold( '---d--e---f---|');
var ysubs = [];
var e1 = hot( '------x-------y------| ', { x: x, y: y });
var expected = '--------a---# ';
expectObservable(e1.switchFirst()).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should handle a hot observable of observables, outer throws', function () {
var x = cold( '--a---b---c--| ');
var xsubs = ' ^ ! ';
var y = cold( '---d--e---f---|');
var ysubs = [];
var e1 = hot( '------x-------y-------# ', { x: x, y: y });
var expected = '--------a---b---c-----# ';
expectObservable(e1.switchFirst()).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
expectSubscriptions(y.subscriptions).toBe(ysubs);
});

it('should handle an empty hot observable', function () {
var e1 = hot( '------|');
var expected = '------|';
expectObservable(e1.switchFirst()).toBe(expected);
});

it('should handle a never hot observable', function () {
var e1 = hot('-');
var expected = '-';
expectObservable(e1.switchFirst()).toBe(expected);
});

it('should complete not before the outer completes', function () {
var x = cold( '--a---b---c--| ');
var xsubs = ' ^ ! ';
var e1 = hot( '------x---------------|', { x: x });
var expected = '--------a---b---c-----|';
expectObservable(e1.switchFirst()).toBe(expected);
expectSubscriptions(x.subscriptions).toBe(xsubs);
});

it('should handle an observable of promises', function (done) {
var expected = [1];

Observable.of(Promise.resolve(1), Promise.resolve(2), Promise.resolve(3))
.switchFirst()
.subscribe(function (x) {
expect(x).toBe(expected.shift());
}, null, function () {
expect(expected.length).toBe(0);
done();
});
});

it('should handle an observable of promises, where one rejects', function (done) {
Observable.of(Promise.reject(2), Promise.resolve(1))
.switchFirst()
.subscribe(function (x) {
expect(false).toBe(true);
}, function (err) {
expect(err).toBe(2);
done();
}, function () {
expect(false).toBe(true);
});
});
});
Loading

0 comments on commit 71e3dd1

Please sign in to comment.