From d0877a98ab3011dbead5dcfa9ee905f2a820d719 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Mon, 2 May 2022 16:13:49 -0400 Subject: [PATCH] fix #4112: cleaning up exec streams removing usage of piped streams adding backpressure support adding javadocs to stream methods exposing terminate on error also updating the exec examples --- CHANGELOG.md | 1 + doc/MIGRATION-v6.md | 17 +- .../client/jdkhttp/JdkWebSocketImpl.java | 11 +- .../client/okhttp/OkHttpWebSocketImpl.java | 57 +++- .../client/http/OkHttpClientTest.java | 34 +++ .../client/dsl/ContainerResource.java | 26 +- .../kubernetes/client/dsl/ExecListener.java | 34 ++- .../kubernetes/client/dsl/ExecWatch.java | 14 +- .../kubernetes/client/dsl/LogWatch.java | 23 +- .../kubernetes/client/dsl/Loggable.java | 5 +- .../client/dsl/TtyExecErrorChannelable.java | 18 +- .../client/dsl/TtyExecErrorable.java | 19 +- .../client/dsl/TtyExecOutputErrorable.java | 15 +- .../kubernetes/client/http/WebSocket.java | 8 + .../client/utils/InputStreamPumper.java | 8 +- .../dsl/internal/ExecWatchInputStream.java | 153 +++++++++++ .../dsl/internal/ExecWebSocketListener.java | 247 +++++++++--------- .../client/dsl/internal/LogWatchCallback.java | 36 +-- .../dsl/internal/PodOperationContext.java | 65 ++--- .../dsl/internal/PortForwarderWebsocket.java | 44 ++-- .../internal/WatcherWebSocketListener.java | 1 + .../internal/core/v1/PodOperationsImpl.java | 132 ++++------ .../dsl/internal/uploadable/PodUpload.java | 19 +- .../internal/ExecWatchInputStreamTest.java | 88 +++++++ .../internal/ExecWebSocketListenerTest.java | 3 +- .../kubernetes/examples/ExecExample.java | 64 +++-- .../examples/ExecExampleWithTerminalSize.java | 93 ++++--- .../kubernetes/examples/ExecPipesExample.java | 64 ----- .../kubernetes/examples/ExecStdInExample.java | 63 +++++ .../java/io/fabric8/kubernetes/PodIT.java | 11 +- .../kubernetes/client/mock/PodTest.java | 30 +++ 31 files changed, 902 insertions(+), 501 deletions(-) create mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java create mode 100644 kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStreamTest.java delete mode 100644 kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java create mode 100644 kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecStdInExample.java diff --git a/CHANGELOG.md b/CHANGELOG.md index f14019f8ccd..c10dedd8e34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ And Store.getKey can be used rather than directly referencing static Cache funct There is also client.resourceList(...).resources() and client.configMaps().resources() - that will provide a Resource stream. This allows you to implement composite operations easily with lambda: client.secrets().resources().forEach(r -> r.delete()); * Fix #3472 #3587: Allowing for customization of the Informer store/cache key function and how state is stored. See BasicItemStore and ReducedStateItemStore and the SharedIndexInformer.itemStore function. +* Fix #4112: Added TtyExecErrorable.terminateOnError to produce an exceptional outcome to the exitCode when a message is seen on stdErr. * Fix #3854: Camel-K: Missing method for manipulating KameletBindings #### _**Note**_: Breaking changes in the API diff --git a/doc/MIGRATION-v6.md b/doc/MIGRATION-v6.md index ba6764c089b..d2eb06956f4 100644 --- a/doc/MIGRATION-v6.md +++ b/doc/MIGRATION-v6.md @@ -17,6 +17,7 @@ - [DSL Interface Changes](#dsl-interface-changes) - [evict Changes](#evict-changes) - [Delete Behavior](#delete-behavior) +- [Stream Changes](#stream-changes) ## Backwards Compatibility Interceptor @@ -259,7 +260,17 @@ Evictable.evict will throw an exception rather than returning false if the pod i Deleting a collection is now implemented using a single delete call, rather than for each item. When the collection is namespaced and inAnyNamespace is used, in which case a call will be made to first determine the affected namespaces, and then a collection delete issued against each namespace. -The result of the delete calls will be a List of StatusDetails rather than a boolean value. A best -effort is made to process the response from the server to populate which items are deleted. This information is generally useful if you wish to implement some kind of blocking delete behavior - that is ensure the returned resources based upon a matching uid have been deleted. +The result of the delete calls will be a List of StatusDetails rather than a boolean value. A best effort is made to process the response from the server to populate which items are deleted. This information is generally useful if you wish to implement some kind of blocking delete behavior - that is ensure the returned resources based upon a matching uid have been deleted. -/ list will always return true and 404s on individual items will simply be ignored. +delete(List) and delete(T[]) returning boolean have been deprecated. They will always return true and 404s on individual items will simply be ignored. + +## Stream Changes + +The usage of Piped streams is no longer supported - they make assumptions about reading and writing threads, which the client no longer honors. They should not be passed into +the methods accepting InputStreams and OutputStreams. + +ContainerResource.writingInput(PipedOutputStream in) and readingXXX(PipedInputStream out) have been removed - use the redirecting methods instead. + +TtyExecErrorChannelable methods have been deprecated in favor of ExecWatch.exitCode and ExecListener.onExit. + +ContainerResource.readingInput(InputStream in) has been deprecated - use redirectingInput instead. \ No newline at end of file diff --git a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java index 326ec21dd0d..b925a8f861e 100644 --- a/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java +++ b/httpclient-jdk/src/main/java/io/fabric8/kubernetes/client/jdkhttp/JdkWebSocketImpl.java @@ -106,11 +106,12 @@ public CompletionStage onBinary(java.net.http.WebSocket webSocket, ByteBuffer } catch (IOException e) { throw new RuntimeException(e); } - webSocket.request(1); if (last) { ByteBuffer value = ByteBuffer.wrap(byteArrayOutputStream.toByteArray()); byteArrayOutputStream.reset(); listener.onMessage(new JdkWebSocketImpl(queueSize, webSocket), value); + } else { + webSocket.request(1); } return null; } @@ -118,11 +119,12 @@ public CompletionStage onBinary(java.net.http.WebSocket webSocket, ByteBuffer @Override public CompletionStage onText(java.net.http.WebSocket webSocket, CharSequence data, boolean last) { stringBuilder.append(data); - webSocket.request(1); if (last) { String value = stringBuilder.toString(); stringBuilder.setLength(0); listener.onMessage(new JdkWebSocketImpl(queueSize, webSocket), value); + } else { + webSocket.request(1); } return null; } @@ -188,4 +190,9 @@ public long queueSize() { return queueSize.get(); } + @Override + public void request() { + this.webSocket.request(1); + } + } diff --git a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java index 52ccefea3bd..f6b87c334a3 100644 --- a/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java +++ b/httpclient-okhttp/src/main/java/io/fabric8/kubernetes/client/okhttp/OkHttpWebSocketImpl.java @@ -16,6 +16,7 @@ package io.fabric8.kubernetes.client.okhttp; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.http.WebSocket; import io.fabric8.kubernetes.client.http.WebSocketHandshakeException; import io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl.OkHttpResponseImpl; @@ -30,6 +31,8 @@ import java.net.URI; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; class OkHttpWebSocketImpl implements WebSocket { @@ -54,6 +57,9 @@ public CompletableFuture buildAsync(Listener listener) { CompletableFuture future = new CompletableFuture<>(); httpClient.newWebSocket(request, new WebSocketListener() { private volatile boolean opened; + private boolean more = true; + private ReentrantLock lock = new ReentrantLock(); + private Condition moreRequested = lock.newCondition(); @Override public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response response) { @@ -72,7 +78,7 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons future.completeExceptionally(t); } } else { - listener.onError(new OkHttpWebSocketImpl(webSocket), t); + listener.onError(new OkHttpWebSocketImpl(webSocket, this::request), t); } } @@ -82,24 +88,56 @@ public void onOpen(okhttp3.WebSocket webSocket, Response response) { if (response != null) { response.close(); } - OkHttpWebSocketImpl value = new OkHttpWebSocketImpl(webSocket); + OkHttpWebSocketImpl value = new OkHttpWebSocketImpl(webSocket, this::request); listener.onOpen(value); future.complete(value); } @Override public void onMessage(okhttp3.WebSocket webSocket, ByteString bytes) { - listener.onMessage(new OkHttpWebSocketImpl(webSocket), bytes.asByteBuffer()); + awaitMoreRequest(); + listener.onMessage(new OkHttpWebSocketImpl(webSocket, this::request), bytes.asByteBuffer()); } @Override public void onMessage(okhttp3.WebSocket webSocket, String text) { - listener.onMessage(new OkHttpWebSocketImpl(webSocket), text); + awaitMoreRequest(); + listener.onMessage(new OkHttpWebSocketImpl(webSocket, this::request), text); + } + + /** + * To back pressure OkHttp, we need to hold the socket processing thread before + * it delivers more results than expected + */ + private void awaitMoreRequest() { + lock.lock(); + try { + while (!more) { + moreRequested.await(); + } + more = false; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw KubernetesClientException.launderThrowable(e); + } finally { + lock.unlock(); + } + } + + private void request() { + lock.lock(); + try { + more = true; + moreRequested.signalAll(); + } finally { + lock.unlock(); + } } @Override public void onClosing(okhttp3.WebSocket webSocket, int code, String reason) { - listener.onClose(new OkHttpWebSocketImpl(webSocket), code, reason); + awaitMoreRequest(); + listener.onClose(new OkHttpWebSocketImpl(webSocket, this::request), code, reason); } }); @@ -127,9 +165,11 @@ public Builder subprotocol(String protocol) { } private okhttp3.WebSocket webSocket; + private Runnable requestMethod; - public OkHttpWebSocketImpl(okhttp3.WebSocket webSocket) { + public OkHttpWebSocketImpl(okhttp3.WebSocket webSocket, Runnable requestMethod) { this.webSocket = webSocket; + this.requestMethod = requestMethod; } @Override @@ -147,4 +187,9 @@ public long queueSize() { return webSocket.queueSize(); } + @Override + public void request() { + requestMethod.run(); + } + } \ No newline at end of file diff --git a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java index 26705d4a07f..463fb900c17 100644 --- a/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java +++ b/httpclient-tests/src/test/java/io/fabric8/kubernetes/client/http/OkHttpClientTest.java @@ -32,10 +32,12 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -116,6 +118,38 @@ public void onOpen(WebSocket webSocket) { startedFuture.get(10, TimeUnit.SECONDS); } + @Test + void testRequest() throws Exception { + server.expect().withPath("/foo") + .andUpgradeToWebSocket() + .open() + .waitFor(0) + .andEmit("hello") + .waitFor(0) + .andEmit("world") + .done().always(); + + CountDownLatch latch = new CountDownLatch(2); + + CompletableFuture startedFuture = client.getHttpClient().newWebSocketBuilder() + .uri(URI.create(client.getConfiguration().getMasterUrl() + "foo")) + .buildAsync(new Listener() { + + @Override + public void onMessage(WebSocket webSocket, String text) { + latch.countDown(); + } + + }); + + assertFalse(latch.await(2, TimeUnit.SECONDS)); + assertEquals(1, latch.getCount()); + + startedFuture.get().request(); + + assertTrue(latch.await(1, TimeUnit.SECONDS)); + } + @Test void testAsyncBody() throws Exception { server.expect().withPath("/async").andReturn(200, "hello world").always(); diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ContainerResource.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ContainerResource.java index 7032fc927d4..0156b360837 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ContainerResource.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ContainerResource.java @@ -15,19 +15,39 @@ */ package io.fabric8.kubernetes.client.dsl; +import io.fabric8.kubernetes.client.utils.InputStreamPumper; + import java.io.InputStream; -import java.io.PipedOutputStream; +import java.io.OutputStream; public interface ContainerResource extends TtyExecOutputErrorable, TimestampBytesLimitTerminateTimeTailPrettyLoggable { + /** + * Will send the given input stream via a polling mechanism. + * + * @deprecated use redirectingInput and the resulting {@link ExecWatch#getOutput()} with + * InputStream#transferTo(java.io.OutputStream) on JDK 9+ + * or + * {@link InputStreamPumper#transferTo(InputStream, io.fabric8.kubernetes.client.utils.InputStreamPumper.Writable)} + * @param in the {@link InputStream} + */ + @Deprecated TtyExecOutputErrorable readingInput(InputStream in); - TtyExecOutputErrorable writingInput(PipedOutputStream in); - + /** + * Will provide an {@link OutputStream} via {@link ExecWatch#getInput()} with the + * default buffer size. + */ TtyExecOutputErrorable redirectingInput(); + /** + * Will provide an {@link OutputStream} via {@link ExecWatch#getInput()} with the + * given buffer size. + * + * @param bufferSize if null will use the default + */ TtyExecOutputErrorable redirectingInput(Integer bufferSize); CopyOrReadable file(String path); diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecListener.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecListener.java index 603d5efa87a..e83d5ea671a 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecListener.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecListener.java @@ -27,6 +27,7 @@ public interface Response { /** * May be null if not provided by the underlying implementation. + * * @return the body as a String * @throws IOException */ @@ -37,7 +38,8 @@ public interface Response { /** * Called when the request has successfully been upgraded to a web socket. */ - default void onOpen() {} + default void onOpen() { + } /** * Called when the transport or protocol layer of this web socket errors during communication. @@ -45,23 +47,29 @@ default void onOpen() {} * @param t Throwable * @param failureResponse non-null if the failure is caused by the handshake */ - default void onFailure(Throwable t, Response failureResponse) {} + default void onFailure(Throwable t, Response failureResponse) { + } - /** - * Called when the server sends a close message. - * - * @param code The RFC-compliant - * status code. - * @param reason Reason for close or an empty string. - */ - void onClose(int code, String reason); + /** + * Called when the server sends a close message. + * + * @param code The RFC-compliant + * status code. + * @param reason Reason for close or an empty string. + */ + void onClose(int code, String reason); /** * Called after a Status message is seen on channel 3. - * - * Use {@link ExecWatch#getErrorChannel()} if you need the raw channel 3 contents. + *

