Skip to content

reactivego/rx

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

rx

import "github.com/reactivego/rx"

Package rx provides Reactive Extensions for Go, an API for asynchronous programming with observables and operators.

Our intellectual powers are rather geared to master static relations and our powers to visualize processes evolving in time are relatively poorly developed. For that reason we should do our utmost to shorten the conceptual gap between the static program and the dynamic process, to make the correspondence between the program (spread out in text space) and the process (spread out in time) as trivial as possible.

Edsger W. Dijkstra, March 1968

Prerequisites

A working Go environment on your system.

Installation

You can use this package directly without installing anything.

Optionally, install the jig tool in order to regenerate this package. The jig tool is also needed to support using the rx/generic sub-package as a generics library.

$ go get github.com/reactivego/jig

The jig tool provides the parametric polymorphism capability that Go 1 is missing. It works by generating code, replacing place-holder types in generic functions and datatypes with specific types.

Usage

This package can be used either directly as a standard package or as a generics library for Go 1.

Standard Package

To use it as a standard package, import the root rx package to access the API directly.

package main

import "github.com/reactivego/rx"

func main() {
    rx.From(1,2,"hello").Println()
}

For more information see the Go Package Documentation. In this document there is a selection of the most used operators and there is also a complete list of operators in a separate document.

Generics Library

To use it as a Generics Library for Go 1, import the rx/generic package. Generic programming supports writing programs that use statically typed Observables and Operators. For example:

package main

import _ "github.com/reactivego/rx/generic"

func main() {
	FromString("Hello", "Gophers!").Println()
}

Note that FromString is statically typed.

Generate statically typed code by running the jig tool.

$ jig -v
found 169 templates in package "rx" (github.com/reactivego/rx/generic)
...

Details on how to use generics can be found in the generic folder.

Observables

The main focus of rx is Observables. In accordance with Dijkstra's observation that we are ill-equipped to visualize how dynamic processes evolve over time, the use of Observables indeed narrows the conceptual gap between the static program and the dynamic process. Observables have a dynamic process at their core because they are defined as values which change over time. They are combined in static relations with the help of many different operators. This means that at the program level (spread out in the text space) you no longer have to deal explicitly with dynamic processes, but with more tangible static relations.

