Skip to content

JDK 8 Reactive Streams

johnmcclean-aol edited this page Nov 21, 2016 · 2 revisions

Derprecated use - SeqSubscriber to subscribe to reactive-streams Publishers and generate a Stream. ReactiveSeq or StreamUtils to turn a JDK 8 Stream into a publisher.

Reactive Streams Publisher

simple-react v0.98 includes a synchronous Reactive Streams Publisher for both LazyFutureStreams and JDK 8 Streams. The synchronous publisher, publishes Reactive Streams events on the calling thread.

Example creating a synchronous Reactive Streams publisher from a JDK 8 Stream

  Publisher p = JDKReactiveStreamsPublisher.ofSync(Stream.iterate(0l, i->i+1l).limit(elements))

Example subscribe to a synchronous LazyFutureStream publisher

  LazyFutureStream.react(this::load1,this::load2).map(this::process).sync().subscribe(subscriber);

Example creating a asynchronous Reactive Streams publisher from a JDK 8 Stream

  Publisher p = JDKReactiveStreamsPublisher.ofAsync(Stream.iterate(0l, i->i+1l).limit(elements))

Example subscribe to a asynchronous LazyFutureStream publisher

  LazyFutureStream.react(this::load1,this::load2).map(this::process).async().subscribe(subscriber);

Reactive Streams Subscriber

To have a JDK 8 Stream subscribe to another Reactive Stream use the FutureStreamSubscriber class.

Simply create a new instance

   JDKReactiveStreamsSubscriber<Integer> subscriber = new JDKReactiveStreamsSubscriber<>();

pass it to a Publisher

   Publisher publisher;

   publisher.subscribe(subscriber);

Then access the Stream via the stream method

  Stream<Integer> stream = subscriber.stream();

Data will be pulled into the stream when it requests it from the publisher (after a terminal operation - such as forEach as has been invoked).

Clone this wiki locally