StreamSource
StreamSource is a builder for Streams into which data can be pushed. The supported Stream types are:
- Standard JDK 8 Stream
- cyclops-react ReactiveSeq
- LazyFutureStream
The PushableStreamBuilder will create a PushableStream type (PushableStream, PushableSeq or PushableLazyFutureStream). Each of these is an implementation of a jOOλ Tuple2, so they can be defined, accessed and used as such if desired.
PushableStream<Integer> pushable = StreamSource.stream();
pushable.getInput().add(10);
pushable.getInput().close();
assertThat(pushable.getStream().collect(Collectors.toList()),
hasItem(10));
PushableSeq<Integer> pushable = StreamSource.reactiveSeq();
pushable.getInput().add(10);
pushable.getInput().close();
assertThat(pushable.getStream().collect(Collectors.toList()),
hasItem(10));
PushableLazyFutureStream<Integer> pushable = StreamSource.lazyFutureStream();
pushable.getInput().add(100);
pushable.getInput().close();
assertThat(pushable.getStream().collect(Collectors.toList()),
hasItem(100));
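To make the push model in the examples above more concrete, here is a minimal sketch that uses only JDK classes. It is not how cyclops-react implements PushableStream. The class `PushableSketch`, its `CLOSED` sentinel and its method names are hypothetical, chosen only to mirror the add / close / collect sequence shown above.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical illustration only: a JDK Stream fed from an unbounded queue.
class PushableSketch {
    // Assumed end-of-input marker; not part of the cyclops-react API.
    private static final Object CLOSED = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    // Push a value into the input queue.
    void add(int value) { queue.add(value); }

    // Signal that no more values will be pushed.
    void close() { queue.add(CLOSED); }

    // Build a sequential Stream that pulls from the queue until CLOSED is seen.
    Stream<Integer> stream() {
        Iterator<Integer> it = new Iterator<Integer>() {
            private Object next;
            private boolean done;

            @Override
            public boolean hasNext() {
                if (done) return false;
                if (next == null) {
                    try {
                        next = queue.take(); // blocks until a value or CLOSED arrives
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        done = true;
                        return false;
                    }
                }
                if (next == CLOSED) {
                    done = true;
                    return false;
                }
                return true;
            }

            @Override
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                Integer value = (Integer) next;
                next = null;
                return value;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    // Mirrors the first example above: push 10, close, then collect.
    static List<Integer> demo() {
        PushableSketch pushable = new PushableSketch();
        pushable.add(10);
        pushable.close();
        return pushable.stream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [10]
    }
}
```

Closing the input is what lets the terminal `collect` finish. Without it, the sketch's iterator would keep blocking while it waits for more data.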
You can also pass in a LazyFutureStream builder, for example one backed by an elastic pool, to be used when building the pushable LazyFutureStream.
PushableLazyFutureStream<Integer> pushable = StreamSource.futureStream(SequentialElasticPools.lazyReact);
pushable.getInput().add(100);
pushable.getInput().close();
assertThat(pushable.getStream().collect(Collectors.toList()),
hasItem(100));
By default the PushableStreamBuilder produces unbounded async data structures for inputting data into the Stream, but this is configurable.
For example:
StreamSource.of(4)
.pushableStream();
This will create a pushable JDK 8 Stream that applies back pressure once 4 unread messages have been added to the input Queue. Clients will then either be blocked (if using offer to add data) or will drop data (if using add).
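The two behaviours of a bounded input (dropping versus waiting) can be sketched with a plain JDK `ArrayBlockingQueue`. This is an analogy under stated assumptions, not the cyclops-react Queue itself: the JDK method names do not map one-to-one onto the `add` and `offer` methods described above, and `BoundedInputSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: back pressure on a bounded JDK queue.
class BoundedInputSketch {

    // Producer never waits: JDK offer(e) returns false when the queue is full,
    // so values beyond the bound are dropped. Returns how many were accepted.
    static int acceptedWithoutWaiting(int capacity, int attempts) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    // Producer waits: the timed offer blocks until a reader frees a slot
    // or the timeout expires. Returns true if the value was accepted.
    static boolean acceptedAfterWaiting(int capacity, long timeoutMillis) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < capacity; i++) {
                queue.put(i); // fill the queue up to its bound
            }
            Thread consumer = new Thread(() -> {
                try {
                    queue.take(); // a reader consumes one message, freeing a slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            boolean accepted = queue.offer(99, timeoutMillis, TimeUnit.MILLISECONDS);
            consumer.join();
            return accepted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptedWithoutWaiting(4, 6));  // prints 4
        System.out.println(acceptedAfterWaiting(4, 5000)); // prints true
    }
}
```

With a bound of 4 and no reader, only the first four of six pushes are accepted. Once a reader takes a message, a waiting producer can proceed.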
Using an async.Topic as the input data structure for pushable Streams means that multiple Streams can receive exactly the same data.
Example
MultiplePushableStreamsBuilder<Integer> multi = new PushableStreamBuilder()
.multiple();
LazyFutureStream<Integer> pushable = multi
.pushableLazyFutureStream();
Seq<Integer> seq = multi.pushableSeq();
Stream<Integer> stream = multi.pushableStream();
All three Streams (of different types) will receive exactly the same data when it is pushed.
multi.getInput().offer(100);
This makes 100 available to all three Streams. Note that the Topic only discards a value once all three Streams have read it.
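The fan-out idea behind a Topic can be sketched with JDK classes alone. This simplified version gives every subscriber its own queue and copies each pushed value into all of them, which is an assumption made for illustration and may differ from how async.Topic tracks reads internally. `TopicSketch`, `subscribe` and `demo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only: every subscriber sees every pushed value.
class TopicSketch {
    private final List<BlockingQueue<Integer>> subscribers = new CopyOnWriteArrayList<>();

    // Register a new reader and hand back its private queue.
    BlockingQueue<Integer> subscribe() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        subscribers.add(queue);
        return queue;
    }

    // Copy the value into each subscriber's queue.
    void offer(int value) {
        for (BlockingQueue<Integer> queue : subscribers) {
            queue.offer(value);
        }
    }

    // Three readers subscribe, one value is pushed, each reader polls once.
    static List<Integer> demo() {
        TopicSketch topic = new TopicSketch();
        BlockingQueue<Integer> first = topic.subscribe();
        BlockingQueue<Integer> second = topic.subscribe();
        BlockingQueue<Integer> third = topic.subscribe();

        topic.offer(100);

        List<Integer> seen = new ArrayList<>();
        seen.add(first.poll());
        seen.add(second.poll());
        seen.add(third.poll());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [100, 100, 100]
    }
}
```

Readers that subscribe after a value has been pushed do not see it in this sketch, so all subscriptions are made before the first `offer`.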
Any simple-react data structure can be used with PushableStreamBuilder (and any supported Stream type). Here is an example of building a Java 8 Stream from an async.Signal.
Signal<Integer> signal = Signal.queueBackedSignal();
Stream<Integer> pushable = new PushableStreamBuilder()
.pushableStream(signal.getDiscrete());
signal.set(100);
signal.close();
assertThat(pushable.collect(Collectors.toList()), hasItem(100));
For more information, see "Understanding the simple-react pull/push model".