Skip to content

Commit

Permalink
Add more run methods for flow (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
uini223 authored Dec 18, 2024
1 parent 1c4a4e6 commit 65aef5d
Show file tree
Hide file tree
Showing 3 changed files with 480 additions and 57 deletions.
123 changes: 122 additions & 1 deletion flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@
import static com.softwaremill.jox.structured.Scopes.unsupervised;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import com.softwaremill.jox.Channel;
import com.softwaremill.jox.Sink;
import com.softwaremill.jox.Source;
import com.softwaremill.jox.structured.UnsupervisedScope;

Expand Down Expand Up @@ -68,10 +75,28 @@ public List<T> runToList() throws Exception {
* Required for creating async forks responsible for writing to channel
*/
public Source<T> runToChannel(UnsupervisedScope scope) {
return runToChannelInternal(scope, () -> Channel.withScopedBufferSize());
}

/** The flow is run in the background, and each emitted element is sent to a newly created channel, which is then returned as the result
* of this method.
* <p>
* Method does not block until the flow completes.
*
* @param scope
* Required for creating async forks responsible for writing to channel
* @param bufferCapacity
* Specifies buffer capacity of created channel
*/
public Source<T> runToChannel(UnsupervisedScope scope, int bufferCapacity) {
return runToChannelInternal(scope, () -> new Channel<>(bufferCapacity));
}

private Source<T> runToChannelInternal(UnsupervisedScope scope, Supplier<Channel<T>> channelProvider) {
if (last instanceof SourceBackedFlowStage<T>(Source<T> source)) {
return source;
} else {
Channel<T> channel = Channel.withScopedBufferSize();
Channel<T> channel = channelProvider.get();
runLastToChannelAsync(scope, channel);
return channel;
}
Expand Down Expand Up @@ -103,6 +128,102 @@ public void runDrain() throws Exception {
runForeach(t -> {});
}

/**
* Passes each element emitted by this flow to the given sink. Blocks until the flow completes.
* <p>
* Errors are always propagated to the provided sink. Successful flow completion is propagated when `propagateDone` is set to `true`.
* <p>
* Fatal errors are rethrown.
*/
public void runPipeToSink(Sink<T> sink, boolean propagateDone) {
try {
last.run(sink::send);
if (propagateDone) {
sink.doneOrClosed();
}
} catch (Exception e) {
sink.error(e);
} catch (Throwable t) {
sink.error(t);
throw t;
}
}

/** Returns the last element emitted by this flow, wrapped in {@link Optional#of}, or {@link Optional#empty()} when this source is empty. */
public Optional<T> runLastOptional() throws Exception {
AtomicReference<Optional<T>> value = new AtomicReference<>(Optional.empty());
last.run(t -> value.set(Optional.of(t)));
return value.get();
}

/** Returns the last element emitted by this flow, or throws {@link NoSuchElementException} when the flow emits no elements (is empty).
*
* @throws NoSuchElementException
* When this flow is empty.
*/
public T runLast() throws Exception {
return runLastOptional()
.orElseThrow(() -> new NoSuchElementException("cannot obtain last element from an empty source"));
}

/**
* Applies function `f` on the first and the following (if available) elements emitted by this flow. The returned value is used as the
* next current value and `f` is applied again with the next value emitted by this source. The operation is repeated until this flow
* emits all elements. This is similar operation to {@link Flow#runFold} but it uses the first emitted element as `zero`.
*
* @param f
* A binary function (a function that takes two arguments) that is applied to the current and next values emitted by this flow.
* @return
* Combined value retrieved from running function `f` on all flow elements in a cumulative manner where result of the previous call is
* used as an input value to the next.
* @throws NoSuchElementException
* When this flow is empty.
*/
public T runReduce(BinaryOperator<T> f) throws Exception {
AtomicReference<Optional<T>> current = new AtomicReference<>(Optional.empty());
last.run(t -> {
current.updateAndGet(currentValue -> currentValue
.map(u -> f.apply(u, t))
.or(() -> Optional.of(t)));
});

return current.get().orElseThrow(() -> new NoSuchElementException("cannot reduce an empty flow"));
}

/**
* Returns the list of up to `n` last elements emitted by this flow. Less than `n` elements is returned when this flow emits less
* elements than requested. Empty list is returned when called on an empty flow.
*
* @param n
* Number of elements to be taken from the end of this flow. It is expected that `n >= 0`.
* @return
* A list of up to `n` last elements from this flow.
*/
public List<T> runTakeLast(int n) throws Exception {
if (n < 0) {
throw new IllegalArgumentException("requirement failed: n must be >= 0");
}
if (n == 0) {
runDrain();
return Collections.emptyList();
} else if (n == 1) {
return runLastOptional()
.map(Collections::singletonList)
.orElse(Collections.emptyList());
} else {
List<T> buffer = new LinkedList<>();

last.run(t -> {
if (buffer.size() == n) {
buffer.removeFirst();
}
buffer.add(t);
});

return new ArrayList<>(buffer);
}
}

// endregion

// region Flow operations
Expand Down
Loading

0 comments on commit 65aef5d

Please sign in to comment.