From 9af4e15a59c78eae3434f2a922178ab6a1d02d8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Sautter?= Date: Tue, 12 Nov 2024 18:44:49 +0100 Subject: [PATCH] [grid] stop polling events on close --- .../org/openqa/selenium/concurrent/BUILD.bazel | 1 + .../openqa/selenium/events/zeromq/BUILD.bazel | 1 + .../events/zeromq/UnboundZmqEventBus.java | 16 +++++++++++----- 3 files changed, 13 insertions(+), 5 deletions(-) diff --git a/java/src/org/openqa/selenium/concurrent/BUILD.bazel b/java/src/org/openqa/selenium/concurrent/BUILD.bazel index 9c18587dd0eb4..97f4da1ee8119 100644 --- a/java/src/org/openqa/selenium/concurrent/BUILD.bazel +++ b/java/src/org/openqa/selenium/concurrent/BUILD.bazel @@ -4,6 +4,7 @@ java_library( name = "concurrent", srcs = glob(["*.java"]), visibility = [ + "//java/src/org/openqa/selenium/events:__subpackages__", "//java/src/org/openqa/selenium/grid:__subpackages__", "//java/src/org/openqa/selenium/remote:__subpackages__", ], diff --git a/java/src/org/openqa/selenium/events/zeromq/BUILD.bazel b/java/src/org/openqa/selenium/events/zeromq/BUILD.bazel index 6a77020ad0693..fff6405c66483 100644 --- a/java/src/org/openqa/selenium/events/zeromq/BUILD.bazel +++ b/java/src/org/openqa/selenium/events/zeromq/BUILD.bazel @@ -12,6 +12,7 @@ java_library( ], deps = [ "//java/src/org/openqa/selenium:core", + "//java/src/org/openqa/selenium/concurrent", "//java/src/org/openqa/selenium/events", "//java/src/org/openqa/selenium/grid/config", "//java/src/org/openqa/selenium/grid/security", diff --git a/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java index 6e68a12e2c76d..582f758121ff5 100644 --- a/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java @@ -42,6 +42,7 @@ import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; +import org.openqa.selenium.concurrent.ExecutorServices; import org.openqa.selenium.events.Event; import org.openqa.selenium.events.EventBus; import org.openqa.selenium.events.EventListener; @@ -61,6 +62,7 @@ class UnboundZmqEventBus implements EventBus { private static final Logger LOG = Logger.getLogger(EventBus.class.getName()); private static final Json JSON = new Json(); private final AtomicBoolean pollingStarted = new AtomicBoolean(false); + private final PollingRunnable socketPolling; private final ExecutorService socketPollingExecutor; private final ExecutorService socketPublishingExecutor; private final ExecutorService listenerNotificationExecutor; @@ -147,7 +149,8 @@ class UnboundZmqEventBus implements EventBus { LOG.info("Sockets created"); - socketPollingExecutor.submit(new PollingRunnable(secret)); + socketPolling = new PollingRunnable(secret); + socketPollingExecutor.submit(socketPolling); // Give ourselves up to a second to connect, using The World's Worst heuristic. If we don't // manage to connect, it's not the end of the world, as the socket we're connecting to may not @@ -211,9 +214,11 @@ public void fire(Event event) { @Override public void close() { - socketPollingExecutor.shutdownNow(); - socketPublishingExecutor.shutdownNow(); - listenerNotificationExecutor.shutdownNow(); + socketPolling.shutdown = true; + ExecutorServices.shutdownGracefully("Event Bus Poller", socketPollingExecutor); + ExecutorServices.shutdownGracefully("Event Bus Publisher", socketPublishingExecutor); + ExecutorServices.shutdownGracefully( + "Event Bus Listener Notifier", listenerNotificationExecutor); poller.close(); if (sub != null) { @@ -226,6 +231,7 @@ public void close() { private class PollingRunnable implements Runnable { private final Secret secret; + private volatile boolean shutdown; public PollingRunnable(Secret secret) { this.secret = secret; @@ -233,7 +239,7 @@ public PollingRunnable(Secret secret) { @Override public void run() { - while (!Thread.currentThread().isInterrupted()) { + while (!Thread.currentThread().isInterrupted() && !shutdown) { try { int count = poller.poll(150);