From 65aef5d4317847e5079f14701c223c19b0c4ea03 Mon Sep 17 00:00:00 2001 From: emilb Date: Wed, 18 Dec 2024 14:27:57 +0100 Subject: [PATCH] Add more run methods for flow (#72) --- .../java/com/softwaremill/jox/flows/Flow.java | 123 +++++- .../jox/flows/FlowRunOperationsTest.java | 358 ++++++++++++++++++ .../com/softwaremill/jox/flows/FlowTest.java | 56 --- 3 files changed, 480 insertions(+), 57 deletions(-) create mode 100644 flows/src/test/java/com/softwaremill/jox/flows/FlowRunOperationsTest.java diff --git a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java index d3c75b7..6e45946 100644 --- a/flows/src/main/java/com/softwaremill/jox/flows/Flow.java +++ b/flows/src/main/java/com/softwaremill/jox/flows/Flow.java @@ -5,17 +5,24 @@ import static com.softwaremill.jox.structured.Scopes.unsupervised; import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; +import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import com.softwaremill.jox.Channel; +import com.softwaremill.jox.Sink; import com.softwaremill.jox.Source; import com.softwaremill.jox.structured.UnsupervisedScope; @@ -68,10 +75,28 @@ public List runToList() throws Exception { * Required for creating async forks responsible for writing to channel */ public Source runToChannel(UnsupervisedScope scope) { + return runToChannelInternal(scope, () -> Channel.withScopedBufferSize()); + } + + /** The flow is run in the background, and each emitted element is sent to a newly created channel, which is then returned as the result + * of this method. + *

+ * Method does not block until the flow completes. + * + * @param scope + * Required for creating async forks responsible for writing to channel + * @param bufferCapacity + * Specifies buffer capacity of created channel + */ + public Source runToChannel(UnsupervisedScope scope, int bufferCapacity) { + return runToChannelInternal(scope, () -> new Channel<>(bufferCapacity)); + } + + private Source runToChannelInternal(UnsupervisedScope scope, Supplier> channelProvider) { if (last instanceof SourceBackedFlowStage(Source source)) { return source; } else { - Channel channel = Channel.withScopedBufferSize(); + Channel channel = channelProvider.get(); runLastToChannelAsync(scope, channel); return channel; } @@ -103,6 +128,102 @@ public void runDrain() throws Exception { runForeach(t -> {}); } + /** + * Passes each element emitted by this flow to the given sink. Blocks until the flow completes. + *

+ * Errors are always propagated to the provided sink. Successful flow completion is propagated when `propagateDone` is set to `true`. + *

+ * Fatal errors are rethrown. + */ + public void runPipeToSink(Sink sink, boolean propagateDone) { + try { + last.run(sink::send); + if (propagateDone) { + sink.doneOrClosed(); + } + } catch (Exception e) { + sink.error(e); + } catch (Throwable t) { + sink.error(t); + throw t; + } + } + + /** Returns the last element emitted by this flow, wrapped in {@link Optional#of}, or {@link Optional#empty()} when this source is empty. */ + public Optional runLastOptional() throws Exception { + AtomicReference> value = new AtomicReference<>(Optional.empty()); + last.run(t -> value.set(Optional.of(t))); + return value.get(); + } + + /** Returns the last element emitted by this flow, or throws {@link NoSuchElementException} when the flow emits no elements (is empty). + * + * @throws NoSuchElementException + * When this flow is empty. + */ + public T runLast() throws Exception { + return runLastOptional() + .orElseThrow(() -> new NoSuchElementException("cannot obtain last element from an empty source")); + } + + /** + * Applies function `f` on the first and the following (if available) elements emitted by this flow. The returned value is used as the + * next current value and `f` is applied again with the next value emitted by this source. The operation is repeated until this flow + * emits all elements. This is similar operation to {@link Flow#runFold} but it uses the first emitted element as `zero`. + * + * @param f + * A binary function (a function that takes two arguments) that is applied to the current and next values emitted by this flow. + * @return + * Combined value retrieved from running function `f` on all flow elements in a cumulative manner where result of the previous call is + * used as an input value to the next. + * @throws NoSuchElementException + * When this flow is empty. + */ + public T runReduce(BinaryOperator f) throws Exception { + AtomicReference> current = new AtomicReference<>(Optional.empty()); + last.run(t -> { + current.updateAndGet(currentValue -> currentValue + .map(u -> f.apply(u, t)) + .or(() -> Optional.of(t))); + }); + + return current.get().orElseThrow(() -> new NoSuchElementException("cannot reduce an empty flow")); + } + + /** + * Returns the list of up to `n` last elements emitted by this flow. Less than `n` elements is returned when this flow emits less + * elements than requested. Empty list is returned when called on an empty flow. + * + * @param n + * Number of elements to be taken from the end of this flow. It is expected that `n >= 0`. + * @return + * A list of up to `n` last elements from this flow. + */ + public List runTakeLast(int n) throws Exception { + if (n < 0) { + throw new IllegalArgumentException("requirement failed: n must be >= 0"); + } + if (n == 0) { + runDrain(); + return Collections.emptyList(); + } else if (n == 1) { + return runLastOptional() + .map(Collections::singletonList) + .orElse(Collections.emptyList()); + } else { + List buffer = new LinkedList<>(); + + last.run(t -> { + if (buffer.size() == n) { + buffer.removeFirst(); + } + buffer.add(t); + }); + + return new ArrayList<>(buffer); + } + } + // endregion // region Flow operations diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowRunOperationsTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowRunOperationsTest.java new file mode 100644 index 0000000..ac3bd8d --- /dev/null +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowRunOperationsTest.java @@ -0,0 +1,358 @@ +package com.softwaremill.jox.flows; + +import com.softwaremill.jox.Channel; +import com.softwaremill.jox.Source; +import com.softwaremill.jox.structured.Scopes; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class FlowRunOperationsTest { + + @Test + void runForEach_shouldRun() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + List results = new ArrayList<>(); + + // when + flow.runForeach(results::add); + + // then + assertEquals(List.of(1, 2, 3), results); + } + + @Test + void runToList_shouldRun() throws Throwable { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + List results = flow.runToList(); + + // then + assertEquals(List.of(1, 2, 3), results); + } + + @Test + void runToChannel_shouldRun() throws Throwable { + Scopes.unsupervised(scope -> { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + Source source = flow.runToChannel(scope); + + // then + assertEquals(1, source.receive()); + assertEquals(2, source.receive()); + assertEquals(3, source.receive()); + return null; + }); + } + + @Test + void runToChannel_shouldReturnOriginalSourceWhenRunningASourcedBackedFlow() throws Throwable { + Scopes.unsupervised(scope -> { + // given + Channel channel = Channel.newUnlimitedChannel(); + Flow flow = Flows.fromSource(channel); + + // when + Source receivedChannel = flow.runToChannel(scope); + + // then + assertEquals(channel, receivedChannel); + return null; + }); + } + + @Test + void runToChannel_shouldRunWithBufferSizeDefinedInScope() throws Throwable { + ScopedValue.where(Channel.BUFFER_SIZE, 2).call(() -> { + Scopes.unsupervised(scope -> { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + Source source = flow.runToChannel(scope); + + // then + assertEquals(1, source.receive()); + assertEquals(2, source.receive()); + assertEquals(3, source.receive()); + return null; + }); + return null; + }); + } + + @Test + void runToChannelWithBufferCapacity_shouldRun() throws Throwable { + Scopes.unsupervised(scope -> { + // given + Flow flow = Flows.fromValues(1, 2, 3); + + // when + Source source = flow.runToChannel(scope, 2); + + // then + assertEquals(1, source.receive()); + assertEquals(2, source.receive()); + assertEquals(3, source.receive()); + return null; + }); + } + + @Test + void runToChannelWithBufferCapacity_shouldReturnOriginalSourceWhenRunningASourcedBackedFlow() throws Throwable { + Scopes.unsupervised(scope -> { + // given + Channel channel = Channel.newUnlimitedChannel(); + Flow flow = Flows.fromSource(channel); + + // when + Source receivedChannel = flow.runToChannel(scope, 10); + + // then + assertEquals(channel, receivedChannel); + return null; + }); + } + + @Test + void runFold_shouldThrowExceptionForFailedFlow() { + // given + Flow flow = Flows.failed(new IllegalStateException()); + + // when & then + assertThrows(IllegalStateException.class, () -> + flow + .runFold(0, (acc, n) -> Integer.valueOf(acc.toString() + n))); + } + + @Test + void runFold_shouldThrowExceptionThrownInFunctionF() { + // given + Flow flow = Flows.fromValues(1); + + // when & then + RuntimeException thrown = assertThrows(RuntimeException.class, () -> { + flow + .runFold(0, (acc, n) -> { throw new RuntimeException("Function `f` is broken"); }); + }); + assertEquals("Function `f` is broken", thrown.getMessage()); + } + + @Test + void runFold_shouldReturnZeroValueFromFoldOnEmptySource() throws Exception { + // given + Flow flow = Flows.empty(); + + // when & then + assertEquals(0, + flow + .runFold(0, (acc, n) -> Integer.valueOf(acc.toString() + n))); + } + + @Test + void runFold_shouldReturnFoldOnNonEmptyFold() throws Exception { + // given + Flow flow = Flows.fromValues(1, 2); + + // when & then + assertEquals(3, flow.runFold(0, Integer::sum)); + } + + @Test + void runTakeLast_shouldThrowChannelClosedExceptionErrorForSourceFailedWithoutException() { + // given + Flow flow = Flows.failed(new IllegalStateException()); + + // when & then + assertThrows(IllegalStateException.class, () -> flow.runTakeLast(1)); + } + + @Test + void runTakeLast_shouldFailToTakeLastWhenNIsLessThanZero() { + // given + Flow flow = Flows.empty(); + + // when & then + IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, + () -> flow.runTakeLast(-1)); + assertEquals("requirement failed: n must be >= 0", exception.getMessage()); + } + + @Test + void runTakeLast_shouldReturnEmptyListForEmptySource() throws Exception { + // given + Flow flow = Flows.empty(); + + // when & then + assertEquals(List.of(), flow.runTakeLast(1)); + } + + @Test + void runTakeLast_shouldReturnEmptyListWhenNIsZeroAndListIsNotEmpty() throws Exception { + // given + Flow flow = Flows.fromValues(1); + + // when & then + assertEquals(List.of(), flow.runTakeLast(0)); + } + + @Test + void runTakeLast_shouldReturnListWithAllElementsIfSourceIsSmallerThanRequestedNumber() throws Exception { + // given + Flow flow = Flows.fromValues(1, 2); + + // when & then + assertEquals(List.of(1, 2), flow.runTakeLast(3)); + } + + @Test + void runTakeLast_shouldReturnLastNElementsFromSource() throws Exception { + // given + Flow flow = Flows.fromValues(1, 2, 3, 4, 5); + + // when & then + assertEquals(List.of(4, 5), flow.runTakeLast(2)); + } + + @Test + void runReduce_shouldThrowNoSuchElementExceptionForReduceOverEmptySource() { + // given + Flow flow = Flows.empty(); + + // when & then + NoSuchElementException exception = assertThrows(NoSuchElementException.class, () -> flow.runReduce(Integer::sum)); + assertEquals("cannot reduce an empty flow", exception.getMessage()); + } + + @Test + void runReduce_shouldThrowExceptionThrownInFunctionFWhenFThrows() { + // given + Flow flow = Flows.fromValues(1, 2); + + // when & then + RuntimeException exception = assertThrows(RuntimeException.class, () -> + flow.runReduce((a, b) -> {throw new RuntimeException("Function `f` is broken");})); + assertEquals("Function `f` is broken", exception.getMessage()); + } + + @Test + void runReduce_shouldReturnFirstElementFromReduceOverSingleElementSource() throws Exception { + // given + Flow flow = Flows.fromValues(1); + + // when & then + assertEquals(1, flow.runReduce(Integer::sum)); + } + + @Test + void runReduce_shouldRunReduceOverNonEmptySource() throws Exception { + // given + Flow flow = Flows.fromValues(1, 2); + + // when & then + assertEquals(3, flow.runReduce(Integer::sum)); + } + + @Test + void runLast_shouldThrowNoSuchElementExceptionForEmptySource() { + // given + Flow flow = Flows.empty(); + + // when & then + NoSuchElementException exception = assertThrows(NoSuchElementException.class, () -> flow.runLast()); + assertEquals("cannot obtain last element from an empty source", exception.getMessage()); + } + + @Test + void runLast_shouldThrowRuntimeExceptionWithMessageDuringRetrieval() { + // given + Flow flow = Flows.failed(new RuntimeException("source is broken")); + + // when & then + RuntimeException exception = assertThrows(RuntimeException.class, () -> flow.runLast()); + assertEquals("source is broken", exception.getMessage()); + } + + @Test + void runLast_shouldReturnLastElementForNonEmptySource() throws Exception { + // given + Flow flow = Flows.fromValues(1, 2); + + // when & then + assertEquals(2, flow.runLast()); + } + + @Test + void runLastOptional_returnNoneForEmptyFlow() throws Exception { + // given + Flow flow = Flows.empty(); + + // when & then + assertEquals(Optional.empty(), flow.runLastOptional()); + } + + @Test + void runLastOptional_returnSomeForNonEmptyFlow() throws Exception { + // given + Flow s = Flows.fromValues(1, 2, 3); + + // when & then + assertEquals(Optional.of(3), s.runLastOptional()); + } + + @Test + void runLastOptional_throwExceptionWithMessageDuringRetrieval() { + // given + Flow flow = Flows.failed(new RuntimeException("source is broken")); + + // when & then + RuntimeException exception = assertThrows(RuntimeException.class, () -> flow.runLastOptional()); + assertEquals("source is broken", exception.getMessage()); + } + + @Test + void runPipeToSink_pipeOneSourceToAnother() throws ExecutionException, InterruptedException { + Scopes.supervised(scope -> { + Flow c1 = Flows.fromValues(1, 2, 3); + Channel c2 = new Channel<>(); + + scope.fork(() -> { + c1.runPipeToSink(c2, false); + c2.done(); + return null; + }); + + assertEquals(List.of(1, 2, 3), c2.toList()); + return null; + }); + } + + @Test + void runPipeToSink_pipeOneSourceToAnotherWithDonePropagation() throws ExecutionException, InterruptedException { + Scopes.supervised(scope -> { + Flow c1 = Flows.fromValues(1, 2, 3); + Channel c2 = new Channel<>(); + + scope.fork(() -> { + c1.runPipeToSink(c2, true); + return null; + }); + + assertEquals(List.of(1, 2, 3), c2.toList()); + return null; + }); + } +} diff --git a/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java index 25cdea9..22ecaf5 100644 --- a/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java +++ b/flows/src/test/java/com/softwaremill/jox/flows/FlowTest.java @@ -9,8 +9,6 @@ import com.softwaremill.jox.Channel; import com.softwaremill.jox.ChannelClosedException; -import com.softwaremill.jox.Source; -import com.softwaremill.jox.structured.Scopes; import org.junit.jupiter.api.Test; class FlowTest { @@ -40,60 +38,6 @@ void shouldRunToList() throws Throwable { assertEquals(List.of(1, 2, 3), results); } - @Test - void shouldRunToChannel() throws Throwable { - Scopes.unsupervised(scope -> { - // given - Flow flow = Flows.fromValues(1, 2, 3); - - // when - Source source = flow.runToChannel(scope); - - // then - assertEquals(1, source.receive()); - assertEquals(2, source.receive()); - assertEquals(3, source.receive()); - return null; - }); - } - - @Test - void shouldRunToChannelWithBufferSizeDefinedInScope() throws Throwable { - ScopedValue.where(Channel.BUFFER_SIZE, 2).call(() -> { - Scopes.unsupervised(scope -> { - // given - Flow flow = Flows.fromValues(1, 2, 3); - - // when - Source source = flow.runToChannel(scope); - - // then - assertEquals(1, source.receive()); - assertEquals(2, source.receive()); - assertEquals(3, source.receive()); - return null; - }); - return null; - }); - } - - - @Test - void shouldReturnOriginalSourceWhenRunningASourcedBackedFlow() throws Throwable { - Scopes.unsupervised(scope -> { - // given - Channel channel = Channel.newUnlimitedChannel(); - Flow flow = Flows.fromSource(channel); - - // when - Source receivedChannel = flow.runToChannel(scope); - - // then - assertEquals(channel, receivedChannel); - return null; - }); - } - @Test void shouldThrowExceptionForFailedFlow() { assertThrows(IllegalStateException.class, () -> {