Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
douglascraigschmidt committed Dec 4, 2023
1 parent 2c91603 commit 399bf98
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* (including strategies implemented via Java sequential streams,
* parallel streams, completable futures, RxJava, and Project Reactor
* frameworks) and provides apples-to-apples comparisons of these
* strategies in terms of there performance and scalability.
* strategies in terms of their performance and scalability.
*/
public class ImageStreamGangTest {
/**
Expand Down Expand Up @@ -113,34 +113,24 @@ public static void runTests() {
private static ImageStreamGang makeImageStreamGang(Filter[] filters,
Iterator<List<URL>> urlIterator,
TestsToRun choice) {
switch (choice) {
case SEQUENTIAL_STREAM:
return new ImageStreamSequential(filters,
urlIterator);
case PARALLEL_STREAM:
return new ImageStreamParallel(filters,
urlIterator);
case COMPLETABLE_FUTURES_1:
return new ImageStreamCompletableFuture1(filters,
urlIterator);
case COMPLETABLE_FUTURES_2:
return new ImageStreamCompletableFuture2(filters,
urlIterator);
case RXJAVA1:
return new ImageStreamRxJava1(filters,
urlIterator);
case RXJAVA2:
return new ImageStreamRxJava2(filters,
urlIterator);
case REACTOR1:
return new ImageStreamReactor1(filters,
urlIterator);

case REACTOR2:
return new ImageStreamReactor2(filters,
urlIterator);
}
return null;
return switch (choice) {
case SEQUENTIAL_STREAM -> new ImageStreamSequential(filters,
urlIterator);
case PARALLEL_STREAM -> new ImageStreamParallel(filters,
urlIterator);
case COMPLETABLE_FUTURES_1 -> new ImageStreamCompletableFuture1(filters,
urlIterator);
case COMPLETABLE_FUTURES_2 -> new ImageStreamCompletableFuture2(filters,
urlIterator);
case RXJAVA1 -> new ImageStreamRxJava1(filters,
urlIterator);
case RXJAVA2 -> new ImageStreamRxJava2(filters,
urlIterator);
case REACTOR1 -> new ImageStreamReactor1(filters,
urlIterator);
case REACTOR2 -> new ImageStreamReactor2(filters,
urlIterator);
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void processStream() {
List<URL> urls = getInput();

// A future to a stream of URLs.
Stream<CompletableFuture<Optional<URL>>> urlStream = urls
var urlStream = urls
// Convert the URLs in the input list into a sequential
// stream.
.parallelStream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ abstract class ImageStreamCompletableFutureBase
CompletableFuture<Optional<URL>> checkUrlCachedAsync(URL url) {
return CompletableFuture
// Asynchronously check if the URL is cached.
.supplyAsync(() -> Optional.ofNullable(urlCached(url) ? null : url),
.supplyAsync(() -> Optional
.ofNullable(urlCached(url) ? null : url),
getExecutor());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected void processStream() {
.flatMap(url ->
Observable
// Just omit this one object.
.just(url)
.fromCallable(() ->url)

// Run this flow of operations in the common
// fork-join pool.
Expand Down Expand Up @@ -100,7 +100,9 @@ private Observable<Image> applyFilters(Image image) {
// item emitted by the source.
.flatMap(filter -> Observable
// Just omit this one object.
.just(filter)
.fromCallable(() ->filter)

// Run this flow of operations in the common

// Run this flow of operations in the common
// fork-join pool.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
*/
public class StreamOfFuturesCollector<T>
implements Collector<CompletableFuture<T>,
List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> {
List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> {
/**
* A function that creates and returns a new mutable result
* container that will hold all the CompletableFutures in the
Expand Down Expand Up @@ -73,7 +73,8 @@ public BinaryOperator<List<CompletableFuture<T>>> combiner() {
* the final result
*/
@Override
public Function<List<CompletableFuture<T>>, CompletableFuture<Stream<T>>> finisher() {
public Function<List<CompletableFuture<T>>,
CompletableFuture<Stream<T>>> finisher() {
return futures
-> CompletableFuture
// Use CompletableFuture.allOf() to obtain a
Expand Down Expand Up @@ -102,9 +103,8 @@ public Function<List<CompletableFuture<T>>, CompletableFuture<Stream<T>>> finish
* @return An immutable set of collector characteristics, which in
* this case is simply UNORDERED
*/
@SuppressWarnings("unchecked")
@Override
public Set characteristics() {
public Set<Characteristics> characteristics() {
return Collections.singleton(Characteristics.UNORDERED);
}

Expand Down
7 changes: 6 additions & 1 deletion Reactive/Flowable/ex1/.run/flowable-ex1.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<option name="MAIN_CLASS_NAME" value="ex1" />
<module name="flowable-ex1.main" />
<option name="PROGRAM_PARAMETERS" value="-d true -c 1000 -o E -p 100" />
<option name="PROGRAM_PARAMETERS" value="-d true -c 100000 -o M -p 10000" />
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package common;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableEmitter;
import io.reactivex.rxjava3.core.FlowableOnSubscribe;

import java.util.Random;
Expand Down Expand Up @@ -39,7 +41,7 @@ private NonBackpressureEmitter() {}
Random random = new Random();

// Emit random integers without concern for backpressure.
return emitter -> Flowable
return (@NonNull FlowableEmitter<Integer> emitter) -> Flowable
// Generate a stream of Integer objects from 1 to count, a
// la a reactive for loop!
.range(1, count)
Expand Down
5 changes: 5 additions & 0 deletions Reactive/Flowable/ex2/.run/flowable-ex2.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
<option name="MAIN_CLASS_NAME" value="ex2" />
<module name="flowable-ex2.main" />
<option name="PROGRAM_PARAMETERS" value="-d true -c 1000 -r 100 -p 100" />
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void onSubscribe(Subscription subscription) {
}

/**
* Hook method called when next item arrives. It prints the
* Hook method called when the next item arrives. It prints the
* results of prime # checking and may issue the next request.
*/
@Override
Expand Down
40 changes: 33 additions & 7 deletions Reactive/Flowable/ex2/src/main/java/ex2.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public class ex2 {
private static final AtomicInteger sPendingItemCount =
new AtomicInteger(0);

/**
* Count the # of integers emitted by the {@link Publisher}.
*/
private static int sIntegersEmitted;

/**
* Constructor initializes the fields.
*/
Expand All @@ -46,21 +51,28 @@ static public void main(String[] argv) {
BackpressureSubscriber sSubscriber = new BackpressureSubscriber
(sPendingItemCount, Options.instance().requestSize());

// Create a new Scheduler that runs its task in a new Thread.
Scheduler newThreadScheduler = Schedulers.newThread();

// Track all disposables to dispose of them all at once.
CompositeDisposable sDisposables =
new CompositeDisposable(sSubscriber);

ex2
// Create a publisher that runs in a new scheduler thread
// and return a Flowable that emits random Integers at a
// rate determined by the subscriber.
.publishRandomIntegers(Schedulers.newThread())
// and return a Flowable that emits random Integer objects
// at a rate determined by the subscriber.
.publishRandomIntegers(newThreadScheduler)

// Concurrently check each random # to see if it's prime.
.flatMap(ex2.checkForPrimality())

// Dispose of the Subscriber.
.doFinally(sDisposables::dispose)
// Dispose of the Subscriber and newThreadScheduler
// when the stream processing is done.
.doFinally(() -> {
sDisposables.dispose();
newThreadScheduler.shutdown();
})

// The blocking subscriber sets the program in motion.
.blockingSubscribe(sSubscriber);
Expand All @@ -75,7 +87,8 @@ static public void main(String[] argv) {
* numbers on
* @return A {@link Flowable} that publishes random numbers
*/
private static Flowable<Integer> publishRandomIntegers(Scheduler scheduler) {
private static Flowable<Integer> publishRandomIntegers
(Scheduler scheduler) {
return Flowable
// This factory method emits a stream of random integers.
.generate(BackpressureEmitter
Expand Down Expand Up @@ -109,7 +122,20 @@ private static Flowable<Integer> publishRandomIntegers(Scheduler scheduler) {

// Check if the number is prime.
.map(__ ->
PrimeUtils.checkIfPrime(number));
PrimeUtils.checkIfPrime(number))

// Indicate the Scheduler context where the processing was done.
.doOnNext(item -> {
if (Options.instance().printIteration(sIntegersEmitted++))
Options.debug("["
+ (sIntegersEmitted - 1)
+ "] published "
+ item
+ " with "
+ sPendingItemCount.get()
+ " pending");

});
}
}

0 comments on commit 399bf98

Please sign in to comment.