EagerFutureStream
EagerFutureStream has been removed from simple-react. Use SimpleReactStream for eager behaviour, or the more powerful LazyFutureStream instead.
#EagerFutureStream
EagerFutureStream is a Java 8 Stream with a twist. Each element in the Stream is a Future task that is executed asynchronously and potentially in parallel.
##Creating a Stream
An EagerFutureStream starts processing as soon as it is created, and there are a number of helpful factory methods. Like a Java 8 Stream, it can be created via of(T... data).
```java
Stream.of(1,2,3,4)
EagerFutureStream.of(1,2,3,4)
```
##Asynchronous sequential operation
```java
Stream.of(1,2,3,4).map(it->it+1)
EagerFutureStream.of(1,2,3,4).map(it->it+1)
```
Calling collect on either of the above Streams produces a Collection containing 2, 3, 4, 5, but the two arrive at that result very differently. The standard Stream operates on the current thread. The EagerFutureStream executes the non-terminal steps sequentially, but on a separate thread (free-thread concurrency). collect is a JDK 8 Stream method that blocks the current thread; EagerFutureStream can instead collect on a separate thread using allOf.
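The difference can be sketched with plain JDK classes. The example below does not use the simple-react API; it is a minimal illustration, assuming `CompletableFuture` as a stand-in for the per-element futures, and the class and method names (`AsyncMapSketch`, `incrementAll`) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AsyncMapSketch {

    // Wraps each element in its own future and applies the mapping off the calling thread.
    public static CompletableFuture<List<Integer>> incrementAll(List<Integer> input) {
        List<CompletableFuture<Integer>> futures = input.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> n)      // task is submitted immediately
                                           .thenApplyAsync(it -> it + 1))
                .collect(Collectors.toList());

        // allOf completes once every task has completed; nothing blocks here.
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(done -> futures.stream()
                        .map(CompletableFuture::join)                 // already complete, returns at once
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<List<Integer>> result = incrementAll(Arrays.asList(1, 2, 3, 4));
        // The calling thread stays free until it chooses to wait for the result.
        System.out.println(result.join()); // prints [2, 3, 4, 5]
    }
}
```

Only the final `join()` in `main` blocks; the mapping and the collection of results both happen on other threads.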
##Parallel operation
```java
Stream.of(1,2,3,4).parallel().map(it->it+1)
EagerFutureStream.parallel(1,2,3,4).map(it->it+1)
```
These two Streams also operate differently. The JDK parallel Stream generates substreams that are processed in parallel:
When a stream executes in parallel, the Java runtime partitions the stream into multiple substreams. Aggregate operations iterate over and process these substreams in parallel and then combine the results.
EagerFutureStream operates by executing each individual operation independently and responding asynchronously to a callback when it completes. This allows us to capture and respond to the state of each task when it completes. We can catch and recover from errors, for example, or retry if a call fails. We could also change the concurrency rules for each stage in the EagerFutureStream. We can also send multiple map requests to the same stage, forking the Stream in multiple directions.
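The per-task model described above can be illustrated with JDK `CompletableFuture` alone. This is a sketch of the idea, not the simple-react implementation; `PerTaskRecoverySketch`, `load`, `loadAll` and `fork` are hypothetical names introduced for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class PerTaskRecoverySketch {

    // Hypothetical loader that fails for negative ids.
    static String load(int id) {
        if (id < 0) {
            throw new IllegalArgumentException("bad id " + id);
        }
        return "item-" + id;
    }

    // Each task recovers from its own failure; the other tasks are unaffected.
    public static List<String> loadAll(List<Integer> ids) {
        List<CompletableFuture<String>> tasks = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> load(id))
                                            .exceptionally(err -> "fallback-" + id))
                .collect(Collectors.toList());
        return tasks.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    // Two independent continuations attached to the same stage fork the flow in two directions.
    public static List<String> fork(int value) {
        CompletableFuture<Integer> stage = CompletableFuture.supplyAsync(() -> value * 1000);
        CompletableFuture<String> asText = stage.thenApplyAsync(it -> it + "!");
        CompletableFuture<String> doubled = stage.thenApplyAsync(it -> String.valueOf(it * 2));
        return Arrays.asList(asText.join(), doubled.join());
    }

    public static void main(String[] args) {
        System.out.println(loadAll(Arrays.asList(1, -2, 3))); // prints [item-1, fallback--2, item-3]
        System.out.println(fork(2));                          // prints [2000!, 4000]
    }
}
```

Because every element is its own future, the failure of id `-2` is handled at that task and does not abort the rest of the batch.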
##All of the Stream methods are available
e.g.
```java
EagerFutureStream.of(1,2,3,4)
    .filter(it -> it<3)
    .map(num -> num*100)
    .reduce(0,(acc,next)-> acc+next)
```
All of the SimpleReactStream API methods and all of the jOOλ Seq methods are available as well.
```java
EagerFutureStream.parallel(1,2,3,4)
    .retry(id -> loadFromCache(id))
    .onFail(id -> loadFromDb(id))
    .capture(id -> logFailure(id))
    .concat(batchedStreams.flatMap(it -> it.stream()))
    .allOf((Collection<Data> data) -> data.stream()
        .reduce(new Aggregate(), (acc,next) ->
            acc.appendData(next))
    ).consume(aggregateData -> addToQueue(aggregateData))
```
To split an EagerFutureStream
```java
EagerFutureStream<Integer> stream = EagerFutureStream.of(1,2,3,4)
    .map(it->it*1000000);
List<String> toStrings = stream.map(it-> it+"!")
    .block();
List<Date> toDates = stream.map(Date::new)
    .block();
```
Another example
```java
EagerFutureStream<Data> dataStream = eagerReact.fromStreamNoCompletableFutures(
        data.stream())
    .map(it -> cleanUserData(it));
dataStream.retry(it -> confirmSaved(it))
    .onFail(e -> writeToFailover(e.getValue()))
    .capture(e -> logFailure(e.getValue()));
// <-- ensure data saved on separate thread
List<SavedData> response = dataStream.map(it -> confirmSaved(it))
    .block(); //<-- responsive save attempt
```
###Asynchronous core operations and Synchronous support operations
Some methods in EagerFutureStream operate both synchronously and asynchronously. For example, merge synchronously merges two streams, but it does so on an immediately available Stream of CompletableFutures. The results of those futures, the actual results of the merge, are populated asynchronously.
EagerFutureStream operations run as asynchronously as possible. For example, a reduce operation starts processing as soon as the first future results flow into that phase, while futures from much earlier phases may still be waiting to be populated as processing or I/O continues on a separate thread.
All ships leave port on Stream creation! Operations act on the results at completion time.
limit - limits the results of the computation, not the tasks that are started.
skip - skips results, not task starts.
retry allows stages in a stream to be replayed for individual tasks.
onFail allows default values, or alternative computations to be used on failure.
capture allows logging and similar failure capture to occur.
allOf allows result collection to occur asynchronously, leaving the calling thread unblocked.
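The point that limit acts on results rather than on task starts can be illustrated with JDK classes only. The sketch below is not the simple-react API; it assumes `CompletableFuture` as the underlying task type, and `EagerLimitSketch`, `submit`, `submitted` and `firstTwoResults` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class EagerLimitSketch {

    // Counts how many tasks have been handed to the executor.
    public static final AtomicInteger submitted = new AtomicInteger();

    // Submits one task; the work begins without waiting for anyone to ask for the result.
    static CompletableFuture<Integer> submit(int n) {
        submitted.incrementAndGet();
        return CompletableFuture.supplyAsync(() -> n * 10);
    }

    public static List<Integer> firstTwoResults(List<Integer> input) {
        submitted.set(0);

        // Collecting to a list forces every task to be submitted up front.
        List<CompletableFuture<Integer>> tasks = input.stream()
                .map(EagerLimitSketch::submit)
                .collect(Collectors.toList());

        // The limit is applied afterwards, so it trims results, not submissions.
        return tasks.stream()
                .limit(2)
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstTwoResults(Arrays.asList(1, 2, 3, 4))); // prints [10, 20]
        System.out.println(submitted.get());                            // prints 4
    }
}
```

All four tasks are submitted even though only two results are kept, which is the behaviour the "all ships leave port" line describes.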