Fine tuning SimpleReact

johnmcclean-aol edited this page Feb 19, 2015 · 1 revision

SimpleReact offers the following options for fine tuning configuration.

withTaskExecutor(ExecutorService e)

The ExecutorService provides the thread pool that tasks for a given phase will execute on.

This allows you to replace the task executor for a given phase in a stream, and subsequent stages, until it is changed. This means different phases of execution can leverage separate, independent thread pools.
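The idea of giving each phase its own pool can be sketched with plain `java.util.concurrent` classes. This is not SimpleReact's API: the class name `PhaseExecutors` and the pool names are hypothetical, and the two `CompletableFuture` stages merely stand in for two phases of a stream.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration: two phases, each bound to its own thread pool.
class PhaseExecutors {

    // Runs a two-phase pipeline and returns the thread name used by each phase.
    static String[] runPhases() {
        ExecutorService loadPool =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "load-pool"));
        ExecutorService transformPool =
            Executors.newFixedThreadPool(2, r -> new Thread(r, "transform-pool"));
        try {
            String[] threads = new String[2];
            CompletableFuture<Integer> result = CompletableFuture
                // Phase 1 executes on loadPool.
                .supplyAsync(() -> {
                    threads[0] = Thread.currentThread().getName();
                    return 1;
                }, loadPool)
                // Phase 2 switches to transformPool.
                .thenApplyAsync(v -> {
                    threads[1] = Thread.currentThread().getName();
                    return v + 1;
                }, transformPool);
            result.join(); // wait for both phases to finish
            return threads;
        } finally {
            loadPool.shutdown();
            transformPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = runPhases();
        System.out.println("phase 1 ran on " + threads[0]);
        System.out.println("phase 2 ran on " + threads[1]);
    }
}
```

Because the pools are independent, a slow or blocking phase can be sized and tuned without starving the other phase of threads.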

withRetrier(RetryExecutor retry)

The RetryExecutor provides the thread pool for retriable tasks for a given phase, and also defines the retry rules.

This allows you to replace the retry task executor for a given phase in a stream, and subsequent stages, until it is changed. This means different phases of execution can leverage separate, independent thread pools for retrying.
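As a rough illustration of what "a thread pool plus retry rules" means, the sketch below retries a failing task on a dedicated scheduler. It uses only the JDK and is not the RetryExecutor interface itself; the class `RetrySketch`, the fixed delay, and the attempt limit are all assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration: retry rule = up to maxAttempts, fixed delay between attempts.
class RetrySketch {

    static <T> CompletableFuture<T> retry(Supplier<T> task, int maxAttempts,
                                          long delayMillis, ScheduledExecutorService retryPool) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(task, 1, maxAttempts, delayMillis, retryPool, result);
        return result;
    }

    private static <T> void attempt(Supplier<T> task, int attemptNo, int maxAttempts,
                                    long delayMillis, ScheduledExecutorService retryPool,
                                    CompletableFuture<T> result) {
        long delay = attemptNo == 1 ? 0 : delayMillis; // first attempt runs immediately
        retryPool.schedule(() -> {
            try {
                result.complete(task.get());
            } catch (RuntimeException e) {
                if (attemptNo < maxAttempts) {
                    attempt(task, attemptNo + 1, maxAttempts, delayMillis, retryPool, result);
                } else {
                    result.completeExceptionally(e); // retries exhausted
                }
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    // A task that fails twice and succeeds on the third call; returns the call count.
    static int demo() {
        ScheduledExecutorService retryPool = Executors.newSingleThreadScheduledExecutor();
        try {
            AtomicInteger calls = new AtomicInteger();
            CompletableFuture<Integer> outcome = retry(() -> {
                if (calls.incrementAndGet() < 3) {
                    throw new IllegalStateException("transient failure");
                }
                return calls.get();
            }, 5, 10, retryPool);
            return outcome.join();
        } finally {
            retryPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("succeeded after " + demo() + " calls");
    }
}
```

Retries run on `retryPool` only, so a burst of failing tasks does not consume threads from the pool that executes first-time work.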

withWaitStrategy(Consumer c)

The Wait Strategy is a Consumer that is called every time an end-of-chain CompletableFuture is retrieved from a lazy SimpleReact stream. It allows SimpleReact to control the number of active chains of CompletableFutures by slowing the production of complete CompletableFuture execution chains. The default Wait Strategy is LimitingMonitor, which monitors current usage and limits excessive usage. Its default settings are:

  1. max active chains of CompletableFutures = 2x number of cores
  2. min active chains of CompletableFutures = number of cores

LimitingMonitor prevents excessive object creation and protects performance by stopping the creation of CompletableFuture tasks from getting too far ahead of available system resources.
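The limiting behaviour described above can be sketched as a Consumer that blocks the producing thread once too many chains are in flight. This is a simplified guess at the idea, not the LimitingMonitor source: the class `LimitingMonitorSketch` is hypothetical, it assumes a single producing thread, and it treats "max" as the point where blocking starts and "min" as the level to drain down to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical illustration of a limiting wait strategy (single producer thread assumed).
class LimitingMonitorSketch implements Consumer<CompletableFuture<?>> {
    private final int minActive;
    private final int maxActive;
    private final List<CompletableFuture<?>> active = new ArrayList<>();

    LimitingMonitorSketch(int minActive, int maxActive) {
        this.minActive = minActive;
        this.maxActive = maxActive;
    }

    // Mirrors the defaults listed above: min = cores, max = 2 x cores.
    static LimitingMonitorSketch withCoreBasedLimits() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new LimitingMonitorSketch(cores, 2 * cores);
    }

    @Override
    public void accept(CompletableFuture<?> chain) {
        active.add(chain);
        if (active.size() < maxActive) {
            return; // still under the limit, let the producer continue
        }
        // Too many chains in flight: block until we are back at minActive.
        while (active.size() > minActive) {
            CompletableFuture.anyOf(active.toArray(new CompletableFuture<?>[0]))
                .handle((result, error) -> null) // ignore failures, we only wait
                .join();
            active.removeIf(CompletableFuture::isDone);
        }
    }

    int activeCount() {
        return active.size();
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Feeds 20 slow chains through a monitor (min 2, max 4); returns the peak tracked count.
    static int demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        LimitingMonitorSketch monitor = new LimitingMonitorSketch(2, 4);
        int peak = 0;
        try {
            for (int i = 0; i < 20; i++) {
                CompletableFuture<Integer> chain =
                    CompletableFuture.supplyAsync(() -> { pause(10); return 1; }, pool);
                monitor.accept(chain);
                peak = Math.max(peak, monitor.activeCount());
            }
        } finally {
            pool.shutdown();
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println("peak tracked chains: " + demo());
    }
}
```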

Any Consumer can be used as a Wait Strategy. Other Wait Strategies include:

  • DoNothingMonitor - this does nothing
  • SamplingMonitor - this monitors the number of CompletableFutures created and passes control to a supplied Consumer for a sample of CompletableFuture chains. It can be used in co-operation with the LimitingMonitor to maintain a sensible number of active chains while reducing monitoring touch points.

This method allows you to replace the Wait Strategy for a given phase in a stream, and subsequent stages, until it is changed.
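Since any Consumer can act as a Wait Strategy, the sampling idea can be sketched as a wrapper that forwards only every Nth chain to another Consumer. The class `SamplingMonitorSketch` and its fixed "every Nth" rule are assumptions for illustration; the real SamplingMonitor may choose its sample differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration: delegate to another wait strategy for 1 in every sampleRate chains.
// Not thread-safe; assumes a single producing thread.
class SamplingMonitorSketch<T> implements Consumer<T> {
    private final int sampleRate;
    private final Consumer<T> delegate;
    private int seen = 0;

    SamplingMonitorSketch(int sampleRate, Consumer<T> delegate) {
        this.sampleRate = sampleRate;
        this.delegate = delegate;
    }

    @Override
    public void accept(T chain) {
        seen++;
        if (seen % sampleRate == 0) {
            delegate.accept(chain); // only sampled chains reach the delegate
        }
    }

    // Passes 10 chains through a 1-in-3 sampler and counts how many reach the delegate.
    static int demo() {
        AtomicInteger delegated = new AtomicInteger();
        SamplingMonitorSketch<CompletableFuture<Integer>> monitor =
            new SamplingMonitorSketch<>(3, chain -> delegated.incrementAndGet());
        for (int i = 0; i < 10; i++) {
            monitor.accept(CompletableFuture.completedFuture(i));
        }
        return delegated.get();
    }

    public static void main(String[] args) {
        System.out.println("chains passed to delegate: " + demo());
    }
}
```

Passing a limiting Consumer as the delegate gives the co-operation described above: the limit is still enforced, but the more expensive check runs on only a fraction of the chains.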

withLazyCollector(LazyResultConsumer lazy)

The Lazy Result Consumer controls the collection of results from a lazy stream. The default Lazy Result Consumer is the BatchingResultConsumer. It batches the collection of results, which allows lazy Streams of CompletableFutures to operate in parallel rather than sequentially. It can be used in co-operation with the SamplingResultCollector to collect results from only a small sample of a very large Stream.

The lazy collector is applied at the 'pull' end of a lazy stream, and does not operate per phase.
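To see why batching matters at the pull end, consider a lazy source that starts each task only when it is pulled. Joining every future immediately would run tasks one at a time; pulling a batch before joining lets the whole batch run in parallel. The sketch below shows that pattern with JDK classes only. `BatchingCollectorSketch` and its batch size are hypothetical and do not reproduce BatchingResultConsumer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

// Hypothetical illustration: pull futures in batches, then block on the batch.
class BatchingCollectorSketch {

    static <T> List<T> collect(Iterator<CompletableFuture<T>> lazyFutures, int batchSize) {
        List<T> results = new ArrayList<>();
        List<CompletableFuture<T>> batch = new ArrayList<>();
        while (lazyFutures.hasNext()) {
            batch.add(lazyFutures.next()); // pulling from the lazy source starts the task
            if (batch.size() == batchSize) {
                drain(batch, results);
            }
        }
        drain(batch, results); // collect any remaining partial batch
        return results;
    }

    private static <T> void drain(List<CompletableFuture<T>> batch, List<T> results) {
        for (CompletableFuture<T> future : batch) {
            results.add(future.join());
        }
        batch.clear();
    }

    // Ten lazily created tasks, collected four at a time.
    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            Iterator<CompletableFuture<Integer>> lazy = IntStream.range(0, 10)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> i * 2, pool))
                .iterator();
            return collect(lazy, 4);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```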

withQueueFactory(QueueFactory queue)

QueueFactory is used to generate SimpleReact Queues internally. SimpleReact Queues backed by a bounded BlockingQueue can apply backpressure to fast-producing streams. The implementation of non-core Stream and Seq methods involves generating a Stream/Seq from a Queue that is populated asynchronously and in parallel by SimpleReact.

This method allows you to replace the QueueFactory for a given phase in a stream, and subsequent stages, until it is changed.

The default SimpleReact QueueFactory returns an unbounded queue. SimpleReact provides a QueueFactories class with the following factory methods:

QueueFactory boundedQueue(int queueSize)

Use this queue to apply backpressure to fast stages.

For example, zipping two uneven Streams:

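lazy().react(() -> 1, SimpleReact.times(1000))
	.peek(c -> sleep(1000))                                 // slow stream: pauses on every element
	.zip(lazy().reactInfinitely(() -> "100")                // fast, infinite stream
		.withQueueFactory(QueueFactories.boundedQueue(10))) // buffer at most 10 elements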

The fast stream, backed by a bounded QueueFactory, will not overpopulate a buffer with data to be processed. It will populate only 10 data points before pausing. The slow-producing stream is completely unaffected by the backpressure applied to the fast stream.
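The buffering limit can be illustrated with a plain bounded BlockingQueue from the JDK. This sketch does not use SimpleReact's Queue or QueueFactories. The class `BoundedQueueBackpressure` is hypothetical, and it uses the non-blocking `offer` so the effect is visible without a consumer thread; a blocking `put` would make the producer wait instead.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration: a bounded queue refuses data once its capacity is reached.
class BoundedQueueBackpressure {

    // Tries to add 'attempts' elements with no consumer present; returns how many were accepted.
    static int fillWithoutConsumer(int capacity, int attempts) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false immediately when the queue is full;
            // put() would block the producer here until a consumer made room.
            if (queue.offer("100")) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted: " + fillWithoutConsumer(10, 15));
    }
}
```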

QueueFactory synchronousQueue()

This queue can also be used to apply backpressure. It creates a zero-capacity queue that only accepts data when a consumer is present to remove it.
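The zero-capacity behaviour matches that of the JDK's `java.util.concurrent.SynchronousQueue`, which the sketch below uses directly. Whether SimpleReact's factory wraps that exact class is an assumption here, and the class name `SynchronousQueueHandOff` is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SynchronousQueue;

// Hypothetical illustration: data is accepted only when a consumer is waiting for it.
class SynchronousQueueHandOff {

    // With no consumer waiting, a non-blocking offer is rejected.
    static boolean offerWithoutConsumer() {
        return new SynchronousQueue<String>().offer("data");
    }

    // With a consumer blocked in take(), put() hands the element over directly.
    static String handOff() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        CompletableFuture<String> consumer = CompletableFuture.supplyAsync(() -> {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
        try {
            queue.put("data"); // blocks until the consumer takes the element
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumer.join();
    }

    public static void main(String[] args) {
        System.out.println("offer without consumer accepted: " + offerWithoutConsumer());
        System.out.println("handed off: " + handOff());
    }
}
```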

QueueFactory unBoundedQueue()

This is the default Queue Factory. You can use this to reset to the default after a phase using bounded or synchronous queue factories.
