Skip to content

Commit

Permalink
feat(): rework api
Browse files Browse the repository at this point in the history
Create 2 functions `subject()` and `holdSubject()` to cover
all issues.

Update the README with holdSubject() example
Add API documentation
  • Loading branch information
TylorS committed Jan 22, 2016
1 parent 917470c commit 47e0eb6
Show file tree
Hide file tree
Showing 8 changed files with 438 additions and 278 deletions.
71 changes: 67 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,76 @@ An Rx.Subject-like interface for Most.js
# Usage

```js
import subject from 'most-subject'
import {subject} from 'most-subject'

const {sink, stream} = subject()
const {sink, stream} = subject(1) // starts with initial value of 1

stream.forEach(x => console.log(x))
stream.forEach(x => console.log(x)) // 1, 2

sink.add(1) // Pushes 1 to stream
sink.add(2) // Pushes 2 to stream
sink.error(new Error('Error Message')) // Send an error
sink.end() // End the stream
```

```js
import {holdSubject} from 'most-subject'

// create subject with buffersize of 4
// and an initial value of 1
const {observer, stream} = holdSubject(4, 1) // observer is an alias for sink

observer.next(2) // next is an alias for add()
observer.next(3)
observer.next(4)

stream.observe(x => console.log(x)) // 1, 2, 3, 4

observer.complete() // alias for end()
```


## API

#### **subject( [initialValue] )**

```js
import {subject} from 'most-subject'
```

** Arguments **

- initialValue (optional) :: any - A value for the stream to start with

** Returns **

