Skip to content

Commit

Permalink
fix: prevent logging of RejectedExecutionException
Browse files Browse the repository at this point in the history
closes: #6215

Signed-off-by: Steve Hawkins <shawkins@redhat.com>
(cherry picked from commit 6c24a98)
  • Loading branch information
shawkins authored and manusa committed Aug 9, 2024
1 parent 030cabe commit 01f1cfd
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#### Bugs
* Fix #6066: Added support for missing `v1.APIVersions` in KubernetesClient
* Fix #6110: VolumeSource (and other file mode fields) in Octal are correctly interpreted
* Fix #6215: Suppressing rejected execution exception for port forwarder

### 6.13.1 (2024-07-02)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

Expand Down Expand Up @@ -75,16 +76,20 @@ public void onOpen(final WebSocket webSocket) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logger.debug("Error while writing client data");
if (alive.get()) {
clientThrowables.add(e);
closeBothWays(webSocket, 1001, "Client error");
}
clientError(webSocket, "writing client data", e);
}
});
}
}

private void clientError(final WebSocket webSocket, String operation, Exception e) {
if (alive.get()) {
logger.debug("Error while " + operation, e);
clientThrowables.add(e);
closeBothWays(webSocket, 1001, "Client error");
}
}

@Override
public void onMessage(WebSocket webSocket, String text) {
logger.debug("{}: onMessage(String)", LOG_PREFIX);
Expand Down Expand Up @@ -125,27 +130,27 @@ public void onMessage(WebSocket webSocket, ByteBuffer buffer) {
} else {
// Data
if (out != null) {
serialExecutor.execute(() -> {
try {
while (buffer.hasRemaining()) {
int written = out.write(buffer); // channel byte already skipped
if (written == 0) {
// out is non-blocking, prevent a busy loop
Thread.sleep(50);
try {
serialExecutor.execute(() -> {
try {
while (buffer.hasRemaining()) {
int written = out.write(buffer); // channel byte already skipped
if (written == 0) {
// out is non-blocking, prevent a busy loop
Thread.sleep(50);
}
}
webSocket.request();
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
clientError(webSocket, "forwarding data to the client", e);
}
webSocket.request();
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (alive.get()) {
clientThrowables.add(e);
logger.debug("Error while forwarding data to the client", e);
closeBothWays(webSocket, 1002, PROTOCOL_ERROR);
}
}
});
});
} catch (RejectedExecutionException e) {
// just ignore
}
}
}
}
Expand Down

0 comments on commit 01f1cfd

Please sign in to comment.