Skip to content

Commit

Permalink
Select restarts when a channel for a receive clause is done (#35)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Jan 23, 2024
1 parent a018ba1 commit 3320290
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 29 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,15 @@ class Demo6 {
}
```

### Selecting from "done" channels

Receive clauses, for which channels are "done", will be skipped, and `select` with restart (as long as there are any
clauses left). This is motivated by the fact that a "done" channel is not in an error state, but signals that there are
no more values; while there might be more values available from other clauses.

Optionally, clauses created with `Channel.receiveOrDoneClause`, will cause `select` to throw/ return when the associated
channel is done, bypassing the behavior described above.

## Performance

The project includes benchmarks implemented using JMH - both for the `Channel`, as well as for some built-in Java
Expand Down
28 changes: 27 additions & 1 deletion core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -774,7 +774,7 @@ private void updateCellClose(Segment segment, int i) {
}
}
case StoredSelectClause ss -> {
ss.getSelect().channelClosed(closedReason.get());
ss.getSelect().channelClosed(ss, closedReason.get());
// not setting the state & updating counters, as each non-selected stored select cell will be
// cleaned up, setting an interrupted state (and informing the segment)
}
Expand Down Expand Up @@ -824,6 +824,21 @@ public SelectClause<T> receiveClause() {

@Override
public <U> SelectClause<U> receiveClause(Function<T, U> callback) {
return receiveClause(callback, true);
}

@Override
public SelectClause<T> receiveOrDoneClause() {
//noinspection unchecked
return receiveOrDoneClause((Function<T, T>) IDENTITY);
}

@Override
public <U> SelectClause<U> receiveOrDoneClause(Function<T, U> callback) {
return receiveClause(callback, false);
}

private <U> SelectClause<U> receiveClause(Function<T, U> callback, boolean skipWhenDone) {
return new SelectClause<>() {
@Override
Channel<?> getChannel() {
Expand All @@ -845,6 +860,11 @@ U transformedRawValue(Object rawValue) {
//noinspection unchecked
return callback.apply((T) rawValue);
}

@Override
boolean skipWhenDone() {
return skipWhenDone;
}
};
}

Expand Down Expand Up @@ -877,6 +897,12 @@ Object register(SelectInstance select) {
U transformedRawValue(Object rawValue) {
return callback.get();
}

@Override
boolean skipWhenDone() {
// sending to a done channel is probably wrong, skipping such channels is not allowed
return false;
}
};
}

Expand Down
30 changes: 26 additions & 4 deletions core/src/main/java/com/softwaremill/jox/CollectSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,32 @@ public <U> SelectClause<U> receiveClause(Function<T, U> callback) {
});
}

@Override
public SelectClause<T> receiveOrDoneClause() {
return original.receiveOrDoneClause(v -> {
var t = f.apply(v);
if (t != null) {
return t;
} else {
//noinspection unchecked
return (T) RestartSelectMarker.RESTART;
}
});
}

@Override
public <U> SelectClause<U> receiveOrDoneClause(Function<T, U> callback) {
return original.receiveOrDoneClause(v -> {
var t = f.apply(v);
if (t != null) {
return callback.apply(t);
} else {
//noinspection unchecked
return (U) RestartSelectMarker.RESTART;
}
});
}

// delegates for closeable channel

@Override
Expand Down Expand Up @@ -121,7 +147,3 @@ public Throwable isError() {
return original.isError();
}
}

enum RestartSelectMarker {
RESTART
}
83 changes: 71 additions & 12 deletions core/src/main/java/com/softwaremill/jox/Select.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,15 @@ public class Select {
* <p>
* If a couple of the clauses can be completed immediately, the select is biased towards the clauses that appear
* first.
* <p>
* If no clauses are given, or all clauses become filtered out, throws {@link ChannelDoneException}.
* <p>
* If a receive clause is selected for a channel that is done, select restarts, unless the clause is created with
* {@link Source#receiveOrDoneClause()}.
*
* @param clauses The clauses, from which one will be selected. Not {@code null}.
* @return The value returned by the selected clause.
* @throws ChannelClosedException When any of the channels is closed.
* @throws ChannelClosedException When any of the channels is closed (done or in error), and the select doesn't restart.
*/
@SafeVarargs
public static <U> U select(SelectClause<U>... clauses) throws InterruptedException {
Expand All @@ -66,18 +71,44 @@ public static <U> U select(SelectClause<U>... clauses) throws InterruptedExcepti
* <p>
* If a couple of the clauses can be completed immediately, the select is biased towards the clauses that appear
* first.
* <p>
* If no clauses are given, or all clauses become filtered out, returns {@link ChannelDone}.
* <p>
* If a receive clause is selected for a channel that is done, select restarts, unless the clause is created with
* {@link Source#receiveOrDoneClause()}.
*
* @param clauses The clauses, from which one will be selected. Not {@code null}.
* @return Either the value returned by the selected clause, or {@link ChannelClosed}, when any of the channels is closed.
* @return Either the value returned by the selected clause, or {@link ChannelClosed}, when any of the channels
* is closed (done or in error), and the select doesn't restart.
*/
@SafeVarargs
public static <U> Object selectSafe(SelectClause<U>... clauses) throws InterruptedException {
var currentClauses = clauses;
while (true) {
var r = doSelectSafe(clauses);
// in case a `CollectSource` function filters out the element (the transformation function returns `null`,
// which is represented as a marker because `null` is a valid result of `doSelectSafe`, e.g. for send clauses),
// we need to restart the selection process
if (r != RestartSelectMarker.RESTART) {
if (currentClauses.length == 0) {
// no clauses given, or all clauses were filtered out
return new ChannelDone();
}

var r = doSelectSafe(currentClauses);
if (r == RestartSelectMarker.RESTART) {
// in case a `CollectSource` function filters out the element (the transformation function returns `null`,
// which is represented as a marker because `null` is a valid result of `doSelectSafe`, e.g. for send clauses),
// we need to restart the selection process

// next loop
} else if (r == RestartSelectMarker.RESTART_WITHOUT_DONE) {
// a channel is closed, for which there is a receive clause with skipWhenDone = true
// filtering out done channels, and if there are any left, restarting the selection process

//noinspection unchecked
currentClauses = Arrays.stream(currentClauses).filter(clause -> {
var channel = clause.getChannel();
return !clause.skipWhenDone() || channel == null || !channel.isDone();
}).toArray(SelectClause[]::new);

// next loop, with filtered clauses
} else {
return r;
}
}
Expand Down Expand Up @@ -165,6 +196,10 @@ <U> boolean register(SelectClause<U> clause) {
storedClauses.add(ss);
return true;
}
case ChannelDone cd when clause.skipWhenDone() -> {
state.set(SelectState.SKIP_DONE);
return false;
}
case ChannelClosed cc -> {
// when setting the state, we might override another state:
// - a list of clauses to re-register - there's no point in doing that anyway (since the channel is closed)
Expand Down Expand Up @@ -224,6 +259,10 @@ Object checkStateAndWait() throws InterruptedException {
}
// else: CAS unsuccessful, retry
}
case SelectState.SKIP_DONE -> {
cleanup(null);
return RestartSelectMarker.RESTART_WITHOUT_DONE;
}
case ChannelClosed cc -> {
cleanup(null);
return cc;
Expand Down Expand Up @@ -315,6 +354,10 @@ boolean trySelect(StoredSelectClause storedSelectClause) {
// already interrupted, will be cleaned up soon
return false;
}
case SelectState.SKIP_DONE -> {
// closed, will be cleaned up soon (& restarted)
return false;
}
case ChannelClosed cc -> {
// closed, will be cleaned up soon
return false;
Expand All @@ -339,20 +382,26 @@ boolean trySelect(StoredSelectClause storedSelectClause) {
}
}

void channelClosed(ChannelClosed channelClosed) {
void channelClosed(StoredSelectClause storedSelectClause, ChannelClosed channelClosed) {
while (true) {
var currentState = state.get();
Object targetState;
if (channelClosed instanceof ChannelDone && storedSelectClause.getClause().skipWhenDone()) {
targetState = SelectState.SKIP_DONE;
} else {
targetState = channelClosed;
}
switch (currentState) {
case SelectState.REGISTERING -> {
// the channel closed state will be discovered when there's a call to `checkStateAndWait` after registration completes
if (state.compareAndSet(currentState, channelClosed)) {
if (state.compareAndSet(currentState, targetState)) {
return;
}
// else: CAS unsuccessful, retry
}
case List<?> clausesToReRegister -> {
// same as above
if (state.compareAndSet(currentState, channelClosed)) {
if (state.compareAndSet(currentState, targetState)) {
return;
}
// else: CAS unsuccessful, retry
Expand All @@ -361,6 +410,10 @@ void channelClosed(ChannelClosed channelClosed) {
// already interrupted
return;
}
case SelectState.SKIP_DONE -> {
// already closed
return;
}
case ChannelClosed cc -> {
// already closed
return;
Expand All @@ -374,7 +427,7 @@ void channelClosed(ChannelClosed channelClosed) {
return;
}
case Thread t -> {
if (state.compareAndSet(currentState, channelClosed)) {
if (state.compareAndSet(currentState, targetState)) {
LockSupport.unpark(t);
return;
}
Expand All @@ -388,7 +441,8 @@ void channelClosed(ChannelClosed channelClosed) {

enum SelectState {
REGISTERING,
INTERRUPTED
INTERRUPTED,
SKIP_DONE
}

//
Expand Down Expand Up @@ -437,3 +491,8 @@ public void setPayload(Object payload) {
this.payload = payload;
}
}

enum RestartSelectMarker {
RESTART,
RESTART_WITHOUT_DONE
}
11 changes: 9 additions & 2 deletions core/src/main/java/com/softwaremill/jox/SelectClause.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public abstract class SelectClause<T> {
* Might throw any exceptions that the provided transformation function throws.
*/
abstract T transformedRawValue(Object rawValue);

abstract boolean skipWhenDone();
}

class DefaultClause<T> extends SelectClause<T> {
Expand All @@ -46,11 +48,16 @@ Object register(SelectInstance select) {
T transformedRawValue(Object rawValue) {
return callback.get();
}

@Override
boolean skipWhenDone() {
return false; // no associated channel, this value is never used
}
}

/**
* Used as a result of {@link DefaultClause#register(SelectInstance)}, instead of null, to indicate that the select
* clause has been selected during registration.
* Used as a result of {@link DefaultClause#register(SelectInstance)}, instead of {@code null}, to indicate that the
* default clause has been selected during registration.
*/
enum DefaultClauseMarker {
DEFAULT
Expand Down
22 changes: 22 additions & 0 deletions core/src/main/java/com/softwaremill/jox/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,37 @@ public interface Source<T> extends CloseableChannel {
/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel.
* <p>
* If the source is/becomes done, {@link Select#select(SelectClause[])} will restart with channels that are not done yet.
*/
SelectClause<T> receiveClause();

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel, and transform it using the provided {@code callback}.
* <p>
* If the source is/becomes done, {@link Select#select(SelectClause[])} will restart with channels that are not done yet.
*/
<U> SelectClause<U> receiveClause(Function<T, U> callback);

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel.
* <p>
* If the source is/becomes done, {@link Select#select(SelectClause[])} will stop and throw {@link ChannelDoneException}
* or return a {@link ChannelDone} value (in the {@code safe} variant).
*/
SelectClause<T> receiveOrDoneClause();

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel, and transform it using the provided {@code callback}.
* <p>
* If the source is/becomes done, {@link Select#select(SelectClause[])} will stop and throw {@link ChannelDoneException}
* or return a {@link ChannelDone} value (in the {@code safe} variant).
*/
<U> SelectClause<U> receiveOrDoneClause(Function<T, U> callback);

//

/**
Expand Down
Loading

0 comments on commit 3320290

Please sign in to comment.