Skip to content

Commit

Permalink
feat(PairsObservable): add PairsObservable creation method
Browse files Browse the repository at this point in the history
closes #1804
  • Loading branch information
kwonoj committed Aug 1, 2016
1 parent ffc8733 commit 26bafff
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 0 deletions.
43 changes: 43 additions & 0 deletions spec/observables/pairs-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';

declare const {hot, asDiagram, expectObservable, expectSubscriptions};
declare const rxTestScheduler: Rx.TestScheduler;
const Observable = Rx.Observable;

describe('Observable.pairs', () => {
asDiagram('pairs({a: 1, b:2})')('should create an observable emits key-value pair', () => {
const e1 = Observable.pairs({a: 1, b: 2}, rxTestScheduler);
const expected = '(ab|)';
const values = {
a: ['a', 1],
b: ['b', 2]
};

expectObservable(e1).toBe(expected, values);
});

it('should create an observable without scheduler', (done: MochaDone) => {
let expected = [
['a', 1],
['b', 2],
['c', 3]
];

Observable.pairs({a: 1, b: 2, c: 3}).subscribe(x => {
expect(x).to.deep.equal(expected.shift());
}, x => {
done(new Error('should not be called'));
}, () => {
expect(expected).to.be.empty;
done();
});
});

it('should work with empty object', () => {
const e1 = Observable.pairs({}, rxTestScheduler);
const expected = '|';

expectObservable(e1).toBe(expected);
});
});
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import './add/observable/race';
import './add/observable/never';
import './add/observable/of';
import './add/observable/onErrorResumeNext';
import './add/observable/pairs';
import './add/observable/range';
import './add/observable/using';
import './add/observable/throw';
Expand Down
10 changes: 10 additions & 0 deletions src/add/observable/pairs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import {Observable} from '../../Observable';
import {pairs as staticPairs} from '../../observable/pairs';

Observable.pairs = staticPairs;

declare module '../../Observable' {
namespace Observable {
export let pairs: typeof staticPairs;
}
}
94 changes: 94 additions & 0 deletions src/observable/PairsObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import {Scheduler} from '../Scheduler';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {TeardownLogic} from '../Subscription';

interface PairsContext<T> {
obj: Object;
keys: Array<string>;
length: number;
index: number;
subscriber: Subscriber<Array<string | T>>;
}

function dispatch<T>(state: PairsContext<T>) {
const {obj, keys, length, index, subscriber} = state;

if (index === length) {
subscriber.complete();
return;
}

const key = keys[index];
subscriber.next([key, obj[key]]);

state.index = index + 1;

(<any> this).schedule(state);
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @extends {Ignored}
* @hide true
*/
export class PairsObservable<T> extends Observable<Array<string | T>> {
private keys: Array<string>;

/**
* Convert an object into an observable sequence of [key, value] pairs
* using an optional Scheduler to enumerate the object.
*
* @example <caption>Converts a javascript object to an Observable</caption>
* var obj = {
* foo: 42,
* bar: 56,
* baz: 78
* };
*
* var source = Rx.Observable.pairs(obj);
*
* var subscription = source.subscribe(
* function (x) {
* console.log('Next: %s', x);
* },
* function (err) {
* console.log('Error: %s', err);
* },
* function () {
* console.log('Completed');
* });
*
* @param {Object} obj The object to inspect and turn into an
* Observable sequence.
* @param {Scheduler} [scheduler] An optional Scheduler to run the
* enumeration of the input sequence on.
* @returns {(Observable<Array<string | T>>)} An observable sequence of
* [key, value] pairs from the object.
*/
static create<T>(obj: Object, scheduler?: Scheduler): Observable<Array<string | T>> {
return new PairsObservable<T>(obj, scheduler);
}

constructor(private obj: Object, private scheduler?: Scheduler) {
super();
this.keys = Object.keys(obj);
}

protected _subscribe(subscriber: Subscriber<Array<string | T>>): TeardownLogic {
const {keys, scheduler} = this;
const length = keys.length;

if (scheduler) {
return scheduler.schedule(dispatch, 0, {
obj: this.obj, keys, length, index: 0, subscriber
});
} else {
for (let idx = 0; idx < length; idx++) {
const key = keys[idx];
subscriber.next([key, this.obj[key]]);
}
subscriber.complete();
}
}
}
3 changes: 3 additions & 0 deletions src/observable/pairs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { PairsObservable } from './PairsObservable';

export const pairs = PairsObservable.create;

0 comments on commit 26bafff

Please sign in to comment.