- sink :: [Sink](#Sink) - A sink to imperatively push to a stream
- observer :: [Sink](#Sink) - An alias to `sink` to more closely align with ES Observable specification.
- stream :: most.Stream - The stream the sink/observer pushes to.

#### ** holdSubject(bufferSize = 1 [, initialValue]) **
```js
import {holdSubject} from 'most-subject'
```

** Arguments **

- bufferSize (defaults to 1) :: Number - Size of the buffer which will store past values. These values will be replayed upon observation.

- initialValue (optional) :: any - A value for the stream to start with

** Returns **

- sink :: [Sink](#sink) - A sink to imperatively push to a stream
- observer :: [Sink](#sink) - An alias to `sink` to more closely align with ES Observable specification.
- stream :: most.Stream - The stream the sink/observer pushes to.


#### Sink

** Methods **

- *add(value: any): void* - pushes a value to a sink's associated stream
- *next(value: any): void* - alias for `add()`
- *error(error: Error): void* - throws an error on a sink's associated stream and also ends the stream.
- *end(value: any): void* - ends a sinks' associated stream with the specified end value
- *complete(value: any): void* - alias for `end()`
132 changes: 132 additions & 0 deletions src/ReplaySource.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
const most = require('most');
const MulticastSource = require('most/lib/source/MulticastSource');
const PropagateTask = require('most/lib/scheduler/PropagateTask');
const CompoundDisposable = require('most/lib/disposable/dispose').all;
const Stream = most.Stream;

export function replay(stream, maxBufferSize) {
if(stream.source instanceof ReplaySource && source.maxBufferSize !== maxBufferSize) {
return stream;
}
return new Stream(new ReplaySource(stream.source, maxBufferSize));
}

export function ReplaySource(source, maxBufferSize) {
this._buffer = [];
this._ended = false;
this.maxBufferSize = maxBufferSize || Infinity;
MulticastSource.call(this, source);
}
ReplaySource.prototype = Object.create(MulticastSource.prototype);

ReplaySource.prototype._run = MulticastSource.prototype.run;
ReplaySource.prototype.run = function(sink, scheduler) {
const buffer = this._buffer;
const self = this;
this.sink = sink;

if(this._ended) {
return replay();
}
if(buffer.length === 0) {
return run();
}
return new CompoundDisposable([replay(), run()]);

function replay() {
return new BufferProducer(buffer.slice(0), sink, scheduler);
}

function run() {
return self._run(sink, scheduler);
}
};

ReplaySource.prototype._event = MulticastSource.prototype.event;
ReplaySource.prototype.event = function ReplaySource_event(t, x) {
this._addToBuffer({ type: 0, t, x });
this._event(t, x);
};

MulticastSource.prototype._addToBuffer = function ReplaySource_addToBuffer(event) {
if(this._buffer.length >= this.maxBufferSize) {
this._buffer.shift();
}
this._buffer.push(event);
}

MulticastSource.prototype.end = function(t, x, r) {
MulticastSource
var s = this.sinks;
if(s.length === 1) {
s[0].end(t, x);
return;
}
for(var i=0; i<s.length; ++i) {
if (i === s.length -1) {
if (r) {
break; // don't end underlying stream
}
}
s[i].end(t, x);
};
};

ReplaySource.prototype._end = MulticastSource.prototype.end;
ReplaySource.prototype.end = function ReplaySource_end(t, x) {
const self = this
this._ended = true;
this._addToBuffer({ type: 1, t, x });
this._end(t, x, this);
this.add(this.sink); // add an extra sink so the last values can go through
setTimeout(() => {self._end(t, x)}, 0)// dispose after values are propagated
};

MulticastSource.prototype.error = function(t, e, r) {
var s = this.sinks;
if(s.length === 1) {
s[0].error(t, e);
return;
}
for (var i=0; i<s.length; ++i) {
if (i === s.length - 1) {
if (r) {
break; // don't end underlying stream
}
}
s[i].error(t, e);
};
};

ReplaySource.prototype._error = MulticastSource.prototype.error
ReplaySource.prototype.error = function ReplaySink_error(t, e) {
const self = this
this._ended = true;
this._buffer.push({ type: 2, t, x: e });
this._error(t, e, this);
this.add(this.sink)
setTimeout(() => {self._error(t, e)}, 0)
};

function BufferProducer(buffer, sink, scheduler) {
this.task = new PropagateTask(runProducer, buffer, sink);
scheduler.asap(this.task);
}

BufferProducer.prototype.dispose = function() {
return this.task.dispose();
};

function runProducer(t, buffer, sink) {
const emit = item => {
sink.event(item.t, item.x);
}
for(var i=0, j=buffer.length; i<j && this.active; i++) {
const item = buffer[i];
switch(item.type) {
case 0: emit(item); break;
case 1: return this.active && sink.end(item.t, item.x);
case 2: return this.active && sink.error(item.t, item.x);
}
}
}
57 changes: 57 additions & 0 deletions src/Subscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
class Subscription {
constructor() {
this.run = (sink, scheduler) => this._run(sink, scheduler)
this.add = this.next = x => this._add(x)
this.error = err => this._error(err)
this.end = this.complete = x => this._end(x)
}

_run(sink, scheduler) {
this.sink = sink
this.scheduler = scheduler
this.active = true
return this
}

dispose() {
this.active = false
}

_add(x) {
if (!this.active) {
return
}
tryEvent(this.sink, this.scheduler, x)
}

_error(e) {
this.active = false
this.sink.error(this.scheduler.now(), e)
}

_end(x) {
if (!this.active) {
return
}
this.active = false
tryEnd(this.sink, this.scheduler, x)
}
}

function tryEvent(sink, scheduler, event) {
try {
sink.event(scheduler.now(), event)
} catch(e) {
sink.error(scheduler.now(), e)
}
}

function tryEnd(sink, scheduler, event) {
try {
sink.end(scheduler.now(), event)
} catch (e) {
sink.error(scheduler.now(), e)
}
}

export {Subscription}
37 changes: 0 additions & 37 deletions src/behaviorSubject.js

This file was deleted.

8 changes: 0 additions & 8 deletions src/holdSubject.js

This file was deleted.

44 changes: 40 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,41 @@
import subject from './subject'
import holdSubject from './holdSubject'
import behaviorSubject from './behaviorSubject'
import {Stream} from 'most'
import MulticastSource from 'most/lib/source/MulticastSource'
import {Subscription} from './Subscription'
import {replay as replayStream} from './ReplaySource'
import hold from '@most/hold'

export {subject, holdSubject, behaviorSubject}
const defaults = {
replay: false,
bufferSize: 1
}

function create(replay, bufferSize, initialValue) {
const sink = new Subscription()
let stream;

if (!replay) {
stream = new Stream(new MulticastSource(sink))
} else {
stream = bufferSize === 1 ?
hold(new Stream(sink)) :
replayStream(new Stream(sink), bufferSize)
}

stream.drain()

if (typeof initialValue !== 'undefined') {
sink.next(initialValue)
}

return {sink, stream, observer: sink}
}

function subject(initialValue) {
return create(false, 1, initialValue)
}

function holdSubject(bufferSize = 1, initialValue) {
return create(true, bufferSize, initialValue)
}

export {subject, holdSubject}
Loading

0 comments on commit 47e0eb6

Please sign in to comment.