From 01f1cfd68025b6882faaa9118ab04f66bfa9a1b8 Mon Sep 17 00:00:00 2001 From: Steven Hawkins Date: Mon, 5 Aug 2024 10:11:46 -0400 Subject: [PATCH] fix: prevent logging of RejectedExecutionException closes: #6215 Signed-off-by: Steve Hawkins (cherry picked from commit 6c24a98a81a2acc905c2c8626326e603865e3131) --- CHANGELOG.md | 1 + .../PortForwarderWebsocketListener.java | 53 ++++++++++--------- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 82a6ca225e3..24086e950cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ #### Bugs * Fix #6066: Added support for missing `v1.APIVersions` in KubernetesClient * Fix #6110: VolumeSource (and other file mode fields) in Octal are correctly interpreted +* Fix #6215: Suppressing rejected execution exception for port forwarder ### 6.13.1 (2024-07-02) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java index 2ea4867e793..3a4a5e347bc 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; @@ -75,16 +76,20 @@ public void onOpen(final WebSocket webSocket) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } - logger.debug("Error while writing client data"); - if (alive.get()) { - clientThrowables.add(e); - closeBothWays(webSocket, 1001, "Client error"); - } + clientError(webSocket, "writing client data", e); } }); } } + private void clientError(final WebSocket webSocket, String operation, Exception e) { + if (alive.get()) { + logger.debug("Error while " + operation, e); + clientThrowables.add(e); + closeBothWays(webSocket, 1001, "Client error"); + } + } + @Override public void onMessage(WebSocket webSocket, String text) { logger.debug("{}: onMessage(String)", LOG_PREFIX); @@ -125,27 +130,27 @@ public void onMessage(WebSocket webSocket, ByteBuffer buffer) { } else { // Data if (out != null) { - serialExecutor.execute(() -> { - try { - while (buffer.hasRemaining()) { - int written = out.write(buffer); // channel byte already skipped - if (written == 0) { - // out is non-blocking, prevent a busy loop - Thread.sleep(50); + try { + serialExecutor.execute(() -> { + try { + while (buffer.hasRemaining()) { + int written = out.write(buffer); // channel byte already skipped + if (written == 0) { + // out is non-blocking, prevent a busy loop + Thread.sleep(50); + } } + webSocket.request(); + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + clientError(webSocket, "forwarding data to the client", e); } - webSocket.request(); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - if (alive.get()) { - clientThrowables.add(e); - logger.debug("Error while forwarding data to the client", e); - closeBothWays(webSocket, 1002, PROTOCOL_ERROR); - } - } - }); + }); + } catch (RejectedExecutionException e) { + // just ignore + } } } }