Queues explained

johnmcclean-aol edited this page Jun 11, 2015 · 6 revisions

SimpleReact Queues are a way of joining two or more JDK 8 Streams. Producing Streams add to the Queue, and Consuming Streams remove data from the Queue. Consuming Streams will block if there is no data.

SimpleReact Queues can be backed by any BlockingQueue implementation (we also supply a wrapped non-blocking queue via QueueFactories, but it needs special care). With an unbounded BlockingQueue, Producers can keep adding to the Queue without limit (until an OutOfMemoryError). With a bounded BlockingQueue, Producers are blocked (using queue.offer()) until Consumers remove data.
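The difference between bounded and unbounded backing queues can be seen with the JDK classes alone. The sketch below does not use SimpleReact; the class and method names are hypothetical. It relies on the JDK's timed `offer(e, timeout, unit)`, which waits for space and then gives up, whereas the untimed `offer(e)` returns false immediately and `put(e)` waits indefinitely. Which of these SimpleReact's own `Queue.offer()` delegates to is not stated on this page.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: plain JDK queues, no SimpleReact involved.
class BoundedVsUnbounded {

    // Adds `count` items to an unbounded queue; every offer is accepted.
    static int fillUnbounded(int count) {
        BlockingQueue<Integer> unbounded = new LinkedBlockingQueue<>();
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            if (unbounded.offer(i)) accepted++;
        }
        return accepted;
    }

    // Tries to add `count` items to a bounded queue that nobody is consuming.
    // The timed offer waits up to 10 ms for space, then returns false.
    static int fillBounded(int capacity, int count) {
        BlockingQueue<Integer> bounded = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        try {
            for (int i = 0; i < count; i++) {
                if (bounded.offer(i, 10, TimeUnit.MILLISECONDS)) accepted++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop early, keep the interrupt flag
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("unbounded accepted: " + fillUnbounded(1000));
        System.out.println("bounded accepted:   " + fillBounded(2, 4));
    }
}
```

With no consumer, the bounded queue accepts only as many items as its capacity; the remaining offers time out.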

Simple example:

Stream<String> stream = Stream.of("1", "2", "3");
Queue<String> q = new Queue<>(new LinkedBlockingQueue<>());
q.fromStream(stream);
Stream<String> dq = q.stream();
// limit(3) is required: a Stream built from the Queue blocks when there is no more data
Integer dequeued = dq.limit(3).map(it -> Integer.valueOf(it))
			.reduce(0, (acc, next) -> acc + next);

In this example we populate a Queue from a finite Stream, and then build another Stream from the Queue. Both Streams operate in the same thread.

Streaming across threads

Queues can be very useful when infinite (or large) Producing Streams provide data to infinite (or large) Consuming Streams on separate threads.

Generating an infinite Stream using SimpleReact

Queue<Integer> q = new Queue<>(new LinkedBlockingQueue<>());
AtomicInteger count = new AtomicInteger(0);
new SimpleReact().react(() -> q.fromStream(
                                  Stream.generate(() -> count.incrementAndGet())));

In this example we simply return the next number in a sequence, but similar code could read data from a file, a remote service, or a database. We could also populate the Stream on a listening thread triggered by external events (incoming REST calls, or messages from a message queue).

We can extract a finite set of values from the Queue:

q.stream().limit(1000)
      .peek(it -> System.out.println(it))
      .collect(Collectors.toList());
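For readers without SimpleReact at hand, the same producer/consumer shape can be sketched with the JDK only. This is an illustration under assumptions, not SimpleReact's implementation: the class name, the `takeUnchecked` helper, and the capacity of 100 are hypothetical choices, and a plain daemon thread stands in for `new SimpleReact().react(...)`.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: an infinite producer thread feeding a finite consuming Stream.
class CrossThreadStream {

    // Wraps the blocking take() so it can be used as a Stream supplier.
    static <T> T takeUnchecked(BlockingQueue<T> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Starts a producer that counts upward forever, then reads the first n values.
    static List<Integer> firstN(int n) {
        // Bounded (an assumption) so the infinite producer cannot exhaust memory.
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(100);
        AtomicInteger count = new AtomicInteger(0);

        Thread producer = new Thread(() -> {
            try {
                while (true) queue.put(count.incrementAndGet());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop producing
            }
        });
        producer.setDaemon(true);
        producer.start();

        try {
            // The consuming Stream blocks in take() whenever the queue is empty.
            return Stream.generate(() -> takeUnchecked(queue))
                         .limit(n)
                         .collect(Collectors.toList());
        } finally {
            producer.interrupt();
        }
    }

    public static void main(String[] args) {
        List<Integer> values = firstN(1000);
        System.out.println(values.size() + " values read");
    }
}
```

The producer never finishes on its own; it is the `limit(n)` on the consuming side that makes the overall computation finite.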

Or build an infinite lazy Stream:

SimpleReact.lazy()
    .fromStream(q.streamCompletableFutures())
    .then(it -> "*" + it)
    .peek(it -> incrementFound())
    .peek(it -> System.out.println(it))
    .run(Executors.newSingleThreadExecutor());

This moves Stream management onto a single thread (not the current thread) via .run(Executors.newSingleThreadExecutor()), while SimpleReact reacts to the Stream in a multithreaded fashion using its own ExecutorService (by default configured with parallelism equal to the number of cores).

Using a Queue to apply back pressure

If a bounded Queue is full, the producer blocks until a consumer extracts data:

AtomicInteger found = new AtomicInteger(0);
Queue<Integer> q = new Queue<>(new LinkedBlockingQueue<>(2));

new SimpleReact().react(() -> 1, ()->2,()->3,()->4)
                 .then( (it) -> { q.offer(it); return found.incrementAndGet();});

sleep(10);
assertThat(found.get(), is(2));

Removing two items from the Queue will allow SimpleReact to fill it back up again.

q.stream().limit(2).collect(Collectors.toList());
sleep(2);
// compare the int value, not the AtomicInteger itself
assertThat(found.get(), is(4));
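The same back-pressure behaviour can be reproduced with a bounded JDK queue, which may help when reasoning about the timing. The sketch below is JDK-only and its names are hypothetical. Instead of sleeping for a fixed time, it waits until the producer thread is actually parked inside `put()`, which avoids the race a short `sleep` can introduce.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a capacity-2 queue throttling a producer of 4 items.
class BackPressureSketch {

    // Returns {items accepted while the queue was full, items accepted after draining two}.
    static int[] run() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        AtomicInteger found = new AtomicInteger(0);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 4; i++) {
                    queue.put(i);            // blocks while the queue already holds 2 items
                    found.incrementAndGet(); // counted only after the item was accepted
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int whileFull;
        try {
            // Wait until the producer is parked in put() on the full queue.
            while (producer.getState() != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            whileFull = found.get();

            // Removing two items frees space, so the producer can finish.
            queue.take();
            queue.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { whileFull, found.get() };
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("accepted while full: " + result[0]);
        System.out.println("accepted after draining: " + result[1]);
    }
}
```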

Multiple Streams reading from a Queue

[Figure: visualisation of a SimpleReact dataflow]

Queue queue = new Queue();

// thread 1: manage input
queue.fromStream(restStream);

// thread 2: manage processing stream A
// forEach is a terminal operation that returns void, so its result is not assigned
queue.stream()
     .map(entity -> process(entity))
     .forEach(processed -> save(processed));

// thread 3: manage processing stream B
queue.stream()
     .map(entity -> process(entity))
     .forEach(processed -> save(processed));
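A runnable, JDK-only sketch of several consumers sharing one queue follows. It assumes competing-consumer semantics, where each item is taken by exactly one consumer; that is how a plain BlockingQueue behaves, and this page does not say whether SimpleReact's Queue adds anything on top. The class name and the poison-pill shutdown marker are hypothetical additions needed to let the demo terminate.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: several consumer threads draining one shared queue.
class CompetingConsumers {

    // Marker telling a consumer to stop (not part of the real data, which is 1..itemCount).
    private static final Integer POISON = -1;

    // Returns {distinct items processed, total items processed}.
    static int[] run(int itemCount, int consumerCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger processed = new AtomicInteger(0);

        Thread[] consumers = new Thread[consumerCount];
        for (int c = 0; c < consumerCount; c++) {
            consumers[c] = new Thread(() -> {
                try {
                    while (true) {
                        Integer next = queue.take();
                        if (next.equals(POISON)) return; // one pill stops one consumer
                        seen.add(next);
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[c].start();
        }

        try {
            for (int i = 1; i <= itemCount; i++) queue.put(i);
            // FIFO order guarantees the pills arrive after all real items.
            for (int c = 0; c < consumerCount; c++) queue.put(POISON);
            for (Thread t : consumers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { seen.size(), processed.get() };
    }

    public static void main(String[] args) {
        int[] result = run(1000, 2);
        System.out.println("distinct: " + result[0] + ", total: " + result[1]);
    }
}
```

If the distinct and total counts match the number of items produced, no item was lost and none was processed twice.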

Multiple Streams writing to a Queue

[Figure: visualisation of a SimpleReact dataflow]

Queue queue = new Queue();

// thread 1: manage file stream
// forEach (a terminal operation) is needed; with peek alone the Stream would never run
fileStream.map(next -> toEntity(next))
       .forEach(entity -> queue.add(entity));

// thread 2: manage rest stream
queue.fromStream(restStream);

// thread 3: manage processing stream
queue.stream()
     .map(entity -> process(entity))
     .forEach(processed -> save(processed));
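The fan-in pattern can likewise be tried out with the JDK alone. In this sketch two producer threads stand in for the file and REST streams and a single consumer reads everything; the class name and the sample strings are hypothetical. Arrival order across producers is not deterministic, so the result is sorted before it is returned.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;

// Hypothetical demo class: two producing Streams writing to one queue, one reader.
class FanInSketch {

    static List<String> run() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Each producer runs its Stream to completion with forEach, adding to the shared queue.
        Thread fileProducer = new Thread(() ->
            Stream.of("file-1", "file-2", "file-3").forEach(queue::add));
        Thread restProducer = new Thread(() ->
            Stream.of("rest-1", "rest-2").forEach(queue::add));
        fileProducer.start();
        restProducer.start();

        // The consumer knows five items are coming and blocks in take() until each arrives.
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < 5; i++) received.add(queue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Collections.sort(received); // interleaving varies between runs
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```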