Skip to content

Commit

Permalink
#143 Make Kefir.combine() also accept sources as objects
Browse files Browse the repository at this point in the history
  • Loading branch information
J. Holmes committed Oct 20, 2016
1 parent cbbd1e8 commit 6376726
Show file tree
Hide file tree
Showing 3 changed files with 453 additions and 168 deletions.
3 changes: 3 additions & 0 deletions kefir.js.flow
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ declare var Kefir: {

combine<E>(obss: Observable<any,E>[], combinator?: Function): Observable<any,E>;
combine<E>(obss: Observable<any,E>[], passiveObss?: Observable<any,E>[], combinator?: Function): Observable<any,E>;
combine<E>(obss: {[key:string]:Observable<any,E>}, combinator?: Function): Observable<any,E>;
combine<E>(obss: {[key:string]:Observable<any,E>}, passiveObss?: {[key:string]:Observable<any,E>}, combinator?: Function): Observable<any,E>;

zip<V,E>(obss: Observable<V,E>[]): Observable<Array<V>,E>;
zip<E>(obss: Observable<any,E>[], combinator: Function): Observable<any,E>;
merge<V,E>(obss: Observable<V,E>[]): Observable<V,E>;
Expand Down
49 changes: 44 additions & 5 deletions src/many-sources/combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ import {concat, fillArray} from '../utils/collections';
import {spread} from '../utils/functions';
import never from '../primary/never';


function collect(source, keys, values) {
for (var prop in source) {
if( source.hasOwnProperty( prop ) ) {
keys.push(prop);
values.push(source[prop]);
}
}
}

function defaultErrorsCombinator(errors) {
let latestError;
Expand All @@ -23,7 +30,7 @@ function Combine(active, passive, combinator) {
Stream.call(this);
this._activeCount = active.length;
this._sources = concat(active, passive);
this._combinator = combinator ? spread(combinator, this._sources.length) : (x => x);
this._combinator = combinator;
this._aliveCount = 0;
this._latestValues = new Array(this._sources.length);
this._latestErrors = new Array(this._sources.length);
Expand Down Expand Up @@ -153,11 +160,43 @@ inherit(Combine, Stream, {

});

function combineAsArray(active, passive = [], combinator) {
if (!Array.isArray(passive)) {
throw new Error('Combine can only combine active and passive collections of the same type.');
}

combinator = combinator ? spread(combinator, active.length + passive.length) : (x => x);
return active.length === 0 ? never() : new Combine(active, passive, combinator);
}

function combineAsObject(active, passive = {}, combinator) {
if (typeof passive !== 'object' || Array.isArray(passive)) {
throw new Error('Combine can only combine active and passive collections of the same type.');
}

export default function combine(active, passive = [], combinator) {
let keys = [],
activeObservables = [],
passiveObservables = [];

collect(active, keys, activeObservables);
collect(passive, keys, passiveObservables);

const objectify = values => {
let event = {};
for(let i = values.length - 1; 0 <= i; i--) {
event[keys[i]] = values[i];
}
return combinator ? combinator(event) : event;
}

return activeObservables.length === 0 ? never() : new Combine(activeObservables, passiveObservables, objectify);
}

export default function combine(active, passive, combinator) {
if (typeof passive === 'function') {
combinator = passive;
passive = [];
passive = undefined;
}
return active.length === 0 ? never() : new Combine(active, passive, combinator);

return Array.isArray(active) ? combineAsArray(active, passive, combinator) : combineAsObject(active, passive, combinator);
}
Loading

0 comments on commit 6376726

Please sign in to comment.