diff --git a/README.md b/README.md index f446197..23c388c 100644 --- a/README.md +++ b/README.md @@ -209,6 +209,15 @@ class Demo6 { } ``` +### Selecting from "done" channels + +Receive clauses, for which channels are "done", will be skipped, and `select` with restart (as long as there are any +clauses left). This is motivated by the fact that a "done" channel is not in an error state, but signals that there are +no more values; while there might be more values available from other clauses. + +Optionally, clauses created with `Channel.receiveOrDoneClause`, will cause `select` to throw/ return when the associated +channel is done, bypassing the behavior described above. + ## Performance The project includes benchmarks implemented using JMH - both for the `Channel`, as well as for some built-in Java diff --git a/core/src/main/java/com/softwaremill/jox/Channel.java b/core/src/main/java/com/softwaremill/jox/Channel.java index d8a96c0..9139bd4 100644 --- a/core/src/main/java/com/softwaremill/jox/Channel.java +++ b/core/src/main/java/com/softwaremill/jox/Channel.java @@ -774,7 +774,7 @@ private void updateCellClose(Segment segment, int i) { } } case StoredSelectClause ss -> { - ss.getSelect().channelClosed(closedReason.get()); + ss.getSelect().channelClosed(ss, closedReason.get()); // not setting the state & updating counters, as each non-selected stored select cell will be // cleaned up, setting an interrupted state (and informing the segment) } @@ -824,6 +824,21 @@ public SelectClause receiveClause() { @Override public SelectClause receiveClause(Function callback) { + return receiveClause(callback, true); + } + + @Override + public SelectClause receiveOrDoneClause() { + //noinspection unchecked + return receiveOrDoneClause((Function) IDENTITY); + } + + @Override + public SelectClause receiveOrDoneClause(Function callback) { + return receiveClause(callback, false); + } + + private SelectClause receiveClause(Function callback, boolean skipWhenDone) { return new SelectClause<>() { @Override Channel getChannel() { @@ -845,6 +860,11 @@ U transformedRawValue(Object rawValue) { //noinspection unchecked return callback.apply((T) rawValue); } + + @Override + boolean skipWhenDone() { + return skipWhenDone; + } }; } @@ -877,6 +897,12 @@ Object register(SelectInstance select) { U transformedRawValue(Object rawValue) { return callback.get(); } + + @Override + boolean skipWhenDone() { + // sending to a done channel is probably wrong, skipping such channels is not allowed + return false; + } }; } diff --git a/core/src/main/java/com/softwaremill/jox/CollectSource.java b/core/src/main/java/com/softwaremill/jox/CollectSource.java index 121105b..81abd74 100644 --- a/core/src/main/java/com/softwaremill/jox/CollectSource.java +++ b/core/src/main/java/com/softwaremill/jox/CollectSource.java @@ -79,6 +79,32 @@ public SelectClause receiveClause(Function callback) { }); } + @Override + public SelectClause receiveOrDoneClause() { + return original.receiveOrDoneClause(v -> { + var t = f.apply(v); + if (t != null) { + return t; + } else { + //noinspection unchecked + return (T) RestartSelectMarker.RESTART; + } + }); + } + + @Override + public SelectClause receiveOrDoneClause(Function callback) { + return original.receiveOrDoneClause(v -> { + var t = f.apply(v); + if (t != null) { + return callback.apply(t); + } else { + //noinspection unchecked + return (U) RestartSelectMarker.RESTART; + } + }); + } + // delegates for closeable channel @Override @@ -121,7 +147,3 @@ public Throwable isError() { return original.isError(); } } - -enum RestartSelectMarker { - RESTART -} diff --git a/core/src/main/java/com/softwaremill/jox/Select.java b/core/src/main/java/com/softwaremill/jox/Select.java index ff885ae..40fe8da 100644 --- a/core/src/main/java/com/softwaremill/jox/Select.java +++ b/core/src/main/java/com/softwaremill/jox/Select.java @@ -44,10 +44,15 @@ public class Select { *

* If a couple of the clauses can be completed immediately, the select is biased towards the clauses that appear * first. + *

+ * If no clauses are given, or all clauses become filtered out, throws {@link ChannelDoneException}. + *

+ * If a receive clause is selected for a channel that is done, select restarts, unless the clause is created with + * {@link Source#receiveOrDoneClause()}. * * @param clauses The clauses, from which one will be selected. Not {@code null}. * @return The value returned by the selected clause. - * @throws ChannelClosedException When any of the channels is closed. + * @throws ChannelClosedException When any of the channels is closed (done or in error), and the select doesn't restart. */ @SafeVarargs public static U select(SelectClause... clauses) throws InterruptedException { @@ -66,18 +71,44 @@ public static U select(SelectClause... clauses) throws InterruptedExcepti *

* If a couple of the clauses can be completed immediately, the select is biased towards the clauses that appear * first. + *

+ * If no clauses are given, or all clauses become filtered out, returns {@link ChannelDone}. + *

+ * If a receive clause is selected for a channel that is done, select restarts, unless the clause is created with + * {@link Source#receiveOrDoneClause()}. * * @param clauses The clauses, from which one will be selected. Not {@code null}. - * @return Either the value returned by the selected clause, or {@link ChannelClosed}, when any of the channels is closed. + * @return Either the value returned by the selected clause, or {@link ChannelClosed}, when any of the channels + * is closed (done or in error), and the select doesn't restart. */ @SafeVarargs public static Object selectSafe(SelectClause... clauses) throws InterruptedException { + var currentClauses = clauses; while (true) { - var r = doSelectSafe(clauses); - // in case a `CollectSource` function filters out the element (the transformation function returns `null`, - // which is represented as a marker because `null` is a valid result of `doSelectSafe`, e.g. for send clauses), - // we need to restart the selection process - if (r != RestartSelectMarker.RESTART) { + if (currentClauses.length == 0) { + // no clauses given, or all clauses were filtered out + return new ChannelDone(); + } + + var r = doSelectSafe(currentClauses); + if (r == RestartSelectMarker.RESTART) { + // in case a `CollectSource` function filters out the element (the transformation function returns `null`, + // which is represented as a marker because `null` is a valid result of `doSelectSafe`, e.g. for send clauses), + // we need to restart the selection process + + // next loop + } else if (r == RestartSelectMarker.RESTART_WITHOUT_DONE) { + // a channel is closed, for which there is a receive clause with skipWhenDone = true + // filtering out done channels, and if there are any left, restarting the selection process + + //noinspection unchecked + currentClauses = Arrays.stream(currentClauses).filter(clause -> { + var channel = clause.getChannel(); + return !clause.skipWhenDone() || channel == null || !channel.isDone(); + }).toArray(SelectClause[]::new); + + // next loop, with filtered clauses + } else { return r; } } @@ -165,6 +196,10 @@ boolean register(SelectClause clause) { storedClauses.add(ss); return true; } + case ChannelDone cd when clause.skipWhenDone() -> { + state.set(SelectState.SKIP_DONE); + return false; + } case ChannelClosed cc -> { // when setting the state, we might override another state: // - a list of clauses to re-register - there's no point in doing that anyway (since the channel is closed) @@ -224,6 +259,10 @@ Object checkStateAndWait() throws InterruptedException { } // else: CAS unsuccessful, retry } + case SelectState.SKIP_DONE -> { + cleanup(null); + return RestartSelectMarker.RESTART_WITHOUT_DONE; + } case ChannelClosed cc -> { cleanup(null); return cc; @@ -315,6 +354,10 @@ boolean trySelect(StoredSelectClause storedSelectClause) { // already interrupted, will be cleaned up soon return false; } + case SelectState.SKIP_DONE -> { + // closed, will be cleaned up soon (& restarted) + return false; + } case ChannelClosed cc -> { // closed, will be cleaned up soon return false; @@ -339,20 +382,26 @@ boolean trySelect(StoredSelectClause storedSelectClause) { } } - void channelClosed(ChannelClosed channelClosed) { + void channelClosed(StoredSelectClause storedSelectClause, ChannelClosed channelClosed) { while (true) { var currentState = state.get(); + Object targetState; + if (channelClosed instanceof ChannelDone && storedSelectClause.getClause().skipWhenDone()) { + targetState = SelectState.SKIP_DONE; + } else { + targetState = channelClosed; + } switch (currentState) { case SelectState.REGISTERING -> { // the channel closed state will be discovered when there's a call to `checkStateAndWait` after registration completes - if (state.compareAndSet(currentState, channelClosed)) { + if (state.compareAndSet(currentState, targetState)) { return; } // else: CAS unsuccessful, retry } case List clausesToReRegister -> { // same as above - if (state.compareAndSet(currentState, channelClosed)) { + if (state.compareAndSet(currentState, targetState)) { return; } // else: CAS unsuccessful, retry @@ -361,6 +410,10 @@ void channelClosed(ChannelClosed channelClosed) { // already interrupted return; } + case SelectState.SKIP_DONE -> { + // already closed + return; + } case ChannelClosed cc -> { // already closed return; @@ -374,7 +427,7 @@ void channelClosed(ChannelClosed channelClosed) { return; } case Thread t -> { - if (state.compareAndSet(currentState, channelClosed)) { + if (state.compareAndSet(currentState, targetState)) { LockSupport.unpark(t); return; } @@ -388,7 +441,8 @@ void channelClosed(ChannelClosed channelClosed) { enum SelectState { REGISTERING, - INTERRUPTED + INTERRUPTED, + SKIP_DONE } // @@ -437,3 +491,8 @@ public void setPayload(Object payload) { this.payload = payload; } } + +enum RestartSelectMarker { + RESTART, + RESTART_WITHOUT_DONE +} diff --git a/core/src/main/java/com/softwaremill/jox/SelectClause.java b/core/src/main/java/com/softwaremill/jox/SelectClause.java index a91f0c4..7cffa63 100644 --- a/core/src/main/java/com/softwaremill/jox/SelectClause.java +++ b/core/src/main/java/com/softwaremill/jox/SelectClause.java @@ -23,6 +23,8 @@ public abstract class SelectClause { * Might throw any exceptions that the provided transformation function throws. */ abstract T transformedRawValue(Object rawValue); + + abstract boolean skipWhenDone(); } class DefaultClause extends SelectClause { @@ -46,11 +48,16 @@ Object register(SelectInstance select) { T transformedRawValue(Object rawValue) { return callback.get(); } + + @Override + boolean skipWhenDone() { + return false; // no associated channel, this value is never used + } } /** - * Used as a result of {@link DefaultClause#register(SelectInstance)}, instead of null, to indicate that the select - * clause has been selected during registration. + * Used as a result of {@link DefaultClause#register(SelectInstance)}, instead of {@code null}, to indicate that the + * default clause has been selected during registration. */ enum DefaultClauseMarker { DEFAULT diff --git a/core/src/main/java/com/softwaremill/jox/Source.java b/core/src/main/java/com/softwaremill/jox/Source.java index f8b951f..cb3324b 100644 --- a/core/src/main/java/com/softwaremill/jox/Source.java +++ b/core/src/main/java/com/softwaremill/jox/Source.java @@ -24,15 +24,37 @@ public interface Source extends CloseableChannel { /** * Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from * the current channel. + *

+ * If the source is/becomes done, {@link Select#select(SelectClause[])} will restart with channels that are not done yet. */ SelectClause receiveClause(); /** * Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from * the current channel, and transform it using the provided {@code callback}. + *

+ * If the source is/becomes done, {@link Select#select(SelectClause[])} will restart with channels that are not done yet. */ SelectClause receiveClause(Function callback); + /** + * Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from + * the current channel. + *

+ * If the source is/becomes done, {@link Select#select(SelectClause[])} will stop and throw {@link ChannelDoneException} + * or return a {@link ChannelDone} value (in the {@code safe} variant). + */ + SelectClause receiveOrDoneClause(); + + /** + * Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from + * the current channel, and transform it using the provided {@code callback}. + *

+ * If the source is/becomes done, {@link Select#select(SelectClause[])} will stop and throw {@link ChannelDoneException} + * or return a {@link ChannelDone} value (in the {@code safe} variant). + */ + SelectClause receiveOrDoneClause(Function callback); + // /** diff --git a/core/src/test/java/com/softwaremill/jox/SelectReceiveTest.java b/core/src/test/java/com/softwaremill/jox/SelectReceiveTest.java index 30f9b58..00b3476 100644 --- a/core/src/test/java/com/softwaremill/jox/SelectReceiveTest.java +++ b/core/src/test/java/com/softwaremill/jox/SelectReceiveTest.java @@ -13,7 +13,7 @@ public class SelectReceiveTest { @Test - public void testSelectFromFirst_buffered_immediate() throws InterruptedException { + void testSelectFromFirst_buffered_immediate() throws InterruptedException { // given Channel ch1 = new Channel<>(1); Channel ch2 = new Channel<>(1); @@ -27,7 +27,7 @@ public void testSelectFromFirst_buffered_immediate() throws InterruptedException } @Test - public void testSelectFromSecond_buffered_immediate() throws InterruptedException { + void testSelectFromSecond_buffered_immediate() throws InterruptedException { // given Channel ch1 = new Channel<>(1); Channel ch2 = new Channel<>(1); @@ -41,7 +41,7 @@ public void testSelectFromSecond_buffered_immediate() throws InterruptedExceptio } @Test - public void testSelectFromFirst_buffered_suspend() throws InterruptedException, ExecutionException { + void testSelectFromFirst_buffered_suspend() throws InterruptedException, ExecutionException { // given Channel ch1 = new Channel<>(1); Channel ch2 = new Channel<>(1); @@ -60,7 +60,7 @@ public void testSelectFromFirst_buffered_suspend() throws InterruptedException, } @Test - public void testSelectBiasedTowardsFirst_buffered() throws InterruptedException { + void testSelectBiasedTowardsFirst_buffered() throws InterruptedException { // given Channel ch1 = new Channel<>(1); Channel ch2 = new Channel<>(1); @@ -75,7 +75,7 @@ public void testSelectBiasedTowardsFirst_buffered() throws InterruptedException } @Test - public void testSelectFromReady_rendezvous_suspend() throws InterruptedException, ExecutionException { + void testSelectFromReady_rendezvous_suspend() throws InterruptedException, ExecutionException { // given Channel ch1 = new Channel<>(); Channel ch2 = new Channel<>(); @@ -95,7 +95,7 @@ public void testSelectFromReady_rendezvous_suspend() throws InterruptedException } @Test - public void testSelectFromReady_rendezvous_immediate() throws InterruptedException, ExecutionException { + void testSelectFromReady_rendezvous_immediate() throws InterruptedException, ExecutionException { // given Channel ch1 = new Channel<>(); Channel ch2 = new Channel<>(); @@ -113,12 +113,64 @@ public void testSelectFromReady_rendezvous_immediate() throws InterruptedExcepti } @Test - public void testSelectWhenDone() throws InterruptedException { + void testSelectOrDoneWhenDone() throws InterruptedException { // given Channel ch1 = new Channel<>(1); Channel ch2 = new Channel<>(1); ch2.done(); + // when + Object received = selectSafe(ch1.receiveOrDoneClause(), ch2.receiveOrDoneClause()); + + // then + assertEquals(new ChannelDone(), received); + } + + @Test + void testSkipDone_immediate() throws InterruptedException { + // given + Channel ch1 = new Channel<>(1); + Channel ch2 = new Channel<>(1); + ch1.done(); + ch2.send("x"); + + // when + Object received = selectSafe(ch1.receiveClause(), ch2.receiveClause()); + + // then + assertEquals("x", received); + } + + @Test + void testSkipDone_suspend() throws InterruptedException, ExecutionException { + // given + Channel ch1 = new Channel<>(1); + Channel ch2 = new Channel<>(1); + ch1.done(); + + scoped(scope -> { + // when + forkVoid(scope, () -> { + Thread.sleep(100); // making sure receive suspends + ch2.send("x"); + }); + + // when + Object received = selectSafe(ch1.receiveClause(), ch2.receiveClause()); + + // then + assertEquals("x", received); + }); + } + + @Test + void testSelectWhenAllDone_immediate() throws InterruptedException { + // given + Channel ch1 = new Channel<>(1); + Channel ch2 = new Channel<>(1); + ch1.done(); + ch2.done(); + // when Object received = selectSafe(ch1.receiveClause(), ch2.receiveClause()); @@ -126,8 +178,30 @@ public void testSelectWhenDone() throws InterruptedException { assertEquals(new ChannelDone(), received); } + @Test + void testSelectWhenAllDone_suspend() throws InterruptedException, ExecutionException { + // given + Channel ch1 = new Channel<>(1); + Channel ch2 = new Channel<>(1); + + scoped(scope -> { + // when + forkVoid(scope, () -> { + Thread.sleep(100); // making sure receive suspends + ch1.done(); + ch2.done(); + }); + + // when + Object received = selectSafe(ch1.receiveClause(), ch2.receiveClause()); + + // then + assertEquals(new ChannelDone(), received); + }); + } + @TestWithCapacities - public void testReceiveMany(int capacity) throws InterruptedException, ExecutionException { + void testReceiveMany(int capacity) throws InterruptedException, ExecutionException { // given int channelsCount = 5; int msgsCount = 10000; @@ -166,7 +240,7 @@ public void testReceiveMany(int capacity) throws InterruptedException, Execution } @Test - public void testSelectWithTransformation() throws InterruptedException { + void testSelectWithTransformation() throws InterruptedException { // given Channel ch1 = new Channel<>(1); Channel ch2 = new Channel<>(1); @@ -180,7 +254,7 @@ public void testSelectWithTransformation() throws InterruptedException { } @Test - public void testBufferExpandedWhenSelecting() throws InterruptedException { + void testBufferExpandedWhenSelecting() throws InterruptedException { // given Channel ch = new Channel<>(2); @@ -201,4 +275,9 @@ public void testBufferExpandedWhenSelecting() throws InterruptedException { assertEquals("v3", r3); assertEquals("v4", r4); } + + @Test + void testSelectFromNone() throws InterruptedException { + assertEquals(new ChannelDone(), selectSafe()); + } }