Yassine Elouafi edited this page Jun 5, 2015 · 8 revisions

#API type signature

The docs use a Haskell-like syntax to denote function type signatures.

f : a -> b means f is a function that takes an argument of type a and returns a result of type b. a and b are generic types (like generic types in Java or templates in C++).

Multiple arguments are enclosed in parentheses : f : (a, b) -> b

Types are combined with |: f : a -> b | c means f returns a result of either type b or type c.

Promise a means a Promise whose outcome is of type a
Stream a means a Stream whose occurrences are all of type a

A more elaborate example: map : ( Stream a, a -> b | Promise b ) -> Stream b means map is a function that takes 2 arguments:

  1. a Stream of a and
  2. a function that takes an a and returns either a b or a Promise of b

and returns a Stream of b

#Factory methods

Stream.array(array)

Stream.array : [a] -> Stream a

creates a Stream from a JavaScript Array. Ends after yielding all array elements.

Stream.seq(array, delay, interval)

Stream.seq : ([a], Number, Number) -> Stream a

Yields elements from array. The first element will occur after delay, and each remaining element will occur one interval after the preceding one. Ends after yielding all array elements.

Stream.range(min, max, delay, interval)

Stream.range : (Number, Number, Number, Number) -> Stream Number

Like Stream.seq but yields a sequence of numbers from min to max inclusive.
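
The timing rule shared by Stream.seq and Stream.range can be sketched without the library. The helpers seqSchedule and rangeSchedule below are hypothetical names, not part of the API. They only compute when each value would be expected to occur, assuming times are counted in milliseconds from stream creation and that range steps by 1.

```javascript
// Hypothetical model, not the library's code: computes the expected schedule
// of occurrences as { time, value } pairs, with time counted from creation.
function seqSchedule(array, delay, interval) {
  return array.map((value, i) => ({ time: delay + i * interval, value }));
}

// Assumes integer steps of 1 and an inclusive upper bound.
function rangeSchedule(min, max, delay, interval) {
  const values = [];
  for (let n = min; n <= max; n++) values.push(n);
  return seqSchedule(values, delay, interval);
}

console.log(seqSchedule(['a', 'b', 'c'], 20, 50).map(o => o.time)); // [ 20, 70, 120 ]
console.log(rangeSchedule(1, 3, 0, 10).map(o => o.value));          // [ 1, 2, 3 ]
```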

Stream.occs(occurrences)

Stream.occs : ([(a, Number)]) -> Stream a

Each element in the occurrences array is a pair (value, delay). For each element, the resulting Stream yields value after delay, measured from the stream creation. Note that the delays in the array are assumed to be monotonically increasing.

Stream.bind(subscribe, unsubscribe, untilP)

Stream.bind: (subscribe a, unsubscribe a, Promise a) -> Stream a
listener a : a -> ()
subscribe a : listener a -> unsubscribe a | ()
unsubscribe a : listener a -> ()

A generic function to create a Stream from an arbitrary event source. The function operates as follows:
1- When called, it calls subscribe(listener) (listener is the internal callback used to bind events from the event source to occurrences on the result Stream).
2- If the unsubscribe argument is provided, it will be used later (see 3) to unbind from the event source. If not provided, the function will try to use the return value of subscribe to unsubscribe. If the return value of subscribe is undefined (i.e. ()), unsubscribe will be assigned a noop function (_ => ()).
3- Once the untilP promise resolves, the function unbinds from the event source by calling unsubscribe with the listener registered in (1) as argument. If untilP is rejected, the result Stream will abort with the rejection reason of the promise.

For example, here is how you bind a Node event. Note how the unsubscribe function is explicitly provided (because emitter.on() doesn't return its own un-subscription function):

Stream.fromEmitter = function (emitter, event, untilP) {
  return Stream.bind(
    listener => emitter.on(event, listener),
    listener => emitter.removeListener(event, listener),
    untilP
  );
};
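
The other case, where subscribe returns its own un-subscription function, can be sketched in plain JavaScript. Everything below is hypothetical and written only for illustration: resolveUnsubscribe models the choice described in step 2 (it is not the library's internal code) and makeSource is a made-up event source.

```javascript
// Hypothetical helper modelling step 2: pick the function used to unbind.
function resolveUnsubscribe(explicitUnsubscribe, subscribeResult) {
  if (typeof explicitUnsubscribe === 'function') return explicitUnsubscribe;
  if (typeof subscribeResult === 'function') return subscribeResult;
  return () => {}; // noop when nothing usable is available
}

// A made-up event source whose subscribe() returns its own unsubscribe function.
function makeSource() {
  const listeners = new Set();
  return {
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener);
    },
    emit(value) { listeners.forEach(l => l(value)); },
    count() { return listeners.size; }
  };
}

const bindSource = makeSource();
const bindReceived = [];
const bindListener = v => bindReceived.push(v);

const bindUnsubscribe = resolveUnsubscribe(undefined, bindSource.subscribe(bindListener));
bindSource.emit(1);
bindUnsubscribe(bindListener); // step 3 passes the listener; this unsubscribe ignores it
bindSource.emit(2);            // no longer delivered
console.log(bindReceived);     // [ 1 ]
```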

Stream.fromDomEvent(target, event, untilP) (alias adts.$on())

Stream.fromDomEvent: (EventTarget, String, Promise) -> Stream anEvent

Creates a Stream from a DOM event. Ends when untilP is resolved. Aborts when untilP is rejected.

Stream.fromEmitter(emitter, event, untilP)

Stream.fromEmitter: (events.EventEmitter, String, Promise) -> Stream anEvent

Creates a Stream from a Node event. Ends when untilP is resolved. Aborts when untilP is rejected.

Stream.fromReadable(readable)

Stream.fromReadable : stream.Readable -> Stream a

Creates a Stream from a Node readable stream. The result Stream yields all data events, ends on the end event and aborts on the first error event.

Stream.timer(interval, count)

Stream.timer : (Number, Number) -> Stream Number

Yields the current time (Date.now()) at each interval. Ends after yielding count times.

Stream.seconds(max)

Stream.seconds: Number -> Stream Number

Yields the number of seconds elapsed since the stream creation (i.e. yields 0, then 1 ...).
Ends after yielding max times.

#Stream methods

stream.map(f)

map : ( Stream a, a -> b | Promise b ) -> Stream b

stream.map(f) applies f to each occurrence on the input stream and returns a stream whose occurrences are the result of the mapping through f.

If f is synchronous then the resulting occurrences will occur as soon as the source occurrences occur. If f is asynchronous (i.e. returns a Promise) then the resulting occurrences will occur as soon as:

  1. The returned promise is resolved
  2. The preceding occurrence has occurred (meaning out-of-order responses from promises will be properly sequenced)

In case the returned promise is rejected the returned stream will abort with the rejection reason.

Example: Stream.seq([1,2,3], 20, 50).map( x => x * 2 ) yields the sequence of elements [2,4,6], which occur at the same time (semantically, at least) as the source elements.
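
The two sequencing rules for an asynchronous f can be illustrated with a small timing model. This is only a sketch under assumptions: mapSchedule is a hypothetical helper (not the library's map), and the latency field is a made-up number standing for how long each returned promise takes to resolve.

```javascript
// Hypothetical model of the ordering rule. Each input carries the time it
// occurs and the latency of the promise returned by f for that value.
function mapSchedule(occurrences, f) {
  let previous = 0;
  return occurrences.map(({ time, value, latency }) => {
    const resolvedAt = time + latency;
    // Rule 1: wait for the promise. Rule 2: wait for the preceding occurrence.
    previous = Math.max(resolvedAt, previous);
    return { time: previous, value: f(value) };
  });
}

const mapped = mapSchedule(
  [
    { time: 20, value: 1, latency: 100 }, // slow response, resolves at 120
    { time: 70, value: 2, latency: 10 },  // fast response, resolves at 80
    { time: 200, value: 3, latency: 0 }
  ],
  x => x * 2
);
console.log(mapped.map(o => o.time));  // [ 120, 120, 200 ]
console.log(mapped.map(o => o.value)); // [ 2, 4, 6 ]
```

The second result is ready at 80 but is held back until the first one has occurred at 120, so the output order matches the input order.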

stream.mapError(f)

mapError : (Stream a, anError -> Stream a) -> Stream a

extends the lifetime of an aborted stream by another stream returned by the extender function f.

stream.filter(f)

filter : (Stream a, a -> aBool | Promise aBool) -> Stream a

Filters the occurrences from the stream by the predicate f. Only occurrences satisfying the predicate will occur on the returned stream. As for map, the predicate may be an asynchronous function, in which case the occurrences on the output stream will occur as soon as the promise returned by the predicate is resolved (preserving the order of the input stream). If the returned promise is rejected, the output stream is aborted with the rejection reason.

stream.length()

length : Stream a -> Promise Number

returns a promise holding the number of occurrences on the stream. The promise resolves as soon as the input stream ends. If the input stream aborts, the promise is rejected with the abort error.

stream.first()

first : Stream a -> Promise a

returns a promise holding the first occurrence of the stream. The promise resolves as soon as the first occurrence appears in the input stream. If the input stream aborts, the promise is rejected with the abort error. If the input stream is empty, the promise is rejected with the string 'Empty Stream'.

stream.last()

last : Stream a -> Promise a

returns a promise holding the last occurrence of the stream. The promise resolves as soon as the stream ends (which may be later than the last occurrence). If the input stream aborts, the promise is rejected with the abort error. If the input stream is empty, the promise is rejected with the string 'Empty Stream'.

stream.at(index)

at : ( Stream a, Number ) -> Promise a

returns a promise holding the occurrence at the index position. The promise resolves as soon as the occurrence appears on the input stream. If the input stream aborts, the promise is rejected with the abort error. If the input stream is empty or if index is greater than the number of occurrences in the stream, the promise is rejected with the string 'index too large'.

stream.take(n)

take : (Stream a, Number ) -> Stream a

takes at most the first n occurrences of the stream. If the number of occurrences on the input stream is smaller than n, the resulting stream will yield all the occurrences of the source stream. The output stream aborts if the input stream aborts before reaching n occurrences.

stream.takeWhile(p)

takeWhile : (Stream a, a -> aBool | Promise aBool) -> Stream a

Takes the first occurrences that satisfy the predicate p. If p returns a promise then the same rules mentioned in filter apply.

stream.takeUntil(promise)

takeUntil : (Stream a, Promise) -> Stream a

Takes occurrences from the input stream until promise completes. If promise is rejected the output stream aborts with the rejection reason.

stream.skip(n)

skip : (Stream a, Number) -> Stream a

skips at most the first n occurrences from the input stream. If the input stream ends (resp. aborts) too soon (i.e. before reaching n occurrences), the output stream will be empty (resp. will abort without any occurrence).

stream.skipWhile(p)

skipWhile : (Stream a, a -> aBool | Promise aBool) -> Stream a

Skips the first occurrences that satisfy the predicate p. If p returns a promise then the same rules mentioned in filter and takeWhile apply.

stream.skipUntil(promise)

skipUntil : (Stream a, Promise) -> Stream a

Skips occurrences from the input stream until promise completes. If promise is rejected the output stream aborts with the rejection reason.

stream.span(p)

span : (Stream a, a -> aBool | Promise aBool) -> [Stream a, Stream a]

splits the input stream into 2 streams: the first yields the leading occurrences that satisfy p and the second yields the remaining occurrences. stream.span(p) is equivalent to [stream.takeWhile(p), stream.skipWhile(p)].

stream.break(p)

break : (Stream a, a -> aBool | Promise aBool) -> [Stream a, Stream a]

Similar to span but the first stream yields occurrences that do not satisfy p. stream.break(p) is similar to stream.span( x => !p(x) )
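
The relationship between span, break, takeWhile and skipWhile can be checked on plain arrays, where each element stands for one occurrence in order. The functions below are hypothetical array stand-ins with synchronous predicates only; they are not the stream methods themselves.

```javascript
// Array-based model: leading elements that satisfy p.
function takeWhileArr(xs, p) {
  const i = xs.findIndex(x => !p(x));
  return i === -1 ? xs.slice() : xs.slice(0, i);
}

// Array-based model: everything from the first element that fails p.
function skipWhileArr(xs, p) {
  const i = xs.findIndex(x => !p(x));
  return i === -1 ? [] : xs.slice(i);
}

function spanArr(xs, p) { return [takeWhileArr(xs, p), skipWhileArr(xs, p)]; }
function breakArr(xs, p) { return spanArr(xs, x => !p(x)); }

console.log(spanArr([1, 2, 5, 1], x => x < 3));  // [ [ 1, 2 ], [ 5, 1 ] ]
console.log(breakArr([1, 2, 5, 1], x => x > 4)); // [ [ 1, 2 ], [ 5, 1 ] ]
```

Note that the trailing 1 stays in the second part even though it satisfies x < 3: only the leading run is taken.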

stream.groupBy(p)

groupBy : (Stream a, (a, a) -> aBool | Promise aBool) -> Stream (Stream a)

returns a stream of streams. The nested sub-streams group adjacent (i.e. successive) occurrences that satisfy the predicate p.
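
Grouping of adjacent occurrences can be pictured on a plain array. groupByArr is a hypothetical stand-in, and it assumes that p is called with the preceding occurrence and the incoming one; the text above does not say which pair of values the library actually passes to p.

```javascript
// Array-based model of adjacent grouping (not the library implementation).
function groupByArr(xs, p) {
  const groups = [];
  for (const x of xs) {
    const current = groups[groups.length - 1];
    // Assumption: p compares the preceding occurrence with the incoming one.
    if (current && p(current[current.length - 1], x)) current.push(x);
    else groups.push([x]);
  }
  return groups;
}

console.log(groupByArr([1, 1, 2, 3, 3, 1], (x, y) => x === y));
// [ [ 1, 1 ], [ 2 ], [ 3, 3 ], [ 1 ] ]
```

The final 1 starts a new group because only successive occurrences are grouped together.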

stream.group()

group : Stream a -> Stream (Stream a)

Same as stream.groupBy( (x,y) => x === y ).

stream.reduce(f, [seed])

reduce : ( Stream a, (b, a) -> b | Promise b, b | Promise b ) -> Promise b

runs an accumulator function through the occurrences using an optional start value. On each occurrence, it combines the incoming value with the seed, then uses the resulting value as the seed for the next occurrence. The result is a promise that is resolved with the final result when the stream ends. This function could also be named reduceLeft because it runs the accumulation from left to right, which in Stream terminology means from the first occurrence to the last one.

The seed value, as well as the return value of the accumulator function, can be a normal value or a promise. If the seed argument is omitted, the accumulation will try to use the first occurrence as a starting value.

The resulting promise is resolved as soon as the last generated promise returned by the accumulator function is resolved. It is rejected as soon as any preceding promise (seed or intermediary results of the accumulation) is rejected. It is also rejected if the Stream is empty and no seed argument has been provided.

stream.reduceRight(f, [seed])

reduceRight: ( Stream a, (a, b) -> b | Promise b, b | Promise b ) -> Promise b

Like reduce but runs the accumulation backwards, from right to left, i.e. the seed value is combined with the last occurrence, then the result is used as a seed with the preceding occurrence.
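
The difference in direction and in argument order between the two signatures, (b, a) -> b for reduce and (a, b) -> b for reduceRight, shows up clearly with a non-commutative function. The helpers below are hypothetical array stand-ins that always take a seed and ignore promises.

```javascript
// Array-based model of reduce: f receives (accumulator, occurrence).
function reduceLeftArr(xs, f, seed) {
  return xs.reduce((acc, x) => f(acc, x), seed);
}

// Array-based model of reduceRight: f receives (occurrence, accumulator).
function reduceRightArr(xs, f, seed) {
  return xs.reduceRight((acc, x) => f(x, acc), seed);
}

const minus = (a, b) => a - b;
console.log(reduceLeftArr([1, 2, 3], minus, 0));  // ((0 - 1) - 2) - 3 = -6
console.log(reduceRightArr([1, 2, 3], minus, 0)); // 1 - (2 - (3 - 0)) = 2
```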

stream.scan(f, [seed])

scan : ( Stream a, (b, a) -> b | Promise b, b | Promise b ) -> Stream b

Similar to reduce but the result is a stream whose occurrences are the intermediary results of the accumulation.

stream.window(size, [min = 0])

window : ( Stream a, Number, Number ) -> Stream [a]

yields a sliding window, an array of size elements, constructed from successive occurrences. If min is specified, it will not start yielding until at least min occurrences have been collected.
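
One possible reading of the sliding window can be sketched on an array. windowArr is a hypothetical model and rests on two assumptions that the description does not confirm: the window keeps the latest size values, and with the default min of 0 it already yields partial windows while filling up.

```javascript
// Array-based model of a sliding window (assumed semantics, see above).
function windowArr(xs, size, min = 0) {
  const out = [];
  let buffer = [];
  for (const x of xs) {
    buffer = buffer.concat([x]).slice(-size); // keep the latest `size` values
    if (buffer.length >= min) out.push(buffer);
  }
  return out;
}

console.log(windowArr([1, 2, 3, 4], 3));    // [ [ 1 ], [ 1, 2 ], [ 1, 2, 3 ], [ 2, 3, 4 ] ]
console.log(windowArr([1, 2, 3, 4], 3, 3)); // [ [ 1, 2, 3 ], [ 2, 3, 4 ] ]
```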

stream.changes([eq])

changes : ( Stream a, (a,a) -> aBool ) -> Stream a

Skips successive duplicates using the eq function for equality testing, defaulting to the === operator. Useful for representing mutable state and avoiding wasteful updates.
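
The effect of skipping successive duplicates can be shown on a plain array. changesArr is a hypothetical stand-in that compares each value with the one just before it, which is one reasonable reading of the description.

```javascript
// Array-based model: keep a value only if it differs from its predecessor.
function changesArr(xs, eq = (a, b) => a === b) {
  return xs.filter((x, i) => i === 0 || !eq(xs[i - 1], x));
}

console.log(changesArr([1, 1, 2, 2, 2, 1])); // [ 1, 2, 1 ]

// With a custom eq, objects that are distinct references can still be
// treated as duplicates.
const byId = (a, b) => a.id === b.id;
console.log(changesArr([{ id: 1 }, { id: 1 }, { id: 2 }], byId).map(o => o.id)); // [ 1, 2 ]
```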

stream.toArray()

toArray : Stream a -> Promise [a]

returns a Promise that will resolve to an Array of values from the stream occurrences. The promise is rejected if the stream aborts.

stream.all(p)

all : ( Stream a, a -> aBool | Promise aBool ) -> Promise Boolean

returns a boolean promise reflecting if all occurrences satisfy a given predicate.

stream.any(p)

any : ( Stream a, a -> aBool | Promise aBool ) -> Promise Boolean

returns a boolean promise reflecting if any occurrence satisfies a given predicate. Resolves as soon as the predicate evaluates to true for an occurrence.

stream.join([sep = ', '])

join : ( Stream String, String ) -> Promise String

joins string occurrences by a given separator.

stream.combineWith(stream2, f)

combineWith : (Stream a, Stream b, (a,b) -> c ) -> Stream c

combines the latest values from the 2 input streams. On each occurrence on either stream, the incoming value and the value of the latest occurrence from the other stream are combined by the function f. Will not start yielding until both streams have started.
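
The latest-value rule can be traced with timed occurrences. combineWithModel is a hypothetical model working on arrays of { time, value } pairs (times are made-up numbers); it is not how the library is implemented.

```javascript
// Model: replay occurrences from both sides in time order and combine the
// latest value of each side once both sides have produced at least one.
function combineWithModel(left, right, f) {
  const events = [
    ...left.map(o => ({ ...o, side: 'left' })),
    ...right.map(o => ({ ...o, side: 'right' }))
  ].sort((a, b) => a.time - b.time);

  const latest = {};
  const out = [];
  for (const e of events) {
    latest[e.side] = e.value;
    if ('left' in latest && 'right' in latest) {
      out.push({ time: e.time, value: f(latest.left, latest.right) });
    }
  }
  return out;
}

const combined = combineWithModel(
  [{ time: 10, value: 1 }, { time: 30, value: 2 }],
  [{ time: 20, value: 'a' }, { time: 40, value: 'b' }],
  (x, y) => x + y
);
console.log(combined.map(o => o.value)); // [ '1a', '2a', '2b' ]
console.log(combined.map(o => o.time));  // [ 20, 30, 40 ]
```

Nothing is yielded at time 10 because the second stream has not started yet.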

stream.combine(stream2)

combine : (Stream a, Stream b) -> Stream [a,b]

Like combineWith but yields a combined pair of occurrences instead of applying a function.
This is like stream.combineWith(s2, (x,y) => [x,y])

Stream.combine(...streams)

Stream.combine : [Stream a] -> Stream [a]

Static method, like the non static stream.combine(stream2...) but combines multiple streams.

stream.concat(stream2)

concat : (Stream a, Stream a) -> Stream a

concatenates 2 streams, first yields occurrences from stream until it ends, then yields occurrences from stream2. If the first stream aborts the resulting stream will abort too, meaning no occurrences will be taken from stream2. Note if occurrences appear in the second stream while the first stream is still running, they will be postponed until it ends.

stream.merge(stream2)

merge : (Stream a, Stream a) -> Stream a

merge occurrences from 2 streams, occurrences from one or other stream will be published as soon as they appear.

stream.zipWith(stream2, f)

zipWith : (Stream a, Stream b, (a,b) -> c) -> Stream c

Combines occurrences pairwise from the 2 input streams and applies f to each pair. Unlike combine, each pair waits for occurrences to happen on both streams before yielding. The result stream ends (resp. aborts) as soon as one of the input streams ends (resp. aborts).
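
Pairwise combination can be traced with timed occurrences. zipWithModel is a hypothetical model on arrays of { time, value } pairs; it assumes that the n-th pair is yielded at the moment the later of its two members occurs.

```javascript
// Model: pair the i-th occurrence of each side; the pair is ready when the
// slower of the two has occurred.
function zipWithModel(left, right, f) {
  const n = Math.min(left.length, right.length);
  const out = [];
  for (let i = 0; i < n; i++) {
    out.push({
      time: Math.max(left[i].time, right[i].time),
      value: f(left[i].value, right[i].value)
    });
  }
  return out;
}

const zipped = zipWithModel(
  [{ time: 10, value: 1 }, { time: 30, value: 2 }, { time: 50, value: 3 }],
  [{ time: 20, value: 'a' }, { time: 60, value: 'b' }],
  (x, y) => x + y
);
console.log(zipped.map(o => o.value)); // [ '1a', '2b' ]
console.log(zipped.map(o => o.time));  // [ 20, 60 ]
```

The value 3 is never paired because the second input has no third occurrence.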

stream.zip(stream2)

zip : (Stream a, Stream b) -> Stream [a,b]

Like zipWith but yields a zipped pair of occurrences without applying a function.
This is like stream.zipWith(stream2, (x,y) => [x,y])

Stream.zip(...streams)

Stream.zip : [Stream a] -> Stream [a]

Static method, like the non static stream.zip(stream2...) but zips multiple streams.

stream.relay(stream2)

relay : (Stream a, Stream a) -> Stream a

yields occurrences from the first stream until a first occurrence appears on the second stream, then continues with the second stream's occurrences. This means no further occurrences will be taken from the first stream.
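
The switch-over can be traced with timed occurrences. relayModel is a hypothetical model on arrays of { time, value } pairs; it assumes that an occurrence of the first stream at exactly the switch time is dropped.

```javascript
// Model: keep first-stream occurrences that happen strictly before the
// second stream's first occurrence, then continue with the second stream.
function relayModel(first, second) {
  if (second.length === 0) return first.slice();
  const switchAt = second[0].time;
  return first.filter(o => o.time < switchAt).concat(second);
}

const relayed = relayModel(
  [{ time: 10, value: 1 }, { time: 20, value: 2 }, { time: 40, value: 3 }],
  [{ time: 30, value: 'x' }, { time: 50, value: 'y' }]
);
console.log(relayed.map(o => o.value)); // [ 1, 2, 'x', 'y' ]
```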

stream.flatten(f)

flatten : ( Stream (Stream a), (Stream a, Stream a) -> Stream a ) -> Stream a

flattens a 2 layered stream (a stream of streams) into a simple stream using the binary function f.

stream.concatAll()

concatAll : Stream (Stream a) -> Stream a

flattens a 2 layered stream using the concat function. Incoming substreams will be published side by side one after another.

stream.mergeAll()

mergeAll : Stream (Stream a) -> Stream a

flattens a 2 layered stream using the merge function. Each incoming stream will be merged with its predecessors which means occurrences from all incoming substreams get published as soon as they happen.

stream.relayAll()

relayAll : Stream (Stream a) -> Stream a

flattens a 2 layered stream using the relay function. Each incoming substream takes over from its predecessor, i.e. only occurrences from the last incoming stream get published (the result stream always switches to the latest incoming stream).

stream.concatMap(f)

concatMap : (Stream a, a -> Stream b) -> Stream b

Transforms each occurrence into a substream then flattens the result stream by the concat operation.

stream.mergeMap(f)

mergeMap : (Stream a, a -> Stream b) -> Stream b

Transforms each occurrence into a substream then flattens the result stream by the merge operation.

stream.relayMap(f)

relayMap : (Stream a, a -> Stream b) -> Stream b

Transforms each occurrence into a substream then flattens the result stream by the relay operation.

stream.debounce(eventGen)

debounce : (Stream a, () -> Promise) -> Stream a

debounces occurrences by a given event. Yields an occurrence only if it was followed by a quiet period, i.e. no occurrences happened while waiting for the debouncing event. eventGen is a generator function used to generate a new debouncing event on each iteration.
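
The quiet-period rule can be traced with timed occurrences. debounceModel is a hypothetical model, not the library code: it assumes eventGen always returns a promise that resolves after a fixed wait, and that a surviving occurrence is yielded when that promise resolves.

```javascript
// Model: an occurrence survives only if nothing else occurs within `wait`
// after it; it is then yielded at the end of that quiet period.
function debounceModel(occurrences, wait) {
  return occurrences
    .filter((o, i) => {
      const next = occurrences[i + 1];
      return !next || next.time - o.time >= wait;
    })
    .map(o => ({ time: o.time + wait, value: o.value }));
}

const debounced = debounceModel(
  [
    { time: 0, value: 'h' },
    { time: 50, value: 'he' },
    { time: 100, value: 'hel' },
    { time: 400, value: 'help' }
  ],
  200
);
console.log(debounced.map(o => o.value)); // [ 'hel', 'help' ]
console.log(debounced.map(o => o.time));  // [ 300, 600 ]
```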

stream.throttle(eventGen)

throttle : (Stream a, () -> Promise) -> Stream a

throttles occurrences by a given event. Once it yields an occurrence, it skips all following occurrences until the throttling event occurs. eventGen is a generator function used to generate a new throttling event on each iteration.
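
The throttling rule can be traced with timed occurrences. throttleModel is a hypothetical model, not the library code: it assumes eventGen always returns a promise that resolves after a fixed wait.

```javascript
// Model: after yielding an occurrence, ignore everything until the
// throttling event (here: `wait` time units later) has occurred.
function throttleModel(occurrences, wait) {
  const out = [];
  let openAt = -Infinity; // time at which the next occurrence is accepted
  for (const o of occurrences) {
    if (o.time >= openAt) {
      out.push(o);
      openAt = o.time + wait;
    }
  }
  return out;
}

const throttled = throttleModel(
  [
    { time: 0, value: 'a' },
    { time: 50, value: 'b' },
    { time: 120, value: 'c' },
    { time: 130, value: 'd' },
    { time: 300, value: 'e' }
  ],
  100
);
console.log(throttled.map(o => o.value)); // [ 'a', 'c', 'e' ]
```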

stream.forEach(onNext, [onError=noop], [onComplete=noop])

forEach : (Stream a, a -> (), a -> (), a -> ()) -> ()

executes the onNext action on each occurrence, the onError action if the stream aborts and the onComplete action when the stream ends.

stream.log(prefix)

log : (Stream a, String) -> ()

a debug utility method