Skip to content

Commit

Permalink
* add missing documentation
Browse files Browse the repository at this point in the history
* suppress warnings
  • Loading branch information
emil-bar committed Dec 12, 2024
1 parent b51faa1 commit 8342557
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 1 deletion.
6 changes: 6 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/FlowEmit.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@

import java.util.concurrent.ExecutionException;

/**
* Instances of this interface should be considered thread-unsafe, and only used within the scope in which they have been obtained, e.g. as
* part of {@link Flows#usingEmit} or {@link Flow#mapUsingEmit}.
*/
public interface FlowEmit<T> {

/** Emit a value to be processed downstream. Blocks until the value is fully processed, or throws an exception if an error occurred. */
void apply(T t) throws Exception;

static <T> void channelToEmit(Source<T> source, FlowEmit<T> emit) throws Exception {
Expand Down
4 changes: 4 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/FlowStage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package com.softwaremill.jox;

/**
* Contains the logic for running a single flow stage. As part of `run`s implementation, previous flow stages might be run, either
* synchronously or asynchronously.
*/
public interface FlowStage<T> {
void run(FlowEmit<T> emit) throws Exception;
}
6 changes: 5 additions & 1 deletion flows/src/main/java/com/softwaremill/jox/Flows.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public static <T> Flow<T> fromFork(Fork<T> f) {
public static <T> Flow<T> iterate(T zero, Function<T, T> mappingFunction) {
return usingEmit(emit -> {
T t = zero;
//noinspection InfiniteLoopStatement
while (true) {
emit.apply(t);
t = mappingFunction.apply(t);
Expand Down Expand Up @@ -109,12 +110,14 @@ public static Flow<Integer> range(int from, int to, int step) {
*/
public static <T> Flow<T> tick(Duration interval, T value) {
return usingEmit(emit -> {
//noinspection InfiniteLoopStatement
while (true) {
long start = System.nanoTime();
emit.apply(value);
long end = System.nanoTime();
long sleep = interval.toNanos() - (end - start);
if (sleep > 0) {
//noinspection BusyWait
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(sleep), (int) (sleep % 1_000_000));
}
}
Expand All @@ -134,6 +137,7 @@ public static <T> Flow<T> repeat(T element) {
*/
public static <T> Flow<T> repeatEval(Supplier<T> supplierFunction) {
return usingEmit(emit -> {
//noinspection InfiniteLoopStatement
while (true) {
emit.apply(supplierFunction.get());
}
Expand Down Expand Up @@ -213,7 +217,7 @@ public static <T> Flow<T> fromFutureSource(CompletableFuture<Source<T>> from) {
* The {@link java.lang.Exception} to fail with
*/
public static <T> Flow<T> failed(Exception t) {
return new Flow<>(emit -> {
return usingEmit(emit -> {
throw t;
});
}
Expand Down

0 comments on commit 8342557

Please sign in to comment.