
Agrona Wait Free Queues

johnmcclean-aol edited this page Aug 5, 2015 · 2 revisions

Info on how to configure simple-react LazyFutureStream to use an Agrona wait-free queue with mechanical sympathy.

NB: LazyFutureStream makes use of Queues internally for some operations. This article details how and why to configure those internal Queues. The advice on bound sizes does not apply to external Queues you may wish to set up (even if LazyFutureStreams read from those Queues); for external Queues, your own business rules will determine the maximum bounds.

Performance enhancements

New wait free Queue with mechanical sympathy

Version 0.96 of simple-react adds support for simple-react Queues backed by a ManyToOneConcurrentArrayQueue. This is a Java implementation of an algorithm from FastFlow, provided by the leading Java experts in this area (Martin Thompson, Richard Warburton, Todd Montgomery) via their Agrona project, which provides data structures and utilities used in the ultra-low-latency Aeron messaging system.
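To make the "many-to-one" idea concrete, here is a minimal, simplified sketch of a bounded many-producer/single-consumer array queue in plain Java. It is not Agrona's code: the class name `SimpleMpscArrayQueue` is hypothetical, and the sketch leaves out Agrona's padding against false sharing and its cached-head optimisation. Producers claim slots with a compare-and-set on a shared tail counter; the single consumer reads slots in order and is the only thread that advances the head.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical, simplified many-to-one (MPSC) bounded array queue, for illustration only.
// Safe for many concurrent offer() callers but only ONE poll() caller.
final class SimpleMpscArrayQueue<E> {
    private final AtomicReferenceArray<E> buffer;
    private final int capacity;
    private final int mask;
    private final AtomicLong tail = new AtomicLong(); // next sequence a producer will claim
    private volatile long head;                        // next sequence the consumer will read

    SimpleMpscArrayQueue(int requestedCapacity) {
        if (requestedCapacity < 1 || requestedCapacity > (1 << 30)) {
            throw new IllegalArgumentException("capacity must be between 1 and 2^30");
        }
        int cap = 1;
        while (cap < requestedCapacity) cap <<= 1;    // round up to a power of two
        this.capacity = cap;
        this.mask = cap - 1;                           // slot index = sequence & mask
        this.buffer = new AtomicReferenceArray<>(cap);
    }

    int capacity() { return capacity; }

    // Called by any number of producer threads. Returns false if the queue is full.
    boolean offer(E e) {
        if (e == null) throw new NullPointerException("null marks an empty slot");
        long claimed;
        do {
            claimed = tail.get();
            if (claimed - head >= capacity) return false;   // slot still holds an unconsumed element
        } while (!tail.compareAndSet(claimed, claimed + 1)); // claim this sequence exclusively
        buffer.lazySet((int) (claimed & mask), e);          // publish the element with an ordered store
        return true;
    }

    // Called by the single consumer thread. Returns null if nothing is ready yet.
    E poll() {
        long h = head;
        int index = (int) (h & mask);
        E e = buffer.get(index);
        if (e == null) return null;  // empty, or a producer has claimed but not yet published this slot
        buffer.lazySet(index, null); // free the slot for reuse
        head = h + 1;                // volatile write lets producers see the freed slot
        return e;
    }
}
```

Restricting the queue to one consumer is what keeps `poll()` simple: no other thread can advance the head, so it needs no compare-and-set at all.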

Performance characteristics

In simple benchmarking, LazyFutureStreams backed by an Agrona ManyToOneConcurrentArrayQueue performed up to roughly 40% faster than LazyFutureStreams backed by the JDK's unbounded wait-free Queue (ConcurrentLinkedQueue). While results for most queue types varied significantly from run to run, throughput from LazyFutureStreams backed by Agrona ManyToOneConcurrentArrayQueues was much more stable. The difference between LazyFutureStreams backed by ManyToOneConcurrentArrayQueue and those backed by ConcurrentLinkedQueue ranged from 0% to a little over 40%. Non-blocking Queues performed up to twice as well as blocking Queues (again with a lot of variation).
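The figures above come from simple-react's own benchmarking, which is not reproduced here. To sanity-check the effect on your own hardware, a rough JDK-only harness like the one below can help. It is a hypothetical `QueueHarness` class, not a JMH benchmark and not the code behind these numbers: several producer threads push elements through any `java.util.Queue` to one consumer using the non-blocking `offer`/`poll` calls, and the elapsed time is reported. As noted above, results vary a lot between runs, so repeat them (or use JMH for real measurements). If Agrona is on your classpath, its ManyToOneConcurrentArrayQueue implements `java.util.Queue` and could be passed in the same way.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;

// Hypothetical rough harness: N producers, 1 consumer, any Queue implementation.
final class QueueHarness {
    static final class Result {
        final long received;
        final long elapsedNanos;
        Result(long received, long elapsedNanos) { this.received = received; this.elapsedNanos = elapsedNanos; }
    }

    static Result run(Queue<Integer> queue, int producers, int perProducer) {
        CountDownLatch start = new CountDownLatch(1);
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                try { start.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; }
                for (int i = 0; i < perProducer; i++) {
                    while (!queue.offer(i)) Thread.yield(); // bounded queue is full: back off and retry
                }
            });
            t.setDaemon(true);                              // don't keep the JVM alive
            t.start();
        }
        long expected = (long) producers * perProducer;
        long received = 0;
        long begin = System.nanoTime();
        start.countDown();                                  // release all producers at once
        while (received < expected) {
            if (queue.poll() != null) received++;
            else Thread.yield();                            // nothing ready yet
        }
        return new Result(received, System.nanoTime() - begin);
    }

    public static void main(String[] args) {
        for (int round = 0; round < 5; round++) {           // repeat: single runs are noisy
            Result clq = run(new ConcurrentLinkedQueue<>(), 4, 250_000);
            Result abq = run(new ArrayBlockingQueue<>(110), 4, 250_000);
            System.out.printf("round %d: ConcurrentLinkedQueue %d ms, ArrayBlockingQueue(110) %d ms%n",
                    round, clq.elapsedNanos / 1_000_000, abq.elapsedNanos / 1_000_000);
        }
    }
}
```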

Bound size matters

Creating a bounded queue with a large buffer capacity can be expensive. Agrona ManyToOneConcurrentArrayQueues with a bound size of 200,000 entries were ~10 times slower than those with a bound size of 110.
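One reason large bounds are costly is that an array-backed queue allocates (and the JVM zeroes) its whole buffer when the queue is constructed, whether or not it is ever filled. Agrona's array queues also round the requested capacity up to a power of two. The back-of-envelope calculation below assumes 4-byte references (compressed oops, the HotSpot default for heaps under about 32 GB) and ignores the array header; the `BufferSizing` helper is hypothetical.

```java
// Back-of-envelope sizing of a pre-allocated, power-of-two array buffer (illustrative only).
final class BufferSizing {
    // Smallest power of two >= requested (requested must be between 1 and 2^30).
    static int nextPowerOfTwo(int requested) {
        if (requested < 1 || requested > (1 << 30)) throw new IllegalArgumentException("out of range");
        return requested == 1 ? 1 : Integer.highestOneBit(requested - 1) << 1;
    }

    // Bytes for the reference slots alone, assuming 4-byte compressed references.
    static long slotBytes(int requested) {
        return (long) nextPowerOfTwo(requested) * 4;
    }

    public static void main(String[] args) {
        for (int bound : new int[] {110, 200_000}) {
            System.out.printf("bound %d -> %d slots, ~%d bytes allocated up front%n",
                    bound, nextPowerOfTwo(bound), slotBytes(bound));
        }
    }
}
```

For a bound of 110 this gives 128 slots (about 512 bytes); for 200,000 it gives 262,144 slots (about 1 MiB), all of which must be allocated before the first element is offered.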

JDK ConcurrentLinkedQueue is the default

Despite the improvements from using the Agrona ManyToOneConcurrentArrayQueue as the backing Queue, we continue to use JDK ConcurrentLinkedQueue as the default. This is because no single bound size is right for all operations. For most operations, the bound size can be close to the concurrency level determined by the MaxActive settings, but for operators such as flatMap it isn't possible to tell how many elements will need to be buffered on the queue. For that reason, the backing queue is configurable per stage via two simple operators.
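Why a bound close to the MaxActive limit is usually enough can be illustrated with a JDK-only model (not simple-react's implementation). If at most `maxActive` values may be in flight at once, and a value only sits in the queue while it still counts as in flight, the queue can never hold more than `maxActive` elements. In the sketch below a `Semaphore` plays the role of the MaxActive limit and an `ArrayBlockingQueue` stands in for the bounded queue; `MaxActiveModel` and `rejectedOffers` are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: a Semaphore stands in for the MaxActive limit on in-flight values.
final class MaxActiveModel {
    // Returns how many offers were rejected because the bounded queue was full.
    static int rejectedOffers(int maxActive, int bound, int total) {
        Semaphore inFlight = new Semaphore(maxActive);
        Queue<Integer> queue = new ArrayBlockingQueue<>(bound);
        AtomicInteger rejected = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(4);

        Thread consumer = new Thread(() -> {
            int received = 0;
            while (received + rejected.get() < total) {
                if (queue.poll() != null) {
                    received++;
                    inFlight.release();        // value has left the queue: no longer in flight
                } else {
                    Thread.yield();
                }
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        try {
            for (int i = 0; i < total; i++) {
                inFlight.acquire();            // blocks once maxActive values are in flight
                final int value = i;
                workers.execute(() -> {
                    if (!queue.offer(value)) { // cannot fail while bound >= maxActive
                        rejected.incrementAndGet();
                        inFlight.release();
                    }
                });
            }
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
        return rejected.get();
    }
}
```

With `maxActive = 100` and a bound of 110 (the sizes used in the example below), no offer is ever rejected. A flatMap stage breaks the invariant, because a single in-flight input can place an unknown number of outputs on the queue.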

Non-blocking Queue operators

boundedWaitFree(int size)

This operator tells LazyFutureStream to use a bounded wait-free queue for subsequent stages (until configured otherwise). For example:

new LazyReact(10, 100)         // LazyFutureStream builder: a pool of 10 threads, at most 100 concurrent tasks
        .react(toMyData)       // the initial Suppliers to react to asynchronously
        .boundedWaitFree(110)  // bounded queue factory, max size slightly above the max number of concurrent tasks
        .limit(400)            // take the first 400 elements; limit uses an async Queue internally,
                               // so it can benefit from being backed by an Agrona wait-free queue
        .toList();

unboundedWaitFree()

This is the default operator, and it tells LazyFutureStream to use an unbounded wait-free Queue. Although algorithmically wait-free, this Queue is more likely to experience contention at lower levels (e.g. via garbage collector activity or invalidation of CPU caches). However, it is unbounded, and as such it

  1. does not incur significant queue creation costs

  2. is less likely to cause data loss should flatMap-based expansions turn out to be large.

new LazyReact(10, 100)         // LazyFutureStream builder: a pool of 10 threads, at most 100 concurrent tasks
        .react(toMyData)       // the initial Suppliers to react to asynchronously
        .boundedWaitFree(110)  // bounded queue factory, max size slightly above the max number of concurrent tasks
        .limit(400)            // take the first 400 elements; limit uses an async Queue internally
        .unboundedWaitFree()   // switch to an unbounded wait-free queue before a potentially large flatMap
        .flatMap(this::loadAndStream)
        .toList();
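The risk in point 2 above can be seen with plain JDK queues. The sketch below (hypothetical names; `ArrayBlockingQueue` stands in for any bounded queue and `ConcurrentLinkedQueue` for the unbounded one) simulates a flatMap-style stage in which 100 inputs each expand to 5 outputs before the consumer drains anything. A bound of 110, sized for the concurrency level, turns away most of the expansion (offers that return `false`), whereas the unbounded queue accepts all of it.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical simulation of a flatMap expansion landing on a queue before any draining.
final class FlatMapExpansion {
    // Offers inputs * expansion elements and returns how many offers were rejected.
    static int rejectedOffers(Queue<Integer> queue, int inputs, int expansion) {
        int rejected = 0;
        for (int in = 0; in < inputs; in++) {
            for (int out = 0; out < expansion; out++) { // each input produces `expansion` outputs
                if (!queue.offer(in * expansion + out)) rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("bounded(110): " + rejectedOffers(new ArrayBlockingQueue<>(110), 100, 5));  // 390
        System.out.println("unbounded:    " + rejectedOffers(new ConcurrentLinkedQueue<>(), 100, 5)); // 0
    }
}
```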
    

Recommendations

Use a boundedWaitFree queue where the bound size can reasonably be asserted to be low; otherwise, favor an unbounded wait-free queue. Test, test, test your assumptions on performance-critical paths.

What do we mean by ‘backed by a Queue’?

simple-react works by performing aggregate operations over Streams of CompletableFutures. To perform more complex operations, we often need the result that will be asynchronously populated in the Future. To keep data flowing smoothly through your Stream, we ‘plumb’ multiple streams together internally, asynchronously populating and extracting data from simple-react async.Queues. While the simple-react async.Queue provides the necessary logic to manage this, it isn’t itself a Queue data structure implementation. The backing Queue is pluggable, and simple-react QueueFactories are provided for a range of blocking and non-blocking queue implementations.
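The split between the async Queue and its backing data structure can be sketched as follows. This is a hypothetical, much-simplified model, not simple-react's `async.Queue` or its QueueFactories: a `BackingQueueFactory` decides which `java.util.Queue` to build, and a small `PlumbingQueue` wrapper adds the "plumbing" behaviour (here, just a closed flag so the consuming side knows when the stream has ended) on top of whatever queue it is given. For simplicity it assumes `close()` is only called after every producer has finished.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical factory: decides which backing Queue implementation to build.
interface BackingQueueFactory<T> {
    Queue<T> build();

    static <T> BackingQueueFactory<T> bounded(int size) { return () -> new ArrayBlockingQueue<>(size); }

    static <T> BackingQueueFactory<T> unbounded() { return ConcurrentLinkedQueue::new; }
}

// Hypothetical async wrapper: adds end-of-stream signalling, delegates storage to the backing Queue.
final class PlumbingQueue<T> {
    private final Queue<T> backing;
    private volatile boolean closed;

    PlumbingQueue(BackingQueueFactory<T> factory) { this.backing = factory.build(); }

    // Returns false if a bounded backing queue is full.
    boolean offer(T value) {
        if (closed) throw new IllegalStateException("queue is closed");
        return backing.offer(value);
    }

    // Assumption: called only after every producer has finished offering.
    void close() { closed = true; }

    // Next element, or null if none is available right now.
    T poll() { return backing.poll(); }

    // True once producers are done and everything has been consumed.
    boolean isFinished() { return closed && backing.isEmpty(); }
}
```

Swapping `BackingQueueFactory.bounded(110)` for `BackingQueueFactory.unbounded()` changes only the data structure; the wrapper's behaviour stays the same, which mirrors the per-stage choice that boundedWaitFree and unboundedWaitFree offer.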
