diff --git a/CHANGELOG.md b/CHANGELOG.md index c6df0109d06..11bf25f2939 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### 6.4-SNAPSHOT #### Bugs +* Fix #4249 #4726: prevent the over-logging of errors after the websocket has been closed * Fix #4650: allowing for comments at the end of certificate files #### Improvements diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index fb127ac8c7f..af710eb2dcc 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -201,7 +201,7 @@ private void asyncWrite(WritableByteChannel channel, ByteBuffer b) { if (closed.get()) { LOGGER.debug("Stream write failed after close", t); } else { - // This could happen if the user simply closes their stream prior to completion + // This could happen if the user simply closes their stream prior to completion LOGGER.warn("Stream write failed", t); } } @@ -211,7 +211,7 @@ private void asyncWrite(WritableByteChannel channel, ByteBuffer b) { @Override public void close() { // simply sends a close, which will shut down the output - // it's expected that the server will respond with a close, but if not the input will be shutdown implicitly + // it's expected that the server will respond with a close, but if not the input will be shutdown implicitly closeWebSocketOnce(1000, "Closing..."); } @@ -280,18 +280,22 @@ public void onError(WebSocket webSocket, Throwable t) { status.setMessage(t.getMessage()); cleanUpOnce(); } finally { - try { - if (listener != null) { - ExecListener.Response execResponse = null; - if (response != null) { - execResponse = new SimpleResponse(response); + if (exitCode.isDone()) { + LOGGER.debug("Exec failure after done", t); + } else { + try { + if (listener != null) { + ExecListener.Response execResponse = null; + if (response != null) { + execResponse = new SimpleResponse(response); + } + listener.onFailure(t, execResponse); + } else { + LOGGER.error("Exec Failure", t); } - listener.onFailure(t, execResponse); - } else { - LOGGER.error("Exec Failure", t); + } finally { + exitCode.completeExceptionally(t); } - } finally { - exitCode.completeExceptionally(t); } } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java index 32e4714f843..13cdb61ead0 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java @@ -187,7 +187,7 @@ public void close() { socket.cancel(true); socket.whenComplete((w, t) -> { if (w != null) { - w.sendClose(1001, "User closing"); + listener.closeBothWays(w, 1001, "User closing"); } }); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java index 404ef238a22..00181da2965 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListener.java @@ -15,7 +15,9 @@ */ package io.fabric8.kubernetes.client.dsl.internal; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.http.WebSocket; +import io.fabric8.kubernetes.client.utils.Utils; import io.fabric8.kubernetes.client.utils.internal.SerialExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +48,6 @@ public class PortForwarderWebsocketListener implements WebSocket.Listener { private final AtomicBoolean alive = new AtomicBoolean(true); - private final AtomicBoolean errorOccurred = new AtomicBoolean(false); - final Collection clientThrowables = new CopyOnWriteArrayList<>(); final Collection serverThrowables = new CopyOnWriteArrayList<>(); @@ -75,9 +75,9 @@ public void onOpen(final WebSocket webSocket) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } + logger.debug("Error while writing client data"); if (alive.get()) { clientThrowables.add(e); - logger.error("Error while writing client data"); closeBothWays(webSocket, 1001, "Client error"); } } @@ -101,21 +101,27 @@ public void onMessage(WebSocket webSocket, ByteBuffer buffer) { } if (!buffer.hasRemaining()) { - errorOccurred.set(true); - logger.error("Received an empty message"); + KubernetesClientException e = new KubernetesClientException("Received an empty message"); + serverThrowables.add(e); + logger.debug("Protocol error", e); closeBothWays(webSocket, 1002, PROTOCOL_ERROR); return; } byte channel = buffer.get(); if (channel < 0 || channel > 1) { - errorOccurred.set(true); - logger.error("Received a wrong channel from the remote socket: {}", channel); + KubernetesClientException e = new KubernetesClientException( + String.format("Received a wrong channel from the remote socket: %s", channel)); + serverThrowables.add(e); + logger.debug("Protocol error", e); closeBothWays(webSocket, 1002, PROTOCOL_ERROR); } else if (channel == 1) { // Error channel - errorOccurred.set(true); - logger.error("Received an error from the remote socket"); + // TODO: read the error + KubernetesClientException e = new KubernetesClientException( + String.format("Received an error from the remote socket")); + serverThrowables.add(e); + logger.debug("Server error", e); closeForwarder(); } else { // Data @@ -136,7 +142,7 @@ public void onMessage(WebSocket webSocket, ByteBuffer buffer) { } if (alive.get()) { clientThrowables.add(e); - logger.error("Error while forwarding data to the client", e); + logger.debug("Error while forwarding data to the client", e); closeBothWays(webSocket, 1002, PROTOCOL_ERROR); } } @@ -155,10 +161,9 @@ public void onClose(WebSocket webSocket, int code, String reason) { @Override public void onError(WebSocket webSocket, Throwable t) { - logger.debug("{}: onFailure", LOG_PREFIX); + logger.debug("{}: Throwable received from websocket", LOG_PREFIX, t); if (alive.get()) { serverThrowables.add(t); - logger.error("{}: Throwable received from websocket", LOG_PREFIX, t); closeForwarder(); } } @@ -168,7 +173,7 @@ boolean isAlive() { } boolean errorOccurred() { - return errorOccurred.get() || !clientThrowables.isEmpty() || !serverThrowables.isEmpty(); + return !clientThrowables.isEmpty() || !serverThrowables.isEmpty(); } Collection getClientThrowables() { @@ -179,14 +184,14 @@ Collection getServerThrowables() { return serverThrowables; } - private void closeBothWays(WebSocket webSocket, int code, String message) { + void closeBothWays(WebSocket webSocket, int code, String message) { logger.debug("{}: Closing with code {} and reason: {}", LOG_PREFIX, code, message); alive.set(false); try { webSocket.sendClose(code, message); } catch (Exception e) { serverThrowables.add(e); - logger.error("Error while closing the websocket", e); + logger.debug("Error while closing the websocket", e); } closeForwarder(); } @@ -194,18 +199,10 @@ private void closeBothWays(WebSocket webSocket, int code, String message) { private void closeForwarder() { alive.set(false); if (in != null) { - try { - in.close(); - } catch (IOException e) { - logger.error("{}: Error while closing the client input channel", LOG_PREFIX, e); - } + Utils.closeQuietly(in); } if (out != null && out != in) { - try { - out.close(); - } catch (IOException e) { - logger.error("{}: Error while closing the client output channel", LOG_PREFIX, e); - } + Utils.closeQuietly(out); } pumperService.shutdownNow(); serialExecutor.shutdownNow(); @@ -228,4 +225,5 @@ private static void pipe(ReadableByteChannel in, WebSocket webSocket, BooleanSup } } while (isAlive.getAsBoolean() && read >= 0); } + } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListenerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListenerTest.java index dadc0f2934f..badd646991a 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListenerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocketListenerTest.java @@ -161,7 +161,7 @@ void onMessage_withEmptyMessage_shouldEndWithError() { verify(webSocket, timeout(10_000)).sendClose(1002, "Protocol error"); assertThat(outputContent.toString()).isEmpty(); assertThat(listener.errorOccurred()).isTrue(); - assertThat(listener.getServerThrowables()).isEmpty(); + assertThat(listener.getServerThrowables()).isNotEmpty(); assertThat(in.isOpen()).isFalse(); assertThat(out.isOpen()).isFalse(); } @@ -210,7 +210,8 @@ void onMessage_withWrongChannel_shouldLogAndEndWithError() { verify(webSocket, timeout(10_000)).sendClose(1002, "Protocol error"); assertThat(outputContent.toString()).isEmpty(); assertThat(listener.errorOccurred()).isTrue(); - verify(logger).error("Received a wrong channel from the remote socket: {}", (byte) 5); + assertThat(listener.getServerThrowables().iterator().next().getMessage()) + .isEqualTo("Received a wrong channel from the remote socket: 5"); } }