diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java b/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java index ab67dc6e0bb..0ea7ac30143 100644 --- a/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java +++ b/common/reactive/src/main/java/io/helidon/common/reactive/Multi.java @@ -23,6 +23,7 @@ import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -241,6 +242,40 @@ default Single collectStream(java.util.stream.Collector colle return new MultiCollectorPublisher<>(this, collector); } + /** + * Combine subsequent items via a callback function and emit + * the final value result as a Single. + *

+ * If the upstream is empty, the resulting Single is also empty. + * If the upstream contains only one item, the reducer function + * is not invoked and the resulting Single will have only that + * single item. + *

+ * @param reducer the function called with the first value or the previous result, + * the current upstream value and should return a new value + * @return Single + */ + default Single reduce(BiFunction reducer) { + Objects.requireNonNull(reducer, "reducer is null"); + return new MultiReduce<>(this, reducer); + } + + /** + * Combine every upstream item with an accumulator value to produce a new accumulator + * value and emit the final accumulator value as a Single. + * @param supplier the function to return the initial accumulator value for each incoming + * Subscriber + * @param reducer the function that receives the current accumulator value, the current + * upstream value and should return a new accumulator value + * @param the accumulator and result type + * @return Single + */ + default Single reduce(Supplier supplier, BiFunction reducer) { + Objects.requireNonNull(supplier, "supplier is null"); + Objects.requireNonNull(reducer, "reducer is null"); + return new MultiReduceFull<>(this, supplier, reducer); + } + /** * Get the first item of this {@link Multi} instance as a {@link Single}. * diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduce.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduce.java new file mode 100644 index 00000000000..d2ce65d686c --- /dev/null +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduce.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.common.reactive; + +import java.util.Objects; +import java.util.concurrent.Flow; +import java.util.function.BiFunction; + +/** + * Combine subsequent items via a callback function and emit + * the result as a Single. + * @param the element type of the source and result + */ +final class MultiReduce implements Single { + + private final Multi source; + + private final BiFunction reducer; + + MultiReduce(Multi source, BiFunction reducer) { + this.source = source; + this.reducer = reducer; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + source.subscribe(new ReducerSubscriber<>(subscriber, reducer)); + } + + static final class ReducerSubscriber extends DeferredScalarSubscription + implements Flow.Subscriber { + + private final BiFunction reducer; + + private Flow.Subscription upstream; + + private T current; + + ReducerSubscriber(Flow.Subscriber downstream, BiFunction reducer) { + super(downstream); + this.reducer = reducer; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + SubscriptionHelper.validate(upstream, subscription); + upstream = subscription; + // FIXME subscribeSelf() + downstream().onSubscribe(this); + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T item) { + Flow.Subscription s = upstream; + if (s != SubscriptionHelper.CANCELED) { + T current = this.current; + if (current == null) { + this.current = item; + } else { + try { + this.current = Objects.requireNonNull(reducer.apply(current, item), + "The reducer returned a null item"); + } catch (Throwable ex) { + s.cancel(); + onError(ex); + } + } + } + } + + @Override + public void onError(Throwable throwable) { + if (upstream != SubscriptionHelper.CANCELED) { + upstream = SubscriptionHelper.CANCELED; + current = null; + // FIXME error() + downstream().onError(throwable); + } + } + + @Override + public void onComplete() { + if (upstream != SubscriptionHelper.CANCELED) { + upstream = SubscriptionHelper.CANCELED; + T current = this.current; + this.current = null; + if (current == null) { + // FIXME complete() + downstream().onComplete(); + } else { + complete(current); + } + } + } + + @Override + public void cancel() { + super.cancel(); + upstream.cancel(); + upstream = SubscriptionHelper.CANCELED; + } + } +} diff --git a/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduceFull.java b/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduceFull.java new file mode 100644 index 00000000000..3e16190f2f7 --- /dev/null +++ b/common/reactive/src/main/java/io/helidon/common/reactive/MultiReduceFull.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.common.reactive; + +import java.util.Objects; +import java.util.concurrent.Flow; +import java.util.function.BiFunction; +import java.util.function.Supplier; + +/** + * Combine items via an accumulator and function into a single value. + * @param the source value type + * @param the accumulator and result type + */ +final class MultiReduceFull implements Single { + + private final Multi source; + + private final Supplier supplier; + + private final BiFunction reducer; + + MultiReduceFull(Multi source, Supplier supplier, BiFunction reducer) { + this.source = source; + this.supplier = supplier; + this.reducer = reducer; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + R initial; + try { + initial = Objects.requireNonNull(supplier.get(), + "The supplier returned a null item"); + } catch (Throwable ex) { + subscriber.onSubscribe(EmptySubscription.INSTANCE); + subscriber.onError(ex); + return; + } + source.subscribe(new ReduceFullSubscriber<>(subscriber, initial, reducer)); + } + + static final class ReduceFullSubscriber extends DeferredScalarSubscription + implements Flow.Subscriber { + + private final BiFunction reducer; + + private R accumulator; + + private Flow.Subscription upstream; + + ReduceFullSubscriber(Flow.Subscriber downstream, R initial, BiFunction reducer) { + super(downstream); + this.reducer = reducer; + this.accumulator = initial; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + SubscriptionHelper.validate(upstream, subscription); + upstream = subscription; + // FIXME subscribeSelf() + downstream().onSubscribe(this); + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T item) { + Flow.Subscription s = upstream; + if (s != SubscriptionHelper.CANCELED) { + try { + accumulator = Objects.requireNonNull(reducer.apply(accumulator, item), + "The reducer returned a null item"); + } catch (Throwable ex) { + s.cancel(); + onError(ex); + } + } + } + + @Override + public void onError(Throwable throwable) { + if (upstream != SubscriptionHelper.CANCELED) { + upstream = SubscriptionHelper.CANCELED; + accumulator = null; + // FIXME error() + downstream().onError(throwable); + } + } + + @Override + public void onComplete() { + if (upstream != SubscriptionHelper.CANCELED) { + upstream = SubscriptionHelper.CANCELED; + R accumulator = this.accumulator; + this.accumulator = null; + complete(accumulator); + } + } + + @Override + public void cancel() { + super.cancel(); + upstream.cancel(); + upstream = SubscriptionHelper.CANCELED; + } + } +} diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceFullTckTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceFullTckTest.java new file mode 100644 index 00000000000..d8bd6168ab4 --- /dev/null +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceFullTckTest.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.helidon.common.reactive; + +import org.reactivestreams.tck.TestEnvironment; +import org.reactivestreams.tck.flow.FlowPublisherVerification; +import org.testng.annotations.Test; + +import java.util.concurrent.Flow; + +@Test +public class MultiReduceFullTckTest extends FlowPublisherVerification { + + public MultiReduceFullTckTest() { + super(new TestEnvironment(50)); + } + + @Override + public Flow.Publisher createFlowPublisher(long l) { + return Multi.range(1, 10).reduce(() -> 0, Integer::sum); + } + + @Override + public Flow.Publisher createFailedFlowPublisher() { + return null; + } + + @Override + public long maxElementsFromPublisher() { + return 1; + } +} diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceFullTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceFullTest.java new file mode 100644 index 00000000000..ada67294ae3 --- /dev/null +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceFullTest.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.helidon.common.reactive; + +import org.testng.annotations.Test; + +import java.io.IOException; + +public class MultiReduceFullTest { + + @Test + public void empty() { + TestSubscriber ts = new TestSubscriber<>(Long.MAX_VALUE); + + Multi.empty().reduce(() -> 100, Integer::sum) + .subscribe(ts); + + ts.assertResult(100); + } + + @Test + public void single() { + TestSubscriber ts = new TestSubscriber<>(Long.MAX_VALUE); + + Multi.singleton(1).reduce(() -> 100, Integer::sum) + .subscribe(ts); + + ts.assertResult(101); + } + + @Test + public void range() { + TestSubscriber ts = new TestSubscriber<>(Long.MAX_VALUE); + + Multi.range(1, 10).reduce(() -> 100, Integer::sum) + .subscribe(ts); + + ts.assertResult(155); + } + + @Test + public void error() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.error(new IOException()).reduce(() -> 100, Integer::sum) + .subscribe(ts); + + ts.assertFailure(IOException.class); + } + + @Test + public void supplierNull() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.range(1, 5) + .reduce(() -> null, Integer::sum) + .subscribe(ts); + + ts.assertFailure(NullPointerException.class); + } + + @Test + public void supplierCrash() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.range(1, 5) + .reduce(() -> { throw new IllegalArgumentException(); }, Integer::sum) + .subscribe(ts); + + ts.assertFailure(IllegalArgumentException.class); + } + + @Test + public void reducerNull() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.range(1, 5) + .reduce(() -> 100, (a, b) -> null) + .subscribe(ts); + + ts.assertFailure(NullPointerException.class); + } + + @Test + public void reducerCrash() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.range(1, 5) + .reduce(() -> 100, (a, b) -> { throw new IllegalArgumentException(); }) + .subscribe(ts); + + ts.assertFailure(IllegalArgumentException.class); + } + + @Test + public void cancel() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.never() + .reduce(() -> 100, Integer::sum) + .subscribe(ts); + + ts.cancel(); + + ts.assertEmpty(); + } +} diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceTckTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceTckTest.java new file mode 100644 index 00000000000..e8fefdc1eeb --- /dev/null +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceTckTest.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.helidon.common.reactive; + +import org.reactivestreams.tck.TestEnvironment; +import org.reactivestreams.tck.flow.FlowPublisherVerification; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.concurrent.Flow; +import java.util.stream.IntStream; + +@Test +public class MultiReduceTckTest extends FlowPublisherVerification { + + public MultiReduceTckTest() { + super(new TestEnvironment(50)); + } + + @Override + public Flow.Publisher createFlowPublisher(long l) { + return Multi.range(1, 10).reduce(Integer::sum); + } + + @Override + public Flow.Publisher createFailedFlowPublisher() { + return null; + } + + @Override + public long maxElementsFromPublisher() { + return 1; + } +} diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceTest.java b/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceTest.java new file mode 100644 index 00000000000..e2d13d3b4d2 --- /dev/null +++ b/common/reactive/src/test/java/io/helidon/common/reactive/MultiReduceTest.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package io.helidon.common.reactive; + +import org.testng.annotations.Test; + +import java.io.IOException; + +public class MultiReduceTest { + + @Test + public void empty() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.empty().reduce(Integer::sum) + .subscribe(ts); + + ts.assertResult(); + } + + @Test + public void single() { + TestSubscriber ts = new TestSubscriber<>(Long.MAX_VALUE); + + Multi.singleton(1).reduce(Integer::sum) + .subscribe(ts); + + ts.assertResult(1); + } + + @Test + public void range() { + TestSubscriber ts = new TestSubscriber<>(Long.MAX_VALUE); + + Multi.range(1, 10).reduce(Integer::sum) + .subscribe(ts); + + ts.assertResult(55); + } + + @Test + public void error() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.error(new IOException()).reduce(Integer::sum) + .subscribe(ts); + + ts.assertFailure(IOException.class); + } + + @Test + public void reducerNull() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.range(1, 5) + .reduce((a, b) -> null) + .subscribe(ts); + + ts.assertFailure(NullPointerException.class); + } + + + @Test + public void reducerCrash() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.range(1, 5) + .reduce((a, b) -> { throw new IllegalArgumentException(); }) + .subscribe(ts); + + ts.assertFailure(IllegalArgumentException.class); + } + + @Test + public void cancel() { + TestSubscriber ts = new TestSubscriber<>(); + + Multi.never() + .reduce(Integer::sum) + .subscribe(ts); + + ts.cancel(); + + ts.assertEmpty(); + } +} diff --git a/common/reactive/src/test/java/io/helidon/common/reactive/jmh/ShakespearePlaysScrabbleWithHelidonReactiveOpt.java b/common/reactive/src/test/java/io/helidon/common/reactive/jmh/ShakespearePlaysScrabbleWithHelidonReactiveOpt.java index 2c7c18d11d3..d0ef8fb4ee8 100644 --- a/common/reactive/src/test/java/io/helidon/common/reactive/jmh/ShakespearePlaysScrabbleWithHelidonReactiveOpt.java +++ b/common/reactive/src/test/java/io/helidon/common/reactive/jmh/ShakespearePlaysScrabbleWithHelidonReactiveOpt.java @@ -125,8 +125,7 @@ public List>> measureThroughput() throws Exception { Multi.from(histoOfLetters.apply(word)) .flatMapIterable(HashMap::entrySet) .map(blank) - .collectStream(Collectors.summarizingLong(value -> value)) - .map(LongSummaryStatistics::getSum) + .reduce(Long::sum) ; @@ -143,8 +142,7 @@ public List>> measureThroughput() throws Exception { HashMap::entrySet ) .map(letterScore) - .collectStream(Collectors.summarizingInt(value -> value)) - .map(v -> (int)v.getSum()) + .reduce(Integer::sum) ; // Placing the word on the board @@ -164,8 +162,7 @@ public List>> measureThroughput() throws Exception { Function> bonusForDoubleLetter = word -> toBeMaxed.apply(word) .map(scoreOfALetter) - .collectStream(Collectors.summarizingInt(value -> value)) - .map(v -> (int)v.getMax()) + .reduce(Integer::max) ; // score of the word put on the board @@ -175,11 +172,8 @@ public List>> measureThroughput() throws Exception { Multi.from(score2.apply(word)), Multi.from(bonusForDoubleLetter.apply(word)) ) - .collectStream(Collectors.summarizingInt(value -> value)) - .map(w -> { - int v = (int) w.getSum(); - return v * 2 + (word.length() == 7 ? 50 : 0); - }) + .reduce(Integer::sum) + .map(v -> v * 2 + (word.length() == 7 ? 50 : 0)) ; Function>, Single>>> buildHistoOnScore =