
Pipes : an event bus

johnmcclean-aol edited this page Nov 22, 2016 · 2 revisions

The Pipes event bus

Pipes stores and manages cyclops-react Adapters (such as Queues) used for cross-thread communication, with each Adapter registered under a key.

Connected Streams cannot complete collect- or reduce-style methods until the underlying Adapter used for data transfer is closed.

That is, connected Streams remain connected until either the Adapter is closed or they disconnect themselves (for example, because of a limit).

Examples

//create a Pipes instance to manage inter-thread communication
//(keys are Strings, transferred values are Integers)
Pipes<String, Integer> bus = Pipes.of();

//register a bounded non-blocking queue under the key "reactor" for data transfer
bus.register("reactor", QueueFactories.<Integer>boundedNonBlockingQueue(1000)
                                      .build());

//publish data to the transfer queue
bus.publishTo("reactor", ReactiveSeq.of(10, 20, 30));

//close the transfer queue - connected Streams will disconnect once all
//data has been transferred
bus.close("reactor");


//on another thread

//connect to our transfer queue
LazyFutureStream<Integer> futureStream = bus.futureStream("reactor", new LazyReact(10, 10)).get();

//read the data and print it out to the console
futureStream.map(i -> "fan-out to handle blocking I/O:" + Thread.currentThread().getId() + ":" + i)
            .forEach(System.out::println);
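
To illustrate why a reduce-style operation cannot finish until the Adapter is closed, here is a minimal sketch using only the JDK. It is not the cyclops-react implementation: MiniBus, its methods, and the CLOSED sentinel are hypothetical names invented for this illustration, and the sketch assumes a single consumer per key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for an event bus; NOT the cyclops-react Pipes class.
public class MiniBus {
    // Sentinel marking a closed queue; compared by identity only.
    private static final Object CLOSED = new Object();

    private final Map<String, BlockingQueue<Object>> queues = new ConcurrentHashMap<>();

    // Register an unbounded transfer queue under the given key.
    public void register(String key) {
        queues.put(key, new LinkedBlockingQueue<>());
    }

    // Push values onto the transfer queue registered under the key.
    public void publishTo(String key, List<Integer> values) {
        queues.get(key).addAll(values);
    }

    // Signal that no more data will arrive on this queue.
    public void close(String key) {
        queues.get(key).add(CLOSED);
    }

    // Reduce-style read: blocks until close(key) has been called,
    // then returns the sum of every value read before the sentinel.
    public int sum(String key) throws InterruptedException {
        BlockingQueue<Object> queue = queues.get(key);
        int total = 0;
        for (Object next = queue.take(); next != CLOSED; next = queue.take()) {
            total += (Integer) next;
        }
        return total;
    }

    public static void main(String[] args) throws Exception {
        MiniBus bus = new MiniBus();
        bus.register("reactor");

        // Consumer on another thread: it cannot finish while the queue is open.
        CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> {
            try {
                return bus.sum("reactor");
            } catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        });

        bus.publishTo("reactor", Arrays.asList(10, 20, 30));
        System.out.println("done before close: " + result.isDone()); // false
        bus.close("reactor");
        System.out.println("sum after close: " + result.get());      // 60
    }
}
```

The consumer only learns that the data is complete when it reads the sentinel, which is why the sum is unavailable until close is called. A consumer that stops on its own, for instance after reading a fixed number of elements, would not need to wait for the close.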