Skip to content

Commit

Permalink
[grid] stop polling events on close
Browse files Browse the repository at this point in the history
  • Loading branch information
joerg1985 committed Nov 12, 2024
1 parent 9ff0f85 commit 9af4e15
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
1 change: 1 addition & 0 deletions java/src/org/openqa/selenium/concurrent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ java_library(
name = "concurrent",
srcs = glob(["*.java"]),
visibility = [
"//java/src/org/openqa/selenium/events:__subpackages__",
"//java/src/org/openqa/selenium/grid:__subpackages__",
"//java/src/org/openqa/selenium/remote:__subpackages__",
],
Expand Down
1 change: 1 addition & 0 deletions java/src/org/openqa/selenium/events/zeromq/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ java_library(
],
deps = [
"//java/src/org/openqa/selenium:core",
"//java/src/org/openqa/selenium/concurrent",
"//java/src/org/openqa/selenium/events",
"//java/src/org/openqa/selenium/grid/config",
"//java/src/org/openqa/selenium/grid/security",
Expand Down
16 changes: 11 additions & 5 deletions java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openqa.selenium.concurrent.ExecutorServices;
import org.openqa.selenium.events.Event;
import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.events.EventListener;
Expand All @@ -61,6 +62,7 @@ class UnboundZmqEventBus implements EventBus {
private static final Logger LOG = Logger.getLogger(EventBus.class.getName());
private static final Json JSON = new Json();
private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
private final PollingRunnable socketPolling;
private final ExecutorService socketPollingExecutor;
private final ExecutorService socketPublishingExecutor;
private final ExecutorService listenerNotificationExecutor;
Expand Down Expand Up @@ -147,7 +149,8 @@ class UnboundZmqEventBus implements EventBus {

LOG.info("Sockets created");

socketPollingExecutor.submit(new PollingRunnable(secret));
socketPolling = new PollingRunnable(secret);
socketPollingExecutor.submit(socketPolling);

// Give ourselves up to a second to connect, using The World's Worst heuristic. If we don't
// manage to connect, it's not the end of the world, as the socket we're connecting to may not
Expand Down Expand Up @@ -211,9 +214,11 @@ public void fire(Event event) {

@Override
public void close() {
socketPollingExecutor.shutdownNow();
socketPublishingExecutor.shutdownNow();
listenerNotificationExecutor.shutdownNow();
socketPolling.shutdown = true;
ExecutorServices.shutdownGracefully("Event Bus Poller", socketPollingExecutor);
ExecutorServices.shutdownGracefully("Event Bus Publisher", socketPublishingExecutor);
ExecutorServices.shutdownGracefully(
"Event Bus Listener Notifier", listenerNotificationExecutor);
poller.close();

if (sub != null) {
Expand All @@ -226,14 +231,15 @@ public void close() {

private class PollingRunnable implements Runnable {
private final Secret secret;
private volatile boolean shutdown;

public PollingRunnable(Secret secret) {
this.secret = secret;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
while (!Thread.currentThread().isInterrupted() && !shutdown) {
try {
int count = poller.poll(150);

Expand Down

0 comments on commit 9af4e15

Please sign in to comment.