Skip to content

Commit

Permalink
[Compression] Wrap output stream with decompression
Browse files Browse the repository at this point in the history
  • Loading branch information
AlessandroPatti committed Sep 29, 2021
1 parent a9c8861 commit 3d6339e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import build.bazel.remote.execution.v2.FindMissingBlobsResponse;
import build.bazel.remote.execution.v2.GetActionResultRequest;
import build.bazel.remote.execution.v2.RequestMetadata;
import build.bazel.remote.execution.v2.UpdateActionResultRequest;
import com.github.luben.zstd.ZstdInputStream;
import com.google.bytestream.ByteStreamGrpc;
import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.ReadRequest;
Expand Down Expand Up @@ -61,9 +59,7 @@
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -298,6 +294,14 @@ public ListenableFuture<Void> downloadBlob(
out = digestOut;
}

if (options.cacheByteStreamCompression) {
try {
out = new ZstdDecompressingOutputStream(out);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
}

return downloadBlob(context, digest, out, digestSupplier);
}

Expand Down Expand Up @@ -352,41 +356,13 @@ private ListenableFuture<Void> requestRead(
.setReadOffset(offset.get())
.build(),
new StreamObserver<ReadResponse>() {
InputStream inner;
ZstdInputStream zis;

{
initialise();
}

private void initialise() throws IOException {
if (options.cacheByteStreamCompression) {
zis =
new ZstdInputStream(
new InputStream() {
@Override
public int read() throws IOException {
return inner.read();
}
});

zis.setContinuous(true);
}
}

@Override
public void onNext(ReadResponse readResponse) {
ByteString data = readResponse.getData();
try {
if (options.cacheByteStreamCompression) {
inner = new ByteArrayInputStream(data.toByteArray());
ByteString bs = ByteString.readFrom(zis);
bs.writeTo(out);
offset.addAndGet(bs.size());
} else {
data.writeTo(out);
offset.addAndGet(data.size());
}
data.writeTo(out);
offset.addAndGet(data.size());
} catch (IOException e) {
// Cancel the call.
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.google.devtools.build.lib.remote;

import com.github.luben.zstd.ZstdInputStream;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class ZstdDecompressingOutputStream extends OutputStream {
private final ByteArrayOutputStream baos;
private final ZstdInputStream zis;
private final OutputStream out;
private int lastByte;

ZstdDecompressingOutputStream(OutputStream out) throws IOException {
this.out = out;
baos = new ByteArrayOutputStream();
zis =
new ZstdInputStream(
new InputStream() {
@Override
public int read() throws IOException {
int value = lastByte;
lastByte = -1;
return value;
}
});
zis.setContinuous(true);
}

@Override
public void write(int b) throws IOException {
lastByte = b;
int c;
while ((c = zis.read()) != -1) {
out.write(c);
}
}
}

0 comments on commit 3d6339e

Please sign in to comment.