+ * See https://github.com/kubernetes/kubernetes/issues/89899 - which explains there's currently no way to indicate + * end of input over a websocket, so you may not get an exit code when using stdIn. + *

+ * See also {@link ExecWatch#exitCode()} + * * @param code the exit code, -1 will be used if the code cannot be determined * @param status may be null if no valid status was received */ - default void onExit(int code, Status status) {} + default void onExit(int code, Status status) { + } } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java index 07786daf42d..a594af2c5a1 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java @@ -23,21 +23,24 @@ public interface ExecWatch extends Closeable { /** - * Gets the {@link OutputStream} for stdIn if one is associated + * Gets the {@link OutputStream} for stdIn if {@link ContainerResource#redirectingInput()} has been called. + *

+ * Closing this stream does not immediately force sending. You will typically call {@link #close()} after + * you are finished writing - the close message will not be sent until all pending messages have been sent. * * @return the stdIn stream */ OutputStream getInput(); /** - * Gets the {@link InputStream} for stdOut if one is associated + * Gets the {@link InputStream} for stdOut if {@link TtyExecOutputErrorable#redirectingOutput()} has been called. * * @return the stdErr stream */ InputStream getOutput(); /** - * Gets the {@link InputStream} for stdErr if one is associated + * Gets the {@link InputStream} for stdErr if {@link TtyExecErrorable#redirectingError()} has been called. * * @return the stdErr stream */ @@ -55,7 +58,7 @@ public interface ExecWatch extends Closeable { InputStream getErrorChannel(); /** - * Gracefully close the Watch. + * Gracefully close the Watch - the close message will not be sent until all pending messages have been sent. */ @Override void close(); @@ -66,6 +69,9 @@ public interface ExecWatch extends Closeable { * Get a future that will be completed with the exit code. *

* Will be -1 if the exit code can't be determined from the status, or null if close is received before the exit code. + *
+ * See https://github.com/kubernetes/kubernetes/issues/89899 - which explains there's currently no way to indicate + * end of input over a websocket, so you may not get an exit code when using stdIn. *

* Can be used as an alternative to * {@link ExecListener#onFailure(Throwable, io.fabric8.kubernetes.client.dsl.ExecListener.Response)} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java index 87493d68f43..21d7ff368ae 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/LogWatch.java @@ -21,16 +21,17 @@ public interface LogWatch extends Closeable { - /** - * Returns the {@link InputStream} for the log watch. - * If an {@link OutputStream} was passed in, will be null - * @return the {@link InputStream} - */ - InputStream getOutput(); + /** + * Returns the {@link InputStream} for the log watch. + * If an {@link OutputStream} was passed in, will be null + * + * @return the {@link InputStream} which must be read completely or closed + */ + InputStream getOutput(); - /** - * Close the Watch. - */ - @Override - void close(); + /** + * Close the Watch. + */ + @Override + void close(); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Loggable.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Loggable.java index 20ccc439fca..e7940aba6b7 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Loggable.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/Loggable.java @@ -17,6 +17,7 @@ package io.fabric8.kubernetes.client.dsl; import java.io.OutputStream; +import java.io.PipedOutputStream; import java.io.Reader; /** @@ -47,7 +48,7 @@ public interface Loggable { Reader getLogReader(); /** - * Watch logs of a resource + * Watch logs of a resource. Use {@link LogWatch#getOutput()} to obtain the stream * * @return returns a Closeable interface for log watch */ @@ -56,7 +57,7 @@ public interface Loggable { /** * Watch logs of resource and put them inside OutputStream inside *
- * if the OutputStream is a PipedOutputStream, it will be closed when the Watch terminates + * Should not be called with a {@link PipedOutputStream} - use {@link #watchLog()} instead * * @param out {@link OutputStream} for storing logs * @return returns a Closeable interface for log watch diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorChannelable.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorChannelable.java index b2e606395f2..1ea23b27f8c 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorChannelable.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorChannelable.java @@ -15,15 +15,27 @@ */ package io.fabric8.kubernetes.client.dsl; +import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedInputStream; public interface TtyExecErrorChannelable extends TtyExecable { + /** + * @deprecated use {@link ExecListener#onExit(int, io.fabric8.kubernetes.api.model.Status)} + * or {@link ExecWatch#exitCode()} + */ + @Deprecated TtyExecable writingErrorChannel(OutputStream in); - TtyExecable readingErrorChannel(PipedInputStream in); - + /** + * Will provide an {@link InputStream} via {@link ExecWatch#getErrorChannel()} + *

+ * WARNING: the resulting stream must be fully read or closed for other events to be processed properly + * + * @deprecated use {@link ExecListener#onExit(int, io.fabric8.kubernetes.api.model.Status)} + * or {@link ExecWatch#exitCode()} + */ + @Deprecated TtyExecable redirectingErrorChannel(); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java index c02ffd4a1b9..bab2f620937 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java @@ -15,16 +15,31 @@ */ package io.fabric8.kubernetes.client.dsl; +import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedInputStream; +import java.io.PipedOutputStream; public interface TtyExecErrorable extends TtyExecErrorChannelable { + /** + * Should only be called with a minimally blocking or non-blocking stream + *

+ * In particular do no use a {@link PipedOutputStream} - use {@link #redirectingError()} instead + */ TtyExecErrorChannelable writingError(OutputStream in); - TtyExecErrorChannelable readingError(PipedInputStream in); + /** + * If the {@link ExecWatch} should terminate when a stdErr message is received. + * The message will be provided as an exceptional outcome of {@link ExecWatch#exitCode()} + */ + TtyExecErrorChannelable terminateOnError(); + /** + * Will provide an {@link InputStream} via {@link ExecWatch#getError()} + *

+ * WARNING: the resulting stream must be fully read or closed for other events to be processed properly + */ TtyExecErrorChannelable redirectingError(); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecOutputErrorable.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecOutputErrorable.java index b72c0975037..cdf9d0cd3ad 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecOutputErrorable.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecOutputErrorable.java @@ -15,15 +15,24 @@ */ package io.fabric8.kubernetes.client.dsl; +import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedInputStream; +import java.io.PipedOutputStream; public interface TtyExecOutputErrorable extends TtyExecErrorable { + /** + * Should only be called with a minimally blocking or non-blocking stream + *

+ * In particular do no use a {@link PipedOutputStream} - use {@link #redirectingOutput()} instead + */ TtyExecErrorable writingOutput(OutputStream in); - TtyExecErrorable readingOutput(PipedInputStream in); - + /** + * Will provide an {@link InputStream} via {@link ExecWatch#getOutput()} + *

+ * WARNING: the resulting stream must be fully read or closed for other events to be processed properly + */ TtyExecErrorable redirectingOutput(); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java index c67b7626282..dbb8ef41ea2 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/http/WebSocket.java @@ -88,4 +88,12 @@ interface Builder extends BasicBuilder { */ long queueSize(); + /** + * Used to receive more onMessage or {@link Listener#onClose(WebSocket, int, String)} events after the initial message is + * received + *

+ * request is implicitly called by {@link Listener#onOpen(WebSocket)} + */ + void request(); + } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/InputStreamPumper.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/InputStreamPumper.java index 7e1dc43073f..b2c79545d08 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/InputStreamPumper.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/utils/InputStreamPumper.java @@ -28,6 +28,8 @@ public class InputStreamPumper { + private static final int DEFAULT_BUFFER_SIZE = 8192; + private InputStreamPumper() { } @@ -74,7 +76,7 @@ public int read(byte[] b, int off, int len) throws IOException { * See InputStream.transferTo(java.io.OutputStream) in Java 9 or later */ public static void transferTo(InputStream in, Writable out) throws IOException { - byte[] buffer = new byte[8192]; + byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; int length; while ((length = in.read(buffer, 0, buffer.length)) != -1) { out.write(buffer, 0, length); @@ -125,8 +127,8 @@ public void write(int b) throws IOException { } - public static OutputStream writableOutputStream(Writable writer) { - return new BufferedOutputStream(new WritableOutputStream(writer)); + public static OutputStream writableOutputStream(Writable writer, Integer bufferSize) { + return new BufferedOutputStream(new WritableOutputStream(writer), Utils.getNonNullOrElse(bufferSize, DEFAULT_BUFFER_SIZE)); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java new file mode 100644 index 00000000000..52506cf65de --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java @@ -0,0 +1,153 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.dsl.internal; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; + +/** + * Provides an InputStream that is non-blocking to the producer + * and that will request more input when needed. + */ +public class ExecWatchInputStream extends InputStream { + + private final LinkedList buffers = new LinkedList<>(); + private boolean complete; + private boolean closed; + private Throwable failed; + private ByteBuffer currentBuffer; + + private Runnable request; + + public ExecWatchInputStream(Runnable request) { + this.request = request; + } + + void onExit(Integer exitCode, Throwable t) { + synchronized (buffers) { + if (complete) { + return; + } + complete = true; + if (t != null) { + failed = t; + } + buffers.notifyAll(); + } + } + + void consume(List value) { + synchronized (buffers) { + if (closed) { + // even if closed there may be other streams + // so keep pulling + request.run(); + return; + } + buffers.addAll(value); + buffers.notifyAll(); + } + } + + private ByteBuffer current() throws IOException { + synchronized (buffers) { + while (currentBuffer == null || !currentBuffer.hasRemaining()) { + // Check whether the stream is closed or exhausted + if (closed) { + throw new IOException("closed", failed); + } + if (buffers.isEmpty()) { + if (complete) { + if (failed != null) { + throw new IOException("closed", failed); + } + return null; + } + requestMoreIfNeeded(); + } + + currentBuffer = buffers.poll(); + + if (currentBuffer == null) { + try { + buffers.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new InterruptedIOException(); + } + } + } + return currentBuffer; + } + } + + /** + * Adapted from HttpResponseInputStream + */ + @Override + public int read(byte[] bytes, int off, int len) throws IOException { + // get the buffer to read from, possibly blocking if + // none is available + ByteBuffer buffer = current(); + if (buffer == null) { + return -1; + } + + // don't attempt to read more than what is available + // in the current buffer. + int read = Math.min(buffer.remaining(), len); + assert read > 0 && read <= buffer.remaining(); + + // buffer.get() will do the boundary check for us. + buffer.get(bytes, off, read); + return read; + } + + @Override + public int read() throws IOException { + byte[] single = new byte[1]; + if (read(single) == -1) { + return -1; + } + return single[0] & 0xFF; + } + + @Override + public void close() throws IOException { + synchronized (buffers) { + if (this.closed) { + return; + } + this.closed = true; + requestMoreIfNeeded(); + this.buffers.clear(); + buffers.notifyAll(); + } + } + + private void requestMoreIfNeeded() { + if (currentBuffer != null) { + this.currentBuffer = null; + this.request.run(); + } + } + +} \ No newline at end of file diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index 36112462abd..4c5f73de2e8 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -23,28 +23,28 @@ import io.fabric8.kubernetes.client.dsl.ExecListener; import io.fabric8.kubernetes.client.dsl.ExecListener.Response; import io.fabric8.kubernetes.client.dsl.ExecWatch; +import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext.StreamContext; import io.fabric8.kubernetes.client.http.HttpResponse; import io.fabric8.kubernetes.client.http.WebSocket; import io.fabric8.kubernetes.client.http.WebSocketHandshakeException; import io.fabric8.kubernetes.client.utils.InputStreamPumper; import io.fabric8.kubernetes.client.utils.Serialization; +import io.fabric8.kubernetes.client.utils.Utils; +import io.fabric8.kubernetes.client.utils.internal.SerialExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.HashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; @@ -52,8 +52,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import static io.fabric8.kubernetes.client.utils.Utils.closeQuietly; - /** * A {@link WebSocket.Listener} for exec operations. * @@ -92,56 +90,105 @@ public String body() throws IOException { } } + @FunctionalInterface + public interface MessageHandler { + + void handle(ByteBuffer bytes) throws IOException; + + } + + private final class ListenerStream { + private MessageHandler handler; + private ExecWatchInputStream inputStream; + + private void handle(ByteBuffer byteString, WebSocket webSocket) throws IOException { + if (handler != null) { + handler.handle(byteString); + } else { + webSocket.request(); + } + } + } + private static final Logger LOGGER = LoggerFactory.getLogger(ExecWebSocketListener.class); private static final String HEIGHT = "Height"; private static final String WIDTH = "Width"; private final InputStream in; - private final OutputStream out; - private final OutputStream err; - private final OutputStream errChannel; - private final OutputStream input; - private final PipedInputStream output; - private final PipedInputStream error; - private final PipedInputStream errorChannel; - - private final boolean forUpload; - private final AtomicReference webSocketRef = new AtomicReference<>(); - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final ListenerStream out; + private final ListenerStream error; + private final ListenerStream errorChannel; + private final boolean terminateOnError; private final ExecListener listener; + private final AtomicReference webSocketRef = new AtomicReference<>(); + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final SerialExecutor serialExecutor = new SerialExecutor(Utils.getCommonExecutorSerive()); private final AtomicBoolean closed = new AtomicBoolean(false); - - private final Set toClose = new LinkedHashSet<>(); - private final CompletableFuture exitCode = new CompletableFuture<>(); - private ObjectMapper objectMapper = new ObjectMapper(); + public static String toString(ByteBuffer buffer) { + return StandardCharsets.UTF_8.decode(buffer).toString(); + } + public ExecWebSocketListener(PodOperationContext context) { this.listener = context.getExecListener(); - this.forUpload = context.isForUpload(); - PipedOutputStream inputPipe = context.getInPipe(); - PipedInputStream outputPipe = context.getOutPipe(); - PipedInputStream errorPipe = context.getErrPipe(); - PipedInputStream errorChannelPipe = context.getErrChannelPipe(); - this.in = inputStreamOrPipe(context.getIn(), inputPipe, toClose, context.getBufferSize()); - this.out = outputStreamOrPipe(context.getOut(), outputPipe, toClose); - this.err = outputStreamOrPipe(context.getErr(), errorPipe, toClose); - this.errChannel = outputStreamOrPipe(context.getErrChannel(), errorChannelPipe, toClose); - - if (inputPipe == null && in == null && forUpload) { - // if there's no explicit in, then we create an OutputStream that writes directly to send - this.input = InputStreamPumper.writableOutputStream(this::sendWithErrorChecking); + + Integer bufferSize = context.getBufferSize(); + if (context.isRedirectingIn()) { + this.input = InputStreamPumper.writableOutputStream(this::sendWithErrorChecking, bufferSize); + this.in = null; } else { - this.input = inputPipe; + this.input = null; + this.in = context.getIn(); } - this.output = outputPipe; - this.error = errorPipe; - this.errorChannel = errorChannelPipe; + + this.terminateOnError = context.isTerminateOnError(); + this.out = createStream(context.getOutput()); + this.error = createStream(context.getError()); + this.errorChannel = createStream(context.getErrorChannel()); + } + + private ListenerStream createStream(StreamContext streamContext) { + ListenerStream stream = new ListenerStream(); + if (streamContext == null) { + return stream; + } + OutputStream os = streamContext.getOutputStream(); + if (os == null) { + // redirecting + stream.inputStream = new ExecWatchInputStream(() -> this.webSocketRef.get().request()); + this.exitCode.whenComplete(stream.inputStream::onExit); + stream.handler = b -> stream.inputStream.consume(Arrays.asList(b)); + } else { + WritableByteChannel channel = Channels.newChannel(os); + stream.handler = b -> asyncWrite(channel, b); + } + return stream; + } + + private void asyncWrite(WritableByteChannel channel, ByteBuffer b) { + CompletableFuture.runAsync(() -> { + try { + channel.write(b); + } catch (IOException e) { + throw KubernetesClientException.launderThrowable(e); + } + }, serialExecutor).whenComplete((v, t) -> { + webSocketRef.get().request(); + if (t != null) { + if (closed.get()) { + LOGGER.debug("Stream write failed after close", t); + } else { + // This could happen if the user simply closes their stream prior to completion + LOGGER.warn("Stream write failed", t); + } + } + }); } @Override @@ -154,14 +201,11 @@ public void close() { /** * Performs the cleanup tasks: * 1. cancels the InputStream pumper - * 2. closes all internally managed closeables (piped streams). - * - * The order of these tasks can't change or its likely that the pumper will throw errors, - * if the stream it uses closes before the pumper it self. + * 2. closes all pending message work */ private void cleanUpOnce() { executorService.shutdownNow(); - closeQuietly(toClose); + serialExecutor.shutdownNow(); } private void closeWebSocketOnce(int code, String reason) { @@ -182,27 +226,14 @@ private void closeWebSocketOnce(int code, String reason) { @Override public void onOpen(WebSocket webSocket) { try { - if (in instanceof PipedInputStream && input instanceof PipedOutputStream) { - ((PipedOutputStream) input).connect((PipedInputStream) in); - } - if (out instanceof PipedOutputStream && output != null) { - output.connect((PipedOutputStream) out); - } - if (err instanceof PipedOutputStream && error != null) { - error.connect((PipedOutputStream) err); - } - if (errChannel instanceof PipedOutputStream && errorChannel != null) { - errorChannel.connect((PipedOutputStream) errChannel); - } - + // ensure onClose is processed + this.exitCode.whenComplete((i, t) -> webSocket.request()); webSocketRef.set(webSocket); if (in != null && !executorService.isShutdown()) { // the task will be cancelled via shutdownNow // TODO: this does not work if the inputstream does not support available InputStreamPumper.pump(InputStreamPumper.asInterruptible(in), this::send, executorService); } - } catch (IOException e) { - onError(webSocket, e); } finally { if (listener != null) { listener.onOpen(); @@ -250,37 +281,45 @@ public void onError(WebSocket webSocket, Throwable t) { @Override public void onMessage(WebSocket webSocket, ByteBuffer bytes) { - // TODO: these could be blocking writes and may need moved to another thread - // if we do that, we'll need to make the exit code wait on the final write + boolean close = false; try { byte streamID = bytes.get(0); bytes.position(1); ByteBuffer byteString = bytes.slice(); - if (byteString.remaining() > 0) { - switch (streamID) { - case 1: - writeAndFlush(out, byteString); - break; - case 2: - writeAndFlush(err, byteString); - if (forUpload) { - String stringValue = StandardCharsets.UTF_8.decode(bytes).toString(); - exitCode.completeExceptionally(new KubernetesClientException(stringValue)); - this.close(); - } - break; - case 3: - handleExitStatus(bytes); - writeAndFlush(errChannel, byteString); - // once the process is done, we can proactively close - this.close(); - break; - default: - throw new IOException("Unknown stream ID " + streamID); - } + if (byteString.remaining() == 0) { + webSocket.request(); + return; + } + switch (streamID) { + case 1: + out.handle(byteString, webSocket); + break; + case 2: + if (terminateOnError) { + String stringValue = toString(bytes); + exitCode.completeExceptionally(new KubernetesClientException(stringValue)); + close = true; + } else { + error.handle(byteString, webSocket); + } + break; + case 3: + close = true; + try { + errorChannel.handle(bytes, webSocket); + } finally { + handleExitStatus(byteString); + } + break; + default: + throw new IOException("Unknown stream ID " + streamID); } } catch (IOException e) { throw KubernetesClientException.launderThrowable(e); + } finally { + if (close) { + this.close(); + } } } @@ -288,7 +327,7 @@ private void handleExitStatus(ByteBuffer bytes) { Status status = null; int code = -1; try { - String stringValue = StandardCharsets.UTF_8.decode(bytes).toString(); + String stringValue = toString(bytes); status = Serialization.unmarshal(stringValue, Status.class); if (status != null) { code = parseExitCode(status); @@ -305,15 +344,6 @@ private void handleExitStatus(ByteBuffer bytes) { } } - private void writeAndFlush(OutputStream stream, ByteBuffer byteString) throws IOException { - if (stream != null) { - Channels.newChannel(stream).write(byteString); - if (stream instanceof PipedOutputStream) { - stream.flush(); // immediately wake up the reader - } - } - } - @Override public void onClose(WebSocket webSocket, int code, String reason) { if (!exitCode.isDone()) { @@ -341,17 +371,17 @@ public OutputStream getInput() { @Override public InputStream getOutput() { - return output; + return out.inputStream; } @Override public InputStream getError() { - return error; + return error.inputStream; } @Override public InputStream getErrorChannel() { - return errorChannel; + return errorChannel.inputStream; } @Override @@ -393,31 +423,6 @@ void sendWithErrorChecking(byte[] bytes, int offset, int length) { checkError(); } - private static InputStream inputStreamOrPipe(InputStream stream, PipedOutputStream out, Set toClose, - Integer bufferSize) { - if (stream != null) { - return stream; - } else if (out != null) { - PipedInputStream pis = bufferSize == null ? new PipedInputStream() : new PipedInputStream(bufferSize.intValue()); - toClose.add(pis); - return pis; - } else { - return null; - } - } - - private static OutputStream outputStreamOrPipe(OutputStream stream, PipedInputStream in, Set toClose) { - if (stream != null) { - return stream; - } else if (in != null) { - PipedOutputStream pos = new PipedOutputStream(); - toClose.add(pos); - return pos; - } else { - return null; - } - } - public static int parseExitCode(Status status) { if (STATUS_SUCCESS.equals(status.getStatus())) { return 0; diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java index 60ef6bf1e31..dc0e38b154b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java @@ -15,6 +15,7 @@ */ package io.fabric8.kubernetes.client.dsl.internal; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.http.HttpClient; import io.fabric8.kubernetes.client.http.HttpRequest; @@ -23,22 +24,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedOutputStream; import java.net.URL; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; -import java.util.LinkedHashSet; -import java.util.Set; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static io.fabric8.kubernetes.client.utils.Utils.closeQuietly; - public class LogWatchCallback implements LogWatch, AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(LogWatchCallback.class); @@ -46,16 +43,13 @@ public class LogWatchCallback implements LogWatch, AutoCloseable { private OutputStream out; private WritableByteChannel outChannel; private volatile InputStream output; - private final Set toClose = new LinkedHashSet<>(); private final AtomicBoolean closed = new AtomicBoolean(false); + private volatile Optional asyncBody = Optional.empty(); private final SerialExecutor serialExecutor = new SerialExecutor(Utils.getCommonExecutorSerive()); public LogWatchCallback(OutputStream out) { this.out = out; - if (this.out instanceof PipedOutputStream) { - toClose.add(this.out); - } if (out != null) { outChannel = Channels.newChannel(out); } @@ -66,20 +60,12 @@ public void close() { cleanUp(); } - /** - * Performs the cleanup tasks: - * 1. cancels the InputStream pumper - * 2. closes all internally managed closeables (piped streams). - * - * The order of these tasks can't change or its likely that the pumper will through errors, - * if the stream it uses closes before the pumper it self. - */ private void cleanUp() { if (!closed.compareAndSet(false, true)) { return; } + asyncBody.ifPresent(HttpClient.AsyncBody::cancel); serialExecutor.shutdownNow(); - closeQuietly(toClose); } public LogWatchCallback callAndWait(HttpClient client, URL url) { @@ -99,16 +85,19 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) { } else { // we need to write the bytes to the given output // we don't know if the write will be blocking, so hand it off to another thread - clone.consumeBytes(request, (buffers, a) -> serialExecutor.execute(() -> { + clone.consumeBytes(request, (buffers, a) -> CompletableFuture.runAsync(() -> { for (ByteBuffer byteBuffer : buffers) { try { outChannel.write(byteBuffer); } catch (IOException e1) { - onFailure(e1); - break; + throw KubernetesClientException.launderThrowable(e1); } } - if (!closed.get()) { + }, serialExecutor).whenComplete((v, t) -> { + if (t != null) { + a.cancel(); + onFailure(t); + } else if (!closed.get()) { a.consume(); } else { a.cancel(); @@ -118,6 +107,7 @@ public LogWatchCallback callAndWait(HttpClient client, URL url) { onFailure(e); } if (a != null) { + asyncBody = Optional.of(a.body()); a.body().consume(); a.body().done().whenComplete((v, t) -> { if (t != null) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java index f3f63ccb34a..7ec951b78cf 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationContext.java @@ -24,8 +24,6 @@ import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; @Builder(toBuilder = true) @NoArgsConstructor @@ -33,16 +31,27 @@ @Getter public class PodOperationContext { + @Getter + public static final class StreamContext { + private OutputStream outputStream; + + public StreamContext(OutputStream outputStream) { + this.outputStream = outputStream; + } + + public StreamContext() { + } + } + private String containerId; + + private StreamContext output; + private StreamContext error; + private StreamContext errorChannel; + + private boolean redirectingIn; private InputStream in; - private OutputStream out; - private OutputStream err; - private OutputStream errChannel; - - private PipedOutputStream inPipe; - private PipedInputStream outPipe; - private PipedInputStream errPipe; - private PipedInputStream errChannelPipe; + private boolean tty; private boolean terminatedStatus; private boolean timestamps; @@ -56,7 +65,7 @@ public class PodOperationContext { private Integer bufferSize; private String file; private String dir; - private boolean forUpload; + private boolean terminateOnError; public PodOperationContext withContainerId(String containerId) { return this.toBuilder().containerId(containerId).build(); @@ -66,34 +75,6 @@ public PodOperationContext withIn(InputStream in) { return this.toBuilder().in(in).build(); } - public PodOperationContext withOut(OutputStream out) { - return this.toBuilder().out(out).build(); - } - - public PodOperationContext withErr(OutputStream err) { - return this.toBuilder().err(err).build(); - } - - public PodOperationContext withErrChannel(OutputStream errChannel) { - return this.toBuilder().errChannel(errChannel).build(); - } - - public PodOperationContext withInPipe(PipedOutputStream inPipe) { - return this.toBuilder().inPipe(inPipe).build(); - } - - public PodOperationContext withOutPipe(PipedInputStream outPipe) { - return this.toBuilder().outPipe(outPipe).build(); - } - - public PodOperationContext withErrPipe(PipedInputStream errPipe) { - return this.toBuilder().errPipe(errPipe).build(); - } - - public PodOperationContext withErrChannelPipe(PipedInputStream errChannelPipe) { - return this.toBuilder().errChannelPipe(errChannelPipe).build(); - } - public PodOperationContext withTty(boolean tty) { return this.toBuilder().tty(tty).build(); } @@ -180,13 +161,13 @@ public void addQueryParameters(URLBuilder httpUrlBuilder) { if (tty) { httpUrlBuilder.addQueryParameter("tty", "true"); } - if (in != null || inPipe != null || forUpload) { + if (in != null || redirectingIn) { httpUrlBuilder.addQueryParameter("stdin", "true"); } - if (out != null || outPipe != null) { + if (output != null) { httpUrlBuilder.addQueryParameter("stdout", "true"); } - if (err != null || errPipe != null || forUpload) { + if (error != null || terminateOnError) { httpUrlBuilder.addQueryParameter("stderr", "true"); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java index 4a29e69b407..23131427531 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PortForwarderWebsocket.java @@ -15,6 +15,14 @@ */ package io.fabric8.kubernetes.client.dsl.internal; +import io.fabric8.kubernetes.client.LocalPortForward; +import io.fabric8.kubernetes.client.PortForward; +import io.fabric8.kubernetes.client.http.HttpClient; +import io.fabric8.kubernetes.client.http.WebSocket; +import io.fabric8.kubernetes.client.utils.URLUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Closeable; import java.io.IOException; import java.net.InetAddress; @@ -37,15 +45,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import io.fabric8.kubernetes.client.LocalPortForward; -import io.fabric8.kubernetes.client.PortForward; -import io.fabric8.kubernetes.client.http.HttpClient; -import io.fabric8.kubernetes.client.http.WebSocket; -import io.fabric8.kubernetes.client.utils.URLUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A port-forwarder using the websocket protocol. * It requires Kubernetes 1.6+ (previous versions support the SPDY protocol only). @@ -89,7 +88,7 @@ public void close() throws IOException { try { server.close(); } finally { - closeQuietly(handles.toArray(new Closeable[]{})); + closeQuietly(handles.toArray(new Closeable[] {})); closeExecutor(executorService); } } @@ -220,6 +219,7 @@ public void onMessage(WebSocket webSocket, String text) { @Override public void onMessage(WebSocket webSocket, ByteBuffer buffer) { + webSocket.request(); messagesRead++; if (messagesRead <= 2) { // skip the first two messages, containing the ports used internally @@ -308,23 +308,25 @@ private void closeForwarder() { } }; CompletableFuture socket = client - .newWebSocketBuilder() - .uri(URI.create(URLUtils.join(resourceBaseUrl.toString(), "portforward?ports=" + port))) - .buildAsync(listener); - + .newWebSocketBuilder() + .uri(URI.create(URLUtils.join(resourceBaseUrl.toString(), "portforward?ports=" + port))) + .buildAsync(listener); + socket.whenComplete((w, t) -> { if (t != null) { listener.onError(w, t); } }); - + return new PortForward() { @Override public void close() throws IOException { socket.cancel(true); - socket.whenComplete((w, t) -> { if (w != null) { - w.sendClose(1001, "User closing"); - }}); + socket.whenComplete((w, t) -> { + if (w != null) { + w.sendClose(1001, "User closing"); + } + }); } @Override @@ -366,10 +368,10 @@ private void closeExecutor(ExecutorService executor) { } public static void closeQuietly(Closeable... cloaseables) { - if(cloaseables != null) { - for(Closeable c : cloaseables) { + if (cloaseables != null) { + for (Closeable c : cloaseables) { try { - if(c != null) { + if (c != null) { c.close(); } } catch (IOException ioe) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 46ca046f2cb..0129d43915a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -63,6 +63,7 @@ public void onError(WebSocket webSocket, Throwable t) { @Override public void onMessage(WebSocket webSocket, String text) { + webSocket.request(); synchronized (reconnectLock) { manager.onMessage(text); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java index a14eba60d98..24b27a6dae9 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java @@ -47,6 +47,7 @@ import io.fabric8.kubernetes.client.dsl.internal.LogWatchCallback; import io.fabric8.kubernetes.client.dsl.internal.OperationContext; import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext; +import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext.StreamContext; import io.fabric8.kubernetes.client.dsl.internal.PortForwarderWebsocket; import io.fabric8.kubernetes.client.dsl.internal.uploadable.PodUpload; import io.fabric8.kubernetes.client.http.HttpClient; @@ -62,7 +63,6 @@ import java.io.BufferedOutputStream; import java.io.File; import java.io.FileOutputStream; -import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -144,8 +144,15 @@ public LogWatch watchLog() { return watchLog(null); } + private void checkForPiped(Object object) { + if (object instanceof PipedOutputStream || object instanceof PipedInputStream) { + throw new KubernetesClientException("Piped streams should not be used"); + } + } + @Override public LogWatch watchLog(OutputStream out) { + checkForPiped(out); try { PodOperationUtil.waitUntilReadyBeforeFetchingLogs(this, getContext().getLogWaitTimeout() != null ? getContext().getLogWaitTimeout() : DEFAULT_POD_LOG_WAIT_TIMEOUT); @@ -348,26 +355,12 @@ public InputStream read() { } } - private Future readFileTo(String source, OutputStream out) { - return readTo(new Base64.OutputStream(out, Base64.DECODE), "sh", "-c", - String.format("cat %s | base64", shellQuote(source))); + private String[] readFileCommand(String source) { + return new String[] { "sh", "-c", String.format("cat %s | base64", shellQuote(source)) }; } private InputStream readFile(String source) { - try { - PipedOutputStream out = new PipedOutputStream(); - PipedInputStream in = new PipedInputStream(out, 1024); - final Future future = readFileTo(source, out); - return new FilterInputStream(in) { - @Override - public void close() throws IOException { - future.cancel(false); - super.close(); - } - }; - } catch (Exception e) { - throw KubernetesClientException.launderThrowable(e); - } + return read(readFileCommand(source)); } // @@ -389,49 +382,39 @@ private void copyFile(String source, File target) { } try (OutputStream out = new BufferedOutputStream(new FileOutputStream(destination))) { - readFileTo(source, out).get(); + readTo(new Base64.OutputStream(out, Base64.DECODE), readFileCommand(source)).get(); } catch (Exception e) { throw KubernetesClientException.launderThrowable(e); } } - private Future readTarTo(String source, OutputStream out) { - return readTo(new Base64.OutputStream(out, Base64.DECODE), "sh", "-c", "tar -cf - " + source + "|" + "base64"); + public InputStream readTar(String source) { + return read("sh", "-c", "tar -cf - " + shellQuote(source) + "|" + "base64"); } - public InputStream readTar(String source) { - try { - PipedOutputStream out = new PipedOutputStream(); - PipedInputStream in = new PipedInputStream(out, 1024); - final Future future = readTarTo(source, out); - return new FilterInputStream(in) { - @Override - public void close() throws IOException { - future.cancel(false); - super.close(); - } - }; - } catch (Exception e) { - throw KubernetesClientException.launderThrowable(e); - } + private InputStream read(String... command) { + ExecWatch watch = redirectingOutput().exec(command); + return new Base64.InputStream(watch.getOutput(), Base64.DECODE) { + @Override + public void close() throws IOException { + watch.close(); + super.close(); + } + }; } private Future readTo(OutputStream out, String... cmd) { - try { - ExecWatch w = writingOutput(out).exec(cmd); - CompletableFuture result = w.exitCode(); - result.whenComplete((i, t) -> { - try { - out.close(); - } catch (Exception e) { - result.obtrudeException(e); - } - w.close(); - }); - return result; - } catch (Exception e) { - throw KubernetesClientException.launderThrowable(e); - } + ExecWatch w = writingOutput(out).exec(cmd); + CompletableFuture result = w.exitCode(); + result.whenComplete((i, t) -> { + try { + out.close(); + } catch (Exception e) { + result.obtrudeException(e); + } + w.close(); + }); + return result; } private void copyDir(String source, File target) throws Exception { @@ -486,67 +469,51 @@ public void run() { @Override public TtyExecOutputErrorable readingInput(InputStream in) { + checkForPiped(in); return new PodOperationsImpl(getContext().withIn(in), context); } @Override - public TtyExecOutputErrorable writingInput(PipedOutputStream inPipe) { - return new PodOperationsImpl(getContext().withInPipe(inPipe), context); - } - - @Override - public TtyExecOutputErrorable redirectingInput() { + public PodOperationsImpl redirectingInput() { return redirectingInput(null); } @Override - public TtyExecOutputErrorable redirectingInput(Integer bufferSize) { - return new PodOperationsImpl(getContext().withInPipe(new PipedOutputStream()).withBufferSize(bufferSize), context); + public PodOperationsImpl redirectingInput(Integer bufferSize) { + return new PodOperationsImpl(getContext().toBuilder().redirectingIn(true).bufferSize(bufferSize).build(), context); } @Override public TtyExecErrorable writingOutput(OutputStream out) { - return new PodOperationsImpl(getContext().withOut(out), context); - } - - @Override - public TtyExecErrorable readingOutput(PipedInputStream outPipe) { - return new PodOperationsImpl(getContext().withOutPipe(outPipe), context); + checkForPiped(out); + return new PodOperationsImpl(getContext().toBuilder().output(new StreamContext(out)).build(), context); } @Override public TtyExecErrorable redirectingOutput() { - return readingOutput(new PipedInputStream()); + return new PodOperationsImpl(getContext().toBuilder().output(new StreamContext()).build(), context); } @Override public TtyExecErrorChannelable writingError(OutputStream err) { - return new PodOperationsImpl(getContext().withErr(err), context); - } - - @Override - public TtyExecErrorChannelable readingError(PipedInputStream errPipe) { - return new PodOperationsImpl(getContext().withErrPipe(errPipe), context); + checkForPiped(err); + return new PodOperationsImpl(getContext().toBuilder().error(new StreamContext(err)).build(), context); } @Override public TtyExecErrorChannelable redirectingError() { - return readingError(new PipedInputStream()); + return new PodOperationsImpl(getContext().toBuilder().error(new StreamContext()).build(), context); } @Override public TtyExecable writingErrorChannel(OutputStream errChannel) { - return new PodOperationsImpl(getContext().withErrChannel(errChannel), context); - } - - @Override - public TtyExecable readingErrorChannel(PipedInputStream errChannelPipe) { - return new PodOperationsImpl(getContext().withErrChannelPipe(errChannelPipe), context); + checkForPiped(errChannel); + return new PodOperationsImpl(getContext().toBuilder().errorChannel(new StreamContext(errChannel)).build(), context); } @Override public TtyExecable redirectingErrorChannel() { - return readingErrorChannel(new PipedInputStream()); + return new PodOperationsImpl(getContext().toBuilder().errorChannel(new StreamContext()).build(), context); } @Override @@ -598,7 +565,8 @@ public static String shellQuote(String value) { return "'" + value.replace("'", "'\\\\''") + "'"; } - public PodOperationsImpl forUpload() { - return new PodOperationsImpl(getContext().toBuilder().forUpload(true).build(), context); + @Override + public PodOperationsImpl terminateOnError() { + return new PodOperationsImpl(getContext().toBuilder().terminateOnError(true).build(), context); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java index 6746ea1ee77..2b7dd58dc78 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java @@ -31,6 +31,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.zip.GZIPOutputStream; @@ -61,23 +62,25 @@ private static interface UploadProcessor { } private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException { - operation = operation.forUpload(); + operation = operation.redirectingInput().terminateOnError(); String containerId = operation.getContext().getContainerId(); if (Utils.isNotNullOrEmpty(containerId)) { operation = operation.inContainer(containerId); } + CompletableFuture exitFuture; try (ExecWatch execWatch = operation.exec("sh", "-c", command)) { OutputStream out = execWatch.getInput(); processor.process(out); out.close(); // also flushes - execWatch.close(); - if (!Utils.waitUntilReady(execWatch.exitCode(), operation.getConfig().getRequestConfig().getUploadRequestTimeout(), - TimeUnit.MILLISECONDS)) { - return false; - } - Integer exitCode = execWatch.exitCode().getNow(null); - return exitCode == null || exitCode.intValue() == 0; + exitFuture = execWatch.exitCode(); + } + // TODO: should this timeout be from the start of the upload? + if (!Utils.waitUntilReady(exitFuture, operation.getConfig().getRequestConfig().getUploadRequestTimeout(), + TimeUnit.MILLISECONDS)) { + return false; } + Integer exitCode = exitFuture.getNow(null); + return exitCode == null || exitCode.intValue() == 0; } public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream) diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStreamTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStreamTest.java new file mode 100644 index 00000000000..b5b45d1b90b --- /dev/null +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStreamTest.java @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.dsl.internal; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ExecWatchInputStreamTest { + + @Test + void testExceptionalExit() throws IOException { + AtomicInteger count = new AtomicInteger(); + ExecWatchInputStream is = new ExecWatchInputStream(() -> count.getAndIncrement()); + is.onExit(null, new Exception()); + assertThrows(IOException.class, () -> is.read()); + assertEquals(0, count.get()); + } + + @Test + void testNormalExit() throws IOException { + AtomicInteger count = new AtomicInteger(); + ExecWatchInputStream is = new ExecWatchInputStream(() -> count.getAndIncrement()); + is.onExit(1, null); + assertEquals(-1, is.read()); + assertEquals(0, count.get()); + } + + @Test + void testConsumerAfterClose() throws IOException { + AtomicInteger count = new AtomicInteger(); + ExecWatchInputStream is = new ExecWatchInputStream(() -> count.getAndIncrement()); + is.close(); + assertEquals(0, count.get()); + is.consume(Collections.singletonList(ByteBuffer.allocate(1))); + // should simply keep going + assertEquals(1, count.get()); + } + + @Test + void testConsume() throws IOException { + AtomicInteger count = new AtomicInteger(); + ExecWatchInputStream is = new ExecWatchInputStream(() -> count.getAndIncrement()); + is.consume(Collections.singletonList(ByteBuffer.allocate(1))); + + assertEquals(0, is.read()); + + assertEquals(0, count.get()); + + CompletableFuture readFuture = CompletableFuture.runAsync(() -> { + try { + is.read(); + } catch (IOException e) { + throw new AssertionError(e); + } + }); + + // make sure another result was requested + Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> count.get() == 1); + + is.consume(Collections.singletonList(ByteBuffer.allocate(1))); + readFuture.join(); + } + +} diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java index 2e0330e1912..5e1e2f10fe0 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListenerTest.java @@ -83,7 +83,8 @@ void testSendShouldTruncateAndSendFlaggedWebSocketData() { @Test void testCheckErrorHasErrorFromMessageShouldThrowException() { - ExecWebSocketListener listener = new ExecWebSocketListener(new PodOperationContext().toBuilder().forUpload(true).build()); + ExecWebSocketListener listener = new ExecWebSocketListener( + new PodOperationContext().toBuilder().terminateOnError(true).build()); listener.onMessage(null, ByteBuffer.wrap(new byte[] { (byte) 2, (byte) 1, (byte) 1 })); diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecExample.java index 4c696f8ecfc..bf76ec27c6a 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecExample.java @@ -26,53 +26,51 @@ @SuppressWarnings("java:S106") public class ExecExample { - public static void main(String[] args) throws InterruptedException { - if (args.length < 1) { - System.out.println("Usage: podName [namespace]"); - return; - } + public static void main(String[] args) { + if (args.length < 1) { + System.out.println("Usage: podName [namespace]"); + return; + } - String podName = args[0]; - String namespace = "default"; + String podName = args[0]; + String namespace = "default"; - if (args.length > 1) { - namespace = args[1]; - } + if (args.length > 1) { + namespace = args[1]; + } - try ( - KubernetesClient client = new KubernetesClientBuilder().build(); - ExecWatch ignore = newExecWatch(client, namespace, podName) - ) { - Thread.sleep(10 * 1000L); - } + try ( + KubernetesClient client = new KubernetesClientBuilder().build(); + ExecWatch watch = newExecWatch(client, namespace, podName)) { + watch.exitCode().join(); } + } - private static ExecWatch newExecWatch(KubernetesClient client, String namespace, String podName) { - return client.pods().inNamespace(namespace).withName(podName) - .readingInput(System.in) + private static ExecWatch newExecWatch(KubernetesClient client, String namespace, String podName) { + return client.pods().inNamespace(namespace).withName(podName) .writingOutput(System.out) .writingError(System.err) .withTTY() .usingListener(new SimpleListener()) .exec("sh", "-c", "echo 'Hello world!'"); - } + } - private static class SimpleListener implements ExecListener { + private static class SimpleListener implements ExecListener { - @Override - public void onOpen() { - System.out.println("The shell will remain open for 10 seconds."); - } + @Override + public void onOpen() { + System.out.println("The shell will remain open for 10 seconds."); + } - @Override - public void onFailure(Throwable t, Response failureResponse) { - System.err.println("shell barfed"); - } + @Override + public void onFailure(Throwable t, Response failureResponse) { + System.err.println("shell barfed"); + } - @Override - public void onClose(int code, String reason) { - System.out.println("The shell will now close."); - } + @Override + public void onClose(int code, String reason) { + System.out.println("The shell will now close."); } + } } diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecExampleWithTerminalSize.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecExampleWithTerminalSize.java index 49ea9f43018..49fb396b2b7 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecExampleWithTerminalSize.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecExampleWithTerminalSize.java @@ -23,62 +23,61 @@ @SuppressWarnings("java:S106") public class ExecExampleWithTerminalSize { - public static void main(String[] args) throws InterruptedException { - if (args.length < 1) { - System.out.println("Usage: podName [namespace] [columns] [lines]\n" + - "Use env variable COLUMNS & LINES to initialize terminal size."); - return; - } - - String podName = args[0]; - String namespace = "default"; - String columns = "80"; - String lines = "24"; + public static void main(String[] args) throws InterruptedException { + if (args.length < 1) { + System.out.println("Usage: podName [namespace] [columns] [lines]\n" + + "Use env variable COLUMNS & LINES to initialize terminal size."); + return; + } - if (args.length > 1) { - namespace = args[1]; - } - if (args.length > 2) { - columns = args[2]; - } - if (args.length > 3) { - lines = args[3]; - } + String podName = args[0]; + String namespace = "default"; + String columns = "80"; + String lines = "24"; - try ( - KubernetesClient client = new KubernetesClientBuilder().build(); - ExecWatch ignore = newExecWatch(client, namespace, podName, columns, lines); - ) { - Thread.sleep(10 * 1000L); - } + if (args.length > 1) { + namespace = args[1]; + } + if (args.length > 2) { + columns = args[2]; + } + if (args.length > 3) { + lines = args[3]; } - private static ExecWatch newExecWatch(KubernetesClient client, String namespace, String podName, String columns, String lines) { - return client.pods().inNamespace(namespace).withName(podName) - .readingInput(System.in) - .writingOutput(System.out) - .writingError(System.err) - .withTTY() - .usingListener(new SimpleListener()) - .exec("env", "TERM=xterm", "COLUMNS=" + columns, "LINES=" + lines, "sh", "-c", "ls -la"); + try ( + KubernetesClient client = new KubernetesClientBuilder().build(); + ExecWatch exec = newExecWatch(client, namespace, podName, columns, lines);) { + exec.exitCode().join(); } + } - private static class SimpleListener implements ExecListener { + private static ExecWatch newExecWatch(KubernetesClient client, String namespace, String podName, String columns, + String lines) { + return client.pods().inNamespace(namespace).withName(podName) + .writingOutput(System.out) + .writingError(System.err) + .withTTY() + .usingListener(new SimpleListener()) + .exec("env", "TERM=xterm", "COLUMNS=" + columns, "LINES=" + lines, "sh", "-c", "ls -la"); + } - @Override - public void onOpen() { - System.out.println("The shell will remain open for 10 seconds."); - } + private static class SimpleListener implements ExecListener { - @Override - public void onFailure(Throwable t, Response failureResponse) { - System.err.println("shell barfed"); - } + @Override + public void onOpen() { + System.out.println("The shell will remain open for 10 seconds."); + } + + @Override + public void onFailure(Throwable t, Response failureResponse) { + System.err.println("shell barfed"); + } - @Override - public void onClose(int code, String reason) { - System.out.println("The shell will now close."); - } + @Override + public void onClose(int code, String reason) { + System.out.println("The shell will now close."); } + } } diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java deleted file mode 100644 index 750d8e859bc..00000000000 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.examples; - -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.ConfigBuilder; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientBuilder; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.dsl.ExecWatch; -import io.fabric8.kubernetes.client.utils.InputStreamPumper; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class ExecPipesExample { - - public static void main(String[] args) { - String master = "https://localhost:8443/"; - String podName = null; - - if (args.length == 2) { - master = args[0]; - podName = args[1]; - } - if (args.length == 1) { - podName = args[0]; - } - - Config config = new ConfigBuilder().withMasterUrl(master).build(); - ExecutorService executorService = Executors.newSingleThreadExecutor(); - try ( - KubernetesClient client = new KubernetesClientBuilder().withConfig(config).build(); - ExecWatch watch = client.pods().withName(podName) - .redirectingInput() - .redirectingOutput() - .redirectingError() - .redirectingErrorChannel() - .exec();) - { - InputStreamPumper.pump(watch.getOutput(), (b, o, l) -> System.out.print(new String(b, o, l)), - executorService); - watch.getInput().write("ls -al\n".getBytes()); - Thread.sleep(5 * 1000L); - } catch (Exception e) { - throw KubernetesClientException.launderThrowable(e); - } finally { - executorService.shutdownNow(); - } - } -} diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecStdInExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecStdInExample.java new file mode 100644 index 00000000000..f612ef779f6 --- /dev/null +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecStdInExample.java @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.examples; + +import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.ConfigBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.dsl.ExecWatch; + +import java.io.OutputStream; + +public class ExecStdInExample { + + public static void main(String[] args) { + String master = "https://localhost:8443/"; + String podName = null; + + if (args.length == 2) { + master = args[0]; + podName = args[1]; + } + if (args.length == 1) { + podName = args[0]; + } + + Config config = new ConfigBuilder().withMasterUrl(master).build(); + try ( + KubernetesClient client = new KubernetesClientBuilder().withConfig(config).build(); + ExecWatch watch = client.pods().withName(podName) + .redirectingInput() + .writingOutput(System.out) + .terminateOnError() + .exec("cat");) { + // send hello + OutputStream input = watch.getInput(); + input.write("hello".getBytes()); + input.flush(); + + // close is needed when we're reading from stdin to terminate + watch.close(); + + // wait for the process to exit + watch.exitCode().join(); + } catch (Exception e) { + throw KubernetesClientException.launderThrowable(e); + } + } +} diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java index f59606b360e..ed39580aead 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -175,6 +176,7 @@ void exec() throws InterruptedException, IOException { client.pods().withName("pod-standard").waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS); final CountDownLatch execLatch = new CountDownLatch(1); ByteArrayOutputStream out = new ByteArrayOutputStream(); + AtomicBoolean closed = new AtomicBoolean(); int[] exitCode = new int[] { Integer.MAX_VALUE }; ExecWatch execWatch = client.pods().withName("pod-standard") .writingOutput(out) @@ -194,6 +196,7 @@ public void onFailure(Throwable t, Response failureResponse) { @Override public void onClose(int i, String s) { logger.info("Shell closed"); + closed.set(true); execLatch.countDown(); } @@ -202,12 +205,12 @@ public void onExit(int code, Status status) { exitCode[0] = code; } }).exec("date"); - + // the stream must be read or closed to receive onClose + assertEquals("{\"metadata\":{},\"status\":\"Success\"}", IOHelpers.readFully(execWatch.getErrorChannel())); execLatch.await(5, TimeUnit.SECONDS); - assertNotNull(execWatch); - assertNotNull(out.toString()); assertEquals(0, exitCode[0]); - assertEquals("{\"metadata\":{},\"status\":\"Success\"}", IOHelpers.readFully(execWatch.getErrorChannel())); + assertTrue(closed.get()); + assertNotNull(out.toString()); } @Test diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java index 4786dc8e43d..7beb462e12b 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/PodTest.java @@ -51,6 +51,8 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.net.HttpURLConnection; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -548,6 +550,34 @@ void testOptionalCopyDir() { }); } + @Test + void testPipesNotAllowed() { + PipedInputStream in = new PipedInputStream(); + PipedOutputStream out = new PipedOutputStream(); + + PodResource podOp = client.pods().inNamespace("ns1").withName("pod2"); + + Assertions.assertThrows(KubernetesClientException.class, () -> { + podOp.watchLog(out); + }); + + Assertions.assertThrows(KubernetesClientException.class, () -> { + podOp.writingError(out); + }); + + Assertions.assertThrows(KubernetesClientException.class, () -> { + podOp.writingErrorChannel(out); + }); + + Assertions.assertThrows(KubernetesClientException.class, () -> { + podOp.writingOutput(out); + }); + + Assertions.assertThrows(KubernetesClientException.class, () -> { + podOp.readingInput(in); + }); + } + @Test void testListFromServer() { PodBuilder podBuilder = new PodBuilder()