Skip to content

Commit

Permalink
Merge branch 'master' into gh-pages
Browse files Browse the repository at this point in the history
* master:
  cleanup repository after release
  3.7.0
  update changelog
  #143 Make Kefir.combine() also accept sources as objects (#225)
  fix RxJS url (#231)
  • Loading branch information
rpominov committed Dec 31, 2016
2 parents 3e5122b + 0a6a1df commit a80274b
Show file tree
Hide file tree
Showing 8 changed files with 494 additions and 172 deletions.
2 changes: 1 addition & 1 deletion bower.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kefir",
"version": "3.6.1",
"version": "3.7.0",
"homepage": "https://github.com/rpominov/kefir",
"authors": [
"Roman Pominov <rpominov@gmail.com>"
Expand Down
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 3.7.0 (31/12/2016)

- The `.combine` method now can also accept objects instead of arrays [#225](https://github.com/rpominov/kefir/pull/225) [@32bitkid](https://github.com/32bitkid)

## 3.6.1 (29/11/2016)

- Flow definitions fixed and updated to be compatible with Flow 0.36 [#229](https://github.com/rpominov/kefir/pull/229) [@AgentME](https://github.com/AgentME)
Expand Down
2 changes: 1 addition & 1 deletion docs-src/descriptions/intro.jade
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ p.
library for JavaScript inspired by
<a href="https://github.com/baconjs/bacon.js">Bacon.js</a>
and
<a href="http://reactive-extensions.github.io/RxJS/">RxJS</a>,
<a href="http://reactivex.io/rxjs/">RxJS</a>,
with focus on high performance and low memory usage.


Expand Down
35 changes: 34 additions & 1 deletion docs-src/descriptions/multiple-sources.jade
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,40 @@ pre(title='events in time').
result: ------•--•-•X
9 12 14

div

p.
Also, #[b combine] supports passing objects as both #[b obss] #[i and] #[b passiveObss].
The #[b combinator] function will then be called with a single argument, a new object with
the latest value from each observable. If no #[b combinator] is provided, it emits
the object containing latest values.

pre.javascript(title='example')
:escapehtml
var aStream = Kefir.sequentially(100, [1, 3]);
var bStream = Kefir.sequentially(100, [2, 4]).delay(40);

var result = Kefir.combine({ a: aStream, b: bStream });
result.log();

pre(title='console output')
:escapehtml
> [combine] <value> { a: 1, b: 2 }
> [combine] <value> { a: 3, b: 2 }
> [combine] <value> { a: 3, b: 4 }
> [combine] <end>

pre(title='events in time').
a: ----1----3X
b: ------2----4X

result: ------•--•-•X

p.
#[img(data-emoji="point_up")] If there are duplicate keys in both #[b obss]
#[i and] #[b passiveObss], only the latest values from #[b obss] will appear
in the combined object for the duplicated keys.

p.
The result stream emits a value only when it has at least one value from each of source observables.
Ends when all the active source observables (#[b obss] array) end.
Expand All @@ -70,7 +104,6 @@ p.




+descr-method('zip', 'zip', 'Kefir.zip(sources, [combinator])', 'obs.zip(otherObs, [combinator])').
Creates a stream with values from #[b sources]
lined up with each other. For example if you have two sources with values
Expand Down
3 changes: 3 additions & 0 deletions kefir.js.flow
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ declare var Kefir: {

combine<E>(obss: Observable<any,E>[], combinator?: Function): Observable<any,E>;
combine<E>(obss: Observable<any,E>[], passiveObss?: Observable<any,E>[], combinator?: Function): Observable<any,E>;
combine<E>(obss: {[key:string]:Observable<any,E>}, combinator?: Function): Observable<any,E>;
combine<E>(obss: {[key:string]:Observable<any,E>}, passiveObss?: {[key:string]:Observable<any,E>}, combinator?: Function): Observable<any,E>;

zip<V,E>(obss: Observable<V,E>[]): Observable<Array<V>,E>;
zip<E>(obss: Observable<any,E>[], combinator: Function): Observable<any,E>;
merge<V,E>(obss: Observable<V,E>[]): Observable<V,E>;
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kefir",
"version": "3.6.1",
"version": "3.7.0",
"description": "Reactive Programming library for JavaScript inspired by Bacon.js and RxJS with focus on high performance and low memory usage",
"main": "dist/kefir.js",
"scripts": {
Expand Down
49 changes: 44 additions & 5 deletions src/many-sources/combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ import {concat, fillArray} from '../utils/collections';
import {spread} from '../utils/functions';
import never from '../primary/never';


function collect(source, keys, values) {
for (var prop in source) {
if( source.hasOwnProperty( prop ) ) {
keys.push(prop);
values.push(source[prop]);
}
}
}

function defaultErrorsCombinator(errors) {
let latestError;
Expand All @@ -23,7 +30,7 @@ function Combine(active, passive, combinator) {
Stream.call(this);
this._activeCount = active.length;
this._sources = concat(active, passive);
this._combinator = combinator ? spread(combinator, this._sources.length) : (x => x);
this._combinator = combinator;
this._aliveCount = 0;
this._latestValues = new Array(this._sources.length);
this._latestErrors = new Array(this._sources.length);
Expand Down Expand Up @@ -153,11 +160,43 @@ inherit(Combine, Stream, {

});

function combineAsArray(active, passive = [], combinator) {
if (!Array.isArray(passive)) {
throw new Error('Combine can only combine active and passive collections of the same type.');
}

combinator = combinator ? spread(combinator, active.length + passive.length) : (x => x);
return active.length === 0 ? never() : new Combine(active, passive, combinator);
}

function combineAsObject(active, passive = {}, combinator) {
if (typeof passive !== 'object' || Array.isArray(passive)) {
throw new Error('Combine can only combine active and passive collections of the same type.');
}

export default function combine(active, passive = [], combinator) {
let keys = [],
activeObservables = [],
passiveObservables = [];

collect(active, keys, activeObservables);
collect(passive, keys, passiveObservables);

const objectify = values => {
let event = {};
for(let i = values.length - 1; 0 <= i; i--) {
event[keys[i]] = values[i];
}
return combinator ? combinator(event) : event;
}

return activeObservables.length === 0 ? never() : new Combine(activeObservables, passiveObservables, objectify);
}

export default function combine(active, passive, combinator) {
if (typeof passive === 'function') {
combinator = passive;
passive = [];
passive = undefined;
}
return active.length === 0 ? never() : new Combine(active, passive, combinator);

return Array.isArray(active) ? combineAsArray(active, passive, combinator) : combineAsObject(active, passive, combinator);
}
Loading

0 comments on commit a80274b

Please sign in to comment.