Skip to content

Commit

Permalink
Fixes fabric8io#3889 : Remove piped stream for file download
Browse files Browse the repository at this point in the history
  • Loading branch information
hypnoce committed Feb 24, 2022
1 parent 7c9cdb4 commit 0eb88bc
Showing 1 changed file with 65 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package io.fabric8.kubernetes.client.dsl.internal.core.v1;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -33,7 +36,10 @@
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static io.fabric8.kubernetes.client.utils.internal.OptionalDependencyWrapper.wrapRunWithOptionalDependency;

Expand Down Expand Up @@ -427,23 +433,19 @@ public InputStream read() {
}
}

private Future<?> readFileTo(String source, OutputStream out) {
return readTo(new Base64.OutputStream(out, Base64.DECODE), "sh", "-c", String.format("cat %s | base64", shellQuote(source)));
}

private InputStream readFile(String source) {
try {
PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out, 1024);
ExecWatch watch = writingOutput(out).usingListener((int code, String reason) -> {
try {
out.flush();
out.close();
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
}
).exec("sh", "-c", String.format("cat %s | base64", shellQuote(source)));
return new Base64.InputStream(in) {
final Future<?> future = readFileTo(source, out);
return new FilterInputStream(in) {
@Override
public void close() throws IOException {
watch.close();
future.cancel(false);
super.close();
}
};
Expand All @@ -468,33 +470,76 @@ private void copyFile(String source, File target) {
String filename = parts[parts.length - 1];
destination = destination.toPath().resolve(filename).toFile();
}
try (InputStream is = readFile(source);) {
Files.copy(is, destination.toPath(), StandardCopyOption.REPLACE_EXISTING);

try {
Files.deleteIfExists(destination.toPath());
} catch (IOException e) {
throw KubernetesClientException.launderThrowable(e);
}
try (OutputStream out = new BufferedOutputStream(new FileOutputStream(destination))) {
readFileTo(source, out).get();
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
}

private Future<?> readTarTo(String source, OutputStream out) {
return readTo(new Base64.OutputStream(out, Base64.DECODE), "sh", "-c", "tar -cf - " + source + "|" + "base64");
}

public InputStream readTar(String source) {
try {
PipedOutputStream out = new PipedOutputStream();
PipedInputStream in = new PipedInputStream(out, 1024);
ExecWatch watch = writingOutput(out).usingListener((int code, String reason) -> {
final Future<?> future = readTarTo(source, out);
return new FilterInputStream(in) {
@Override
public void close() throws IOException {
future.cancel(false);
super.close();
}
};
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
}

private Future<?> readTo(OutputStream out, String... cmd) {
try {
CompletableFuture<Void> cp = new CompletableFuture<>();
ExecWatch w = writingOutput(out).usingListener(new ExecListener() {
@Override
public void onClose(int code, String reason) {
try {
out.flush();
out.close();
cp.complete(null);
} catch (IOException e) {
cp.completeExceptionally(e);
throw KubernetesClientException.launderThrowable(e);
}
}
).exec("sh", "-c", "tar -cf - " + source + "|" + "base64");
return new Base64.InputStream(in) {

@Override
public void close() throws IOException {
watch.close();
super.close();
public void onFailure(Throwable t, Response failureResponse) {
try {
out.flush();
out.close();
cp.completeExceptionally(t);
} catch (IOException e) {
e.addSuppressed(t);
cp.completeExceptionally(e);
throw KubernetesClientException.launderThrowable(e);
}
}
};
}).exec(cmd);

cp.whenComplete((o, t) -> {
if (cp.isCancelled()) {
w.close();
}
});
return cp;
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
Expand Down

0 comments on commit 0eb88bc

Please sign in to comment.