Skip to content

Commit

Permalink
Throw exception when attempting to close an event loop from within - f…
Browse files Browse the repository at this point in the history
…ixes #204
  • Loading branch information
benbonavia committed Jul 23, 2024
1 parent f24ce15 commit 0833d34
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 3 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.core.io.ThreadingIllegalStateException;
import net.openhft.chronicle.core.threads.EventHandler;
import net.openhft.chronicle.core.threads.EventLoop;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -122,6 +123,15 @@ protected void performClose() {
stop();
}

@Override
protected void assertCloseable() {
if (isRunningOnThread(Thread.currentThread())) {
throw new ThreadingIllegalStateException("Attempting to close " + name + " from within!", null);
}
}

public abstract boolean isRunningOnThread(Thread thread);

protected boolean isStarted() {
return lifecycle.get() == EventLoopLifecycle.STARTED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,21 @@ public String toString() {
'}';
}

@Override
public boolean isRunningOnThread(Thread thread) {
for (int i=0; i < runners.size(); i++) {
if (thread == runners.get(i).thread()) {
return true;
}
}
return false;
}

private final class Runner implements Runnable {
private final EventHandler handler;
private final Pauser pauser;
private boolean endedGracefully = false;
private volatile Thread thread = null;

public Runner(final EventHandler handler, Pauser pauser) {
this.handler = handler;
Expand All @@ -158,7 +169,7 @@ public Runner(final EventHandler handler, Pauser pauser) {
public void run() {
try {
throwExceptionIfClosed();

thread = Thread.currentThread();
handler.loopStarted();

while (isStarted()) {
Expand Down Expand Up @@ -194,5 +205,9 @@ private String asString(final Object handler) {
public void unpause() {
pauser.unpause();
}

public Thread thread() {
return thread;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ public interface CoreEventLoop extends EventLoop {
long loopStartNS();

void dumpRunningState(@NotNull final String message, @NotNull final BooleanSupplier finalCheck);

boolean isRunningOnThread(Thread thread);
}
9 changes: 8 additions & 1 deletion src/main/java/net/openhft/chronicle/threads/EventGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class EventGroup
private static final long WAIT_TO_START_MS = Jvm.getInteger("eventGroup.wait.to.start.ms", 2_000);
private final AtomicInteger counter = new AtomicInteger();
@NotNull
private final EventLoop monitor;
private final MonitorEventLoop monitor;
private final CoreEventLoop core;
private final BlockingEventLoop blocking;
@NotNull
Expand Down Expand Up @@ -348,4 +348,11 @@ protected void performClose() {
public boolean runsInsideCoreLoop() {
return core.runsInsideCoreLoop();
}

@Override
public boolean isRunningOnThread(Thread thread) {
return core != null && core.isRunningOnThread(thread) ||
blocking != null && blocking.isRunningOnThread(thread) ||
monitor.isRunningOnThread(thread);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,11 @@ private void shutdownService() {

@Override
public boolean runsInsideCoreLoop() {
return thread == Thread.currentThread(); // false if called before run()
return isRunningOnThread(Thread.currentThread()); // false if called before run()
}

@Override
public boolean isRunningOnThread(Thread thread) {
return this.thread == thread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class MonitorEventLoop extends AbstractLifecycleEventLoop implements Runn
private transient final EventLoop parent;
private final List<EventHandler> handlers = new CopyOnWriteArrayList<>();
private final Pauser pauser;
private volatile Thread thread = null;

public MonitorEventLoop(final EventLoop parent, final Pauser pauser) {
this(parent, "", pauser);
Expand Down Expand Up @@ -104,6 +105,7 @@ public void run() {
throwExceptionIfClosed();

try {
thread = Thread.currentThread();
// don't do any monitoring for the first MONITOR_INITIAL_DELAY_MS ms
final long waitUntilMs = System.currentTimeMillis() + MONITOR_INITIAL_DELAY_MS;
while (System.currentTimeMillis() < waitUntilMs && isStarted())
Expand Down Expand Up @@ -165,6 +167,11 @@ protected void performClose() {
net.openhft.chronicle.core.io.Closeable.closeQuietly(handlers);
}

@Override
public boolean isRunningOnThread(Thread thread) {
return this.thread == thread;
}

/**
* {@link EventHandler#loopStarted()} needs to be called once before the first call to
* {@link EventHandler#action()} and it must be called on the event loop thread. An
Expand Down
88 changes: 88 additions & 0 deletions src/test/java/net/openhft/chronicle/threads/EventGroupTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,32 @@

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.core.io.InvalidMarshallableException;
import net.openhft.chronicle.core.io.SimpleCloseable;
import net.openhft.chronicle.core.io.ThreadingIllegalStateException;
import net.openhft.chronicle.core.threads.*;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.Closeable;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.Collections.singleton;
import static org.junit.jupiter.api.Assertions.*;
Expand Down Expand Up @@ -428,6 +435,87 @@ void lifecycleEventsAreCalledAtAppropriateTimesByAppropriateThreads_ForPrioritie
handlers.forEach(handler -> assertNotEquals(handler.loopFinishedNS.get(), 0, handler.priority + " was not loopFinished when loop finished, priorities=" + priorities));
}

static Stream<List<HandlerPriority>> egCloseParams() {
return Stream.of(
Arrays.asList(HandlerPriority.MEDIUM),
Arrays.asList(HandlerPriority.MEDIUM, HandlerPriority.HIGH),
Arrays.asList(HandlerPriority.TIMER, HandlerPriority.HIGH),
Arrays.asList(HandlerPriority.MEDIUM, HandlerPriority.BLOCKING, HandlerPriority.TIMER),
Arrays.asList(HandlerPriority.MEDIUM, HandlerPriority.BLOCKING, HandlerPriority.TIMER, HandlerPriority.HIGH)
);
}

/**
* Run the event group and attempt to stop each of the inner event loops by handler priority
*/
@ParameterizedTest()
@MethodSource("egCloseParams")
void closeEventGroupInWithinAndEventLoopThrowsException(List<HandlerPriority> priorities) {
EventGroup eg = EventGroupBuilder.builder().build();
try {
Map<HandlerPriority, AtomicBoolean> eventHandlerFinishedForPriority = new HashMap<>();
Map<HandlerPriority, AtomicBoolean> exceptionThrownInHandlerForPriority = new HashMap<>();

for (final HandlerPriority priority : priorities) {

if (exceptionThrownInHandlerForPriority.containsKey(priority)) {
continue; // dont test overlapping priorities
}

AtomicBoolean eventHandlerFinished = eventHandlerFinishedForPriority.computeIfAbsent(priority, p -> new AtomicBoolean());
AtomicBoolean exceptionThrownInHandler = exceptionThrownInHandlerForPriority.computeIfAbsent(priority, p -> new AtomicBoolean());

EventHandler closingEventHandler = new EventHandler() {
@Override
public boolean action() throws InvalidEventHandlerException, InvalidMarshallableException {
try {
eg.close();
return true;
} catch (ThreadingIllegalStateException e) {
exceptionThrownInHandler.set(true);
throw InvalidEventHandlerException.reusable();
}
}

@Override
public void loopFinished() {
eventHandlerFinished.set(true);
}

@Override
public @NotNull HandlerPriority priority() {
return priority;
}
};

eg.addHandler(closingEventHandler);
}
eg.start();

long timeoutTime = System.currentTimeMillis() + 500;
while (!exceptionThrownInHandlerForPriority.values().stream().allMatch(AtomicBoolean::get)) {
if (System.currentTimeMillis() > timeoutTime) {
final List<HandlerPriority> handlerPrioritiesThatDidntFinish = eventHandlerFinishedForPriority.keySet().stream().filter(k -> !eventHandlerFinishedForPriority.get(k).get()).collect(Collectors.toList());
if (handlerPrioritiesThatDidntFinish.isEmpty()) {
Assertions.fail("Event group didn't throw an exception when attempting to close!");
} else {
Assertions.fail("Handlers for " + handlerPrioritiesThatDidntFinish + " didn't finish");
}
}
Jvm.pause(10);
}

assertTrue(eg.isAlive());
assertFalse(eg.isStopped());
assertFalse(eg.isClosed());
assertFalse(eg.isClosing());
} finally {
eg.close();

assertTrue(eg.isClosed());
}
}

static class CloseableResource extends AbstractCloseable {

public CloseableResource() {
Expand Down
64 changes: 64 additions & 0 deletions src/test/java/net/openhft/chronicle/threads/EventLoopsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,23 @@
package net.openhft.chronicle.threads;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.InvalidMarshallableException;
import net.openhft.chronicle.core.io.ThreadingIllegalStateException;
import net.openhft.chronicle.core.onoes.ExceptionHandler;
import net.openhft.chronicle.core.threads.EventHandler;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -87,4 +96,59 @@ private static void doTest(BlockingEventLoop blockingEventLoop, MediumEventLoop
Jvm.pause(1);
}
}

public static Stream<EventLoop> eventLoopsToClose() {
return Stream.of(
new MediumEventLoop(null, "medium", Pauser.balanced(), false, null),
new BlockingEventLoop("blocking")
);
}

@ParameterizedTest
@MethodSource("eventLoopsToClose")
public void closeFromEventLoopThreadThrowsException(EventLoop el) {
try {
AtomicBoolean exceptionThrownInHandler = new AtomicBoolean();
AtomicBoolean eventHandlerFinished = new AtomicBoolean();

EventHandler closingEventHandler = new EventHandler() {
@Override
public boolean action() throws InvalidEventHandlerException, InvalidMarshallableException {
try {
el.close();
return true;
} catch (ThreadingIllegalStateException e) {
exceptionThrownInHandler.set(true);
throw InvalidEventHandlerException.reusable();
}
}

@Override
public void loopFinished() {
eventHandlerFinished.set(true);
}
};

el.addHandler(closingEventHandler);
el.start();

long timeoutTime = System.currentTimeMillis() + 500;
while (!exceptionThrownInHandler.get()) {
if (System.currentTimeMillis() > timeoutTime) {
Assertions.fail("Event loop " + el.name() + " didn't " + (eventHandlerFinished.get() ? "throw an exception when attempting to close" : "run in this time"));
}
Jvm.pause(10);
}

assertTrue(el.isAlive());
assertFalse(el.isStopped());
assertFalse(el.isClosed());
assertFalse(el.isClosing());
} finally {
el.close();

assertTrue(el.isClosed());
}

}
}

0 comments on commit 0833d34

Please sign in to comment.