Skip to content

Reactive 'plugin' for easier use

johnmcclean-aol edited this page Aug 29, 2015 · 1 revision

Reactive plugin for easier use

  • LazyReactive : implement LazyReactive for useful functionality for managing LazyFutureStreams E.g. create a Stream optimised for IO operations
this.ioStream().react(()->load1(),load2())
                       .thenSync(String::toUpperCase)
                       .peekSync(str->val=str)
                       .block();
  • EagerReactive : implement EagerReactive for useful functionality for managing EagerFutureStreams
this.cpuStream().of("hello")
                          .map(String::toUpperCase)
                          .peek(str->val=str)
                          .block();
  • Pipes : register Adapters (Queue, Topics, Signals) and get Streams back and Reactive Streams support (see below)
Pipes.register("test", QueueFactories.<String>boundedNonBlockingQueue(100)
							.build());
LazyFutureStream<String> stream =  PipesToLazyStreams.cpuBoundStream("test");
stream.filter(it->it!=null).peek(System.out::println).run();
  • PipesToLazyStreams : convert registered Pipes to LazyFutureStreams
LazyFutureStream<String> stream = PipesToLazyStreams.registerForCPU("test", QueueFactories.
											<String>boundedNonBlockingQueue(100)
												.build());
		stream.filter(it->it!=null)
		      .async()
		      .peek(this::process)
		      .sync()
		      .forEach(System.out::println);
  • PipesToEagerStreams : convert unregistered Pipes to EagerFutureStreams
EagerFutureStream<String> stream = PipesToEagerStreams.registerForCPU("test", QueueFactories.
											<String>boundedNonBlockingQueue(100)
												.build());
		stream.filter(it->it!=null)
		      .async()
		      .peek(this::process)
		      .sync()
		      .forEach(System.out::println);

More Reactive Streams Support

  • Ability to publish and subscribe to Pipes (Queues, Topics and Signals) via the Pipes class

Have a JDK 8 Stream subscribe to a Queue using the Reactive Streams support (a simpler alternative in practice for JDK Streams is to simply call the stream() method on the Queue).

JDKReactiveStreamsSubscriber subscriber = new JDKReactiveStreamsSubscriber ();
		Queue queue = new Queue();
		Pipes.register("hello", queue);
		Pipes.subscribeTo("hello",subscriber);
		queue.offer("world");
		queue.close();
		assertThat(subscriber.getStream().findAny().get(),equalTo("world"));
JDKReactiveStreamsSubscriber subscriber = new JDKReactiveStreamsSubscriber ();
		Queue queue = new Queue();
		Pipes.register("hello", queue);
		Pipes.publisher("hello").get().subscribe(subscriber);
		queue.offer("world");
		queue.close();
		assertThat(subscriber.getStream().findAny().get(),equalTo("world"));
Clone this wiki locally