An Observable:

  • is a stream of events.
  • assumes zero to many values over time.
  • pushes values
  • can take any amount of time to complete (or may never)
  • is cancellable
  • is lazy (it doesn't do anything until you subscribe).

This package uses interface{} for value types, so an observable can emit a mix of differently typed values. To create an observable that emits three values of different types you could write the following little program.

package main

import "github.com/reactivego/rx"

func main() {
    rx.From(1,"hi",2.3).Println()
}

Note the program creates a mixed type observable from an int, string and a float.

Observables in rx are somewhat similar to Go channels but have much richer semantics:

Observables can be hot or cold. A hot observable will try to emit values even when nobody is subscribed. Values emitted during that period will be lost. The position of a mouse pointer or the current time are examples of hot observables.

A cold observable will only start emitting values after somebody subscribes. The contents of a file or a database are examples of cold observables.

An observable can complete normally or with an error, it uses subscriptions that can be canceled from the subscriber side. Where a normal variable is just a place where you read and write values from, an observable captures how the value of this variable changes over time.

Concurrency flows naturally from the fact that an observable is an ever changing stream of values. Every Observable conceptually has at its core a concurrently running process that pushes out values.

Operators

Operators form a language in which programs featuring Observables can be expressed. They work on one or more Observables to transform, filter and combine them into new Observables. A complete list of operators is available in a separate document.

Below is a selection of the most commonly used ones:

BufferTime

buffers the source Observable values for a specific time period and emits those as a slice periodically in time.

BufferTime

Code:

const ms = time.Millisecond
source := rx.Timer(0*ms, 100*ms).Take(4).ConcatMap(func(i interface{}) rx.Observable {
    switch i.(int) {
    case 0:
        return rx.From("a", "b")
    case 1:
        return rx.From("c", "d", "e")
    case 3:
        return rx.From("f", "g")
    }
    return rx.Empty()
})
source.BufferTime(100 * ms).Println()

Output:

[a b]
[c d e]
[]
[f g]
Catch

recovers from an error notification by continuing the sequence without emitting the error but by switching to the catch ObservableInt to provide items.

Catch

Code:

const problem = rx.RxError("problem")

rx.From(1, 2, 3).ConcatWith(rx.Throw(problem)).Catch(rx.From(4, 5)).Println()

Output:

1
2
3
4
5
CatchError

catches errors on the Observable to be handled by returning a new Observable or throwing an error. It is passed a selector function that takes as arguments err, which is the error, and caught, which is the source observable, in case you'd like to "retry" that observable by returning it again. Whatever observable is returned by the selector will be used to continue the observable chain.

CatchError

Code:

const problem = rx.RxError("problem")

catcher := func(err error, caught rx.Observable) rx.Observable {
    if err == problem {
        return rx.From(4, 5)
    } else {
        return caught
    }
}

rx.From(1, 2, 3).ConcatWith(rx.Throw(problem)).CatchError(catcher).Println()

Output:

1
2
3
4
5
CombineLatest

will subscribe to all Observables. It will then wait for all of them to emit before emitting the first slice. Whenever any of the subscribed observables emits, a new slice will be emitted containing all the latest value.

Concat

emits the emissions from two or more observables without interleaving them.

ConcatMap

transforms the items emitted by an Observable by applying a function to each item and returning an Observable. The stream of Observable items is then flattened by concattenating the emissions from the observables without interleaving.

ConcatWith

emits the emissions from two or more observables without interleaving them.

ConcatWith

Create

provides a way of creating an Observable from scratch by calling observer methods programmatically.

The create function provided to Create will be called once to implement the observable. It is provided with a Next, Error, Complete and Canceled function that can be called by the code that implements the Observable.

DebounceTime

only emits the last item of a burst from an Observable if a particular timespan has passed without it emitting another item.

DistinctUntilChanged

only emits when the current value is different from the last.

The operator only compares emitted items from the source Observable against their immediate predecessors in order to determine whether or not they are distinct.

DistinctUntilChanged

Code:

rx.From(1, 2, 2, 1, 3).DistinctUntilChanged().Println()

Output:

1
2
1
3
Do

calls a function for each next value passing through the observable.

Filter

emits only those items from an observable that pass a predicate test.

From

creates an observable from multiple values passed in.

Just

creates an observable that emits a particular item.

Map

transforms the items emitted by an Observable by applying a function to each item.

Merge

combines multiple Observables into one by merging their emissions. An error from any of the observables will terminate the merged observables.

Merge

Code:

a := rx.From(0, 2, 4)
b := rx.From(1, 3, 5)
rx.Merge(a, b).Println()

Output:

0
1
2
3
4
5
MergeMap

transforms the items emitted by an Observable by applying a function to each item an returning an Observable. The stream of Observable items is then merged into a single stream of items using the MergeAll operator.

This operator was previously named FlatMap. The name FlatMap is deprecated as MergeMap more accurately describes what the operator does with the observables returned from the Map project function.

MergeWith

combines multiple Observables into one by merging their emissions. An error from any of the observables will terminate the merged observables.

MergeWith

Code:

a := rx.From(0, 2, 4)
b := rx.From(1, 3, 5)
a.MergeWith(b).Println()

Output:

0
1
2
3
4
5
Of

emits a variable amount of values in a sequence and then emits a complete notification.

Publish

returns a Multicaster for a Subject to an underlying Observable and turns the subject into a connnectable observable.

A Subject emits to an observer only those items that are emitted by the underlying Observable subsequent to the time the observer subscribes.

When the underlying Obervable terminates with an error, then subscribed observers will receive that error. After all observers have unsubscribed due to an error, the Multicaster does an internal reset just before the next observer subscribes. So this Publish operator is re-connectable, unlike the RxJS 5 behavior that isn't. To simulate the RxJS 5 behavior use Publish().AutoConnect(1) this will connect on the first subscription but will never re-connect.

PublishReplay

returns a Multicaster for a ReplaySubject to an underlying Observable and turns the subject into a connectable observable. A ReplaySubject emits to any observer all of the items that were emitted by the source observable, regardless of when the observer subscribes. When the underlying Obervable terminates with an error, then subscribed observers will receive that error. After all observers have unsubscribed due to an error, the Multicaster does an internal reset just before the next observer subscribes.

Scan

applies a accumulator function to each item emitted by an Observable and the previous accumulator result.

The operator accepts a seed argument that is passed to the accumulator for the first item emitted by the Observable. Scan emits every value, both intermediate and final.

StartWith

returns an observable that, at the moment of subscription, will synchronously emit all values provided to this operator, then subscribe to the source and mirror all of its emissions to subscribers.

StartWith

Code:

rx.From(2, 3).StartWith(1).Println()

Output:

1
2
3
SwitchMap

transforms the items emitted by an Observable by applying a function to each item an returning an Observable.

In doing so, it behaves much like MergeMap (previously FlatMap), except that whenever a new Observable is emitted SwitchMap will unsubscribe from the previous Observable and begin emitting items from the newly emitted one.

Take

emits only the first n items emitted by an Observable.

TakeUntil

emits items emitted by an Observable until another Observable emits an item.

WithLatestFrom

will subscribe to all Observables and wait for all of them to emit before emitting the first slice. The source observable determines the rate at which the values are emitted. The idea is that observables that are faster than the source, don't determine the rate at which the resulting observable emits. The observables that are combined with the source will be allowed to continue emitting but only will have their last emitted value emitted whenever the source emits.

Note that any values emitted by the source before all other observables have emitted will effectively be lost. The first emit will occur the first time the source emits after all other observables have emitted.

WithLatestFrom

Code:

a := rx.From(1,2,3,4,5)
b := rx.From("A","B","C","D","E")
a.WithLatestFrom(b).Println()

Output:

[2 A]
[3 B]
[4 C]
[5 D]

Concurency

The rx package does not use any 'bare' go routines internally. Concurrency is tightly controlled by the use of a specific scheduler. Currently there is a choice between 2 different schedulers; a trampoline schedulers and a goroutine scheduler.

By default all subscribing operators except ToChan use the trampoline scheduler. A trampoline puts tasks on a task queue and only starts processing them when the Wait method is called on a returned subscription. The subscription itself calls the Wait method of the scheduler.

Only the Connect and Subscribe methods return a subscription. The other subscribing operators Println, ToSingle, ToSlice and Wait are blocking by default and only return when the scheduler returns after wait. In the case of ToSingle and ToSlice both a value and error are returned and in the case of Println and Wait just an error is returned or nil when the observable completed succesfully.

The only operator that uses the goroutine scheduler by default is the ToChan operator. ToChan returns a channel that it feeds by subscribing on the goroutine scheduler.

To change the scheduler on which subscribing needs to occur, use the SubscribeOn operator

Regenerating this Package

This package is generated from generics in the sub-folder generic by the jig tool. You don't need to regenerate this package in order to use it. However, if you are interested in regenerating it, then read on.

If not already done so, install the jig tool.

$ go get github.com/reactivego/jig

Change the current working directory to the package directory and run the jig tool as follows:

$ jig -v

This works by replacing place-holder types of generic functions and datatypes with the interface{} type.

Acknowledgements

This library started life as the Reactive eXtensions for Go library by Alec Thomas. Although the library has been through the metaphorical meat grinder a few times, its DNA is still clearly present in this library and I owe Alec a debt of grattitude for the work he has made so generously available.

This implementation takes most of its cues from RxJS. It is the ReactiveX incarnation that pushes the envelope in evolving operator semantics.

License

This library is licensed under the terms of the MIT License. See LICENSE file for copyright notice and exact wording.