diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java index da1b9e2..6e45946 100644 --- a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java @@ -15,6 +15,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -178,13 +179,12 @@ public T runLast() throws Exception { * @throws NoSuchElementException * When this flow is empty. */ - public U runReduce(BiFunction f) throws Exception { - AtomicReference> current = new AtomicReference<>(Optional.empty()); + public T runReduce(BinaryOperator f) throws Exception { + AtomicReference> current = new AtomicReference<>(Optional.empty()); last.run(t -> { - // noinspection unchecked current.updateAndGet(currentValue -> currentValue - .map(u -> f.apply(u, (U) t)) - .or(() -> Optional.of((U) t))); + .map(u -> f.apply(u, t)) + .or(() -> Optional.of(t))); }); return current.get().orElseThrow(() -> new NoSuchElementException("cannot reduce an empty flow")); diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowRunOperationsTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowRunOperationsTest.java index cbf5fc0..ac3bd8d 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowRunOperationsTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowRunOperationsTest.java @@ -230,7 +230,7 @@ void runTakeLast_shouldReturnLastNElementsFromSource() throws Exception { @Test void runReduce_shouldThrowNoSuchElementExceptionForReduceOverEmptySource() { // given - Flow flow = Flows.empty(); + Flow flow = Flows.empty(); // when & then NoSuchElementException exception = assertThrows(NoSuchElementException.class, () -> flow.runReduce(Integer::sum));