Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Sink, Source & CloseableChannel interfaces #32

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 16 additions & 84 deletions core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
*
* @param <T> The type of the elements processed by the channel.
*/
public final class Channel<T> {
public final class Channel<T> implements Source<T>, Sink<T> {
/*
Inspired by the "Fast and Scalable Channels in Kotlin Coroutines" paper (https://arxiv.org/abs/2211.04986), and
the Kotlin implementation (https://github.com/Kotlin/kotlinx.coroutines/blob/master/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt).
Expand Down Expand Up @@ -155,25 +155,15 @@ public static <T> Channel<T> newUnlimitedChannel() {
// Sending
// *******

/**
* Send a value to the channel.
*
* @param value The value to send. Not {@code null}.
* @throws ChannelClosedException When the channel is closed.
*/
@Override
public void send(T value) throws InterruptedException {
var r = sendSafe(value);
if (r instanceof ChannelClosed c) {
throw c.toException();
}
}

/**
* Send a value to the channel. Doesn't throw exceptions when the channel is closed, but returns a value.
*
* @param value The value to send. Not {@code null}.
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is closed.
*/
@Override
public Object sendSafe(T value) throws InterruptedException {
return doSend(value, null, null);
}
Expand Down Expand Up @@ -345,11 +335,7 @@ private Object updateCellSend(Segment segment, int i, long s, T value, SelectIns
// Receiving
// *********

/**
* Receive a value from the channel.
*
* @throws ChannelClosedException When the channel is closed.
*/
@Override
public T receive() throws InterruptedException {
var r = receiveSafe();
if (r instanceof ChannelClosed c) {
Expand All @@ -360,11 +346,7 @@ public T receive() throws InterruptedException {
}
}

/**
* Receive a value from the channel. Doesn't throw exceptions when the channel is closed, but returns a value.
*
* @return Either a value of type {@code T}, or {@link ChannelClosed}, when the channel is closed.
*/
@Override
public Object receiveSafe() throws InterruptedException {
return doReceive(null, null);
}
Expand Down Expand Up @@ -670,49 +652,20 @@ private ExpandBufferResult updateCellExpandBuffer(Segment segment, int i) {
// Closing
// *******

/**
* Close the channel, indicating that no more elements will be sent.
* <p>
* Any elements that are already buffered will be delivered. Any send operations that are in progress will complete
* normally, when a receiver arrives. Any pending receive operations will complete with a channel closed result.
* <p>
* Subsequent {@link #send(Object)} operations will throw {@link ChannelClosedException}.
*
* @throws ChannelClosedException When the channel is already closed.
*/
@Override
public void done() {
var r = doneSafe();
if (r instanceof ChannelClosed c) {
throw c.toException();
}
}

/**
* Close the channel, indicating that no more elements will be sent. Doesn't throw exceptions when the channel is
* closed, but returns a value.
* <p>
* Any elements that are already buffered will be delivered. Any send operations that are in progress will complete
* normally, when a receiver arrives. Any pending receive operations will complete with a channel closed result.
* <p>
* Subsequent {@link #send(Object)} operations will throw {@link ChannelClosedException}.
*
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is already closed.
*/
@Override
public Object doneSafe() {
return closeSafe(new ChannelDone());
}

/**
* Close the channel, indicating an error.
* <p>
* Any elements that are already buffered won't be delivered. Any send or receive operations that are in progress
* will complete with a channel closed result.
* <p>
* Subsequent {@link #send(Object)} and {@link #receive()} operations will throw {@link ChannelClosedException}.
*
* @param reason The reason of the error. Not {@code null}.
* @throws ChannelClosedException When the channel is already closed.
*/
@Override
public void error(Throwable reason) {
if (reason == null) {
throw new NullPointerException("Error reason cannot be null");
Expand All @@ -723,16 +676,7 @@ public void error(Throwable reason) {
}
}

/**
* Close the channel, indicating an error. Doesn't throw exceptions when the channel is closed, but returns a value.
* <p>
* Any elements that are already buffered won't be delivered. Any send or receive operations that are in progress
* will complete with a channel closed result.
* <p>
* Subsequent {@link #send(Object)} and {@link #receive()} operations will throw {@link ChannelClosedException}.
*
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is already closed.
*/
@Override
public Object errorSafe(Throwable reason) {
return closeSafe(new ChannelError(reason));
}
Expand Down Expand Up @@ -846,17 +790,17 @@ private void updateCellClose(Segment segment, int i) {
}
}

@Override
public boolean isClosed() {
return closedReason.get() != null;
}

@Override
public boolean isDone() {
return closedReason.get() instanceof ChannelDone;
}

/**
* @return {@code null} if the channel is not closed, or if it's closed with {@link ChannelDone}.
*/
@Override
public Throwable isError() {
var reason = closedReason.get();
if (reason instanceof ChannelError e) {
Expand All @@ -872,19 +816,13 @@ public Throwable isError() {

private static final Function<Object, Object> IDENTITY = Function.identity();

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel.
*/
@Override
public SelectClause<T> receiveClause() {
//noinspection unchecked
return receiveClause((Function<T, T>) IDENTITY);
}

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel, and transform it using the provided {@code callback}.
*/
@Override
public <U> SelectClause<U> receiveClause(Function<T, U> callback) {
return new SelectClause<>() {
@Override
Expand All @@ -910,18 +848,12 @@ U transformedRawValue(Object rawValue) {
};
}

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will send the given value
* to the current channel, and return {@code null} as the clause's result.
*/
@Override
public SelectClause<Void> sendClause(T value) {
return sendClause(value, () -> null);
}

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will send the given value
* to the current channel, and return the value of the provided callback as the clause's result.
*/
@Override
public <U> SelectClause<U> sendClause(T value, Supplier<U> callback) {
return new SelectClause<>() {
@Override
Expand Down
86 changes: 86 additions & 0 deletions core/src/main/java/com/softwaremill/jox/CloseableChannel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.softwaremill.jox;

/**
* A channel which can be closed.
* <p>
* A channel can be closed in two ways:
* <ul>
* <li>using {@link #done()} or {@link #doneSafe()}, indicating that no more elements will be sent</li>
* <li>using {@link #error(Throwable)} or {@link #errorSafe(Throwable)}, indicating an error</li>
* </ul>
* <p>
* A channel can be closed only once. Subsequent calls to {@link #done()} or {@link #error(Throwable)} will throw
* {@link ChannelClosedException}, or return the original closing reason (when using {@link #doneSafe()} or {@link #errorSafe(Throwable)}).
* <p>
* Closing the channel is thread-safe.
*/
public interface CloseableChannel {
/**
* Close the channel, indicating that no more elements will be sent.
* <p>
* Any elements that are already buffered will be delivered. Any send operations that are in progress will complete
* normally, when a receiver arrives. Any pending receive operations will complete with a channel closed result.
* <p>
* Subsequent {@link Sink#send(Object)} operations will throw {@link ChannelClosedException}.
*
* @throws ChannelClosedException When the channel is already closed.
*/
void done();

/**
* Close the channel, indicating that no more elements will be sent. Doesn't throw exceptions when the channel is
* closed, but returns a value.
* <p>
* Any elements that are already buffered will be delivered. Any send operations that are in progress will complete
* normally, when a receiver arrives. Any pending receive operations will complete with a channel closed result.
* <p>
* Subsequent {@link Sink#send(Object)} operations will throw {@link ChannelClosedException}.
*
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is already closed.
*/
Object doneSafe();

//

/**
* Close the channel, indicating an error.
* <p>
* Any elements that are already buffered won't be delivered. Any send or receive operations that are in progress
* will complete with a channel closed result.
* <p>
* Subsequent {@link Sink#send(Object)} and {@link Source#receive()} operations will throw {@link ChannelClosedException}.
*
* @param reason The reason of the error. Not {@code null}.
* @throws ChannelClosedException When the channel is already closed.
*/
void error(Throwable reason);

/**
* Close the channel, indicating an error. Doesn't throw exceptions when the channel is closed, but returns a value.
* <p>
* Any elements that are already buffered won't be delivered. Any send or receive operations that are in progress
* will complete with a channel closed result.
* <p>
* Subsequent {@link Sink#send(Object)} and {@link Source#receive()} operations will throw {@link ChannelClosedException}.
*
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is already closed.
*/
Object errorSafe(Throwable reason);

//

/**
* @return {@code true} if the channel is closed using {@link #done()} or {@link #error(Throwable)}.
*/
boolean isClosed();

/**
* @return {@code true} if the channel is closed using {@link #done()}. {@code false} if it's not closed, or closed with an error.
*/
boolean isDone();

/**
* @return {@code null} if the channel is not closed, or if it's closed with {@link ChannelDone}.
*/
Throwable isError();
}
38 changes: 38 additions & 0 deletions core/src/main/java/com/softwaremill/jox/Sink.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.softwaremill.jox;

import java.util.function.Supplier;

/**
* A channel sink, which can be used to send values to the channel. See {@link Channel} for more details.
*/
public interface Sink<T> extends CloseableChannel {
/**
* Send a value to the channel.
*
* @param value The value to send. Not {@code null}.
* @throws ChannelClosedException When the channel is closed.
*/
void send(T value) throws InterruptedException;

/**
* Send a value to the channel. Doesn't throw exceptions when the channel is closed, but returns a value.
*
* @param value The value to send. Not {@code null}.
* @return Either {@code null}, or {@link ChannelClosed}, when the channel is closed.
*/
Object sendSafe(T value) throws InterruptedException;

//

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will send the given value
* to the current channel, and return {@code null} as the clause's result.
*/
SelectClause<Void> sendClause(T value);

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will send the given value
* to the current channel, and return the value of the provided callback as the clause's result.
*/
<U> SelectClause<U> sendClause(T value, Supplier<U> callback);
}
34 changes: 34 additions & 0 deletions core/src/main/java/com/softwaremill/jox/Source.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.softwaremill.jox;

import java.util.function.Function;

/**
* A channel source, which can be used to receive values from the channel. See {@link Channel} for more details.
*/
public interface Source<T> extends CloseableChannel {
/**
* Receive a value from the channel.
*
* @throws ChannelClosedException When the channel is closed.
*/
T receive() throws InterruptedException;

/**
* Receive a value from the channel. Doesn't throw exceptions when the channel is closed, but returns a value.
*
* @return Either a value of type {@code T}, or {@link ChannelClosed}, when the channel is closed.
*/
Object receiveSafe() throws InterruptedException;

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel.
*/
SelectClause<T> receiveClause();

/**
* Create a clause which can be used in {@link Select#select(SelectClause[])}. The clause will receive a value from
* the current channel, and transform it using the provided {@code callback}.
*/
<U> SelectClause<U> receiveClause(Function<T, U> callback);
}
Loading