Skip to content

Commit

Permalink
Use explicit ExecutorService for Java scenario event listening
Browse files Browse the repository at this point in the history
Avoid exhausting the ForkJoin.commonPool() in constrained environments, which can cause deadlocks.

Signed-off-by: Mark S. Lewis <Mark.S.Lewis@outlook.com>
  • Loading branch information
bestbeforetoday committed Dec 2, 2023
1 parent 633e073 commit 8c1136f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ private Supplier<T> readNext() {
try {
next = queue.take();
} catch (InterruptedException e) {
throw new NoSuchElementException();
Thread.currentThread().interrupt();
next = () -> null;
}
}

Expand Down
4 changes: 2 additions & 2 deletions java/src/main/java/org/hyperledger/fabric/client/Network.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* // Process then checkpoint event
* checkpointer.checkpointChaincodeEvent(event);
* });
* } catch (io.grpc.StatusRuntimeException e) {
* } catch (GatewayRuntimeException e) {
* // Connection error
* }
* }
Expand All @@ -67,7 +67,7 @@
* // Process then checkpoint block
* checkpointer.checkpointBlock(event.getHeader().getNumber());
* });
* } catch (io.grpc.StatusRuntimeException e) {
* } catch (GatewayRuntimeException e) {
* // Connection error
* }
* }
Expand Down
40 changes: 28 additions & 12 deletions java/src/test/java/scenario/BasicEventListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,45 @@

package scenario;

import io.grpc.Status;
import org.hyperledger.fabric.client.CloseableIterator;
import org.hyperledger.fabric.client.GatewayRuntimeException;

import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import org.hyperledger.fabric.client.CloseableIterator;

public final class BasicEventListener<T> implements EventListener<T> {
private final BlockingQueue<T> eventQueue = new SynchronousQueue<>();
private final Runnable close;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final CloseableIterator<T> iterator;

public BasicEventListener(final CloseableIterator<T> iterator) {
close = iterator::close;
this.iterator = iterator;

// Start reading events immediately as Java gRPC implementation may not invoke the gRPC service until the first
// read attempt occurs.
CompletableFuture.runAsync(() -> iterator.forEachRemaining(event -> {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
iterator.close();
executor.execute(this::readEvents);
}

private void readEvents() {
try {
iterator.forEachRemaining(event -> {
try {
eventQueue.put(event);
} catch (InterruptedException e) {
iterator.close();
Thread.currentThread().interrupt();
}
});
} catch (GatewayRuntimeException e) {
if (e.getStatus().getCode() != Status.Code.CANCELLED) {
throw e;
}
}));
}
}

public T next() throws InterruptedException {
Expand All @@ -39,6 +54,7 @@ public T next() throws InterruptedException {
}

public void close() {
close.run();
executor.shutdownNow();
iterator.close();
}
}

0 comments on commit 8c1136f

Please sign in to comment.