Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for compression on gRPC cache #14041

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ccea6a4
[Compression] Add experimental option and verify capabilities
AlessandroPatti Sep 20, 2021
148ebc6
[Compression] Implement bytestream compressed cache downloads
AlessandroPatti Sep 20, 2021
bb6b53f
[Compression] Implement bytestream compressed cache uploads
AlessandroPatti Sep 20, 2021
ad5b0b4
[Compression][Tests] Test chunker with compression
AlessandroPatti Sep 21, 2021
c0a3b3e
[Compression][Tests] Test compressed cache uploads
AlessandroPatti Sep 21, 2021
1d643aa
[Compression][Tests] Test compressed cache downloads
AlessandroPatti Sep 21, 2021
4d1dc31
Allow calling getActualSize when uploads are not finished
AlessandroPatti Sep 26, 2021
d84a76d
Use thin jars for zstd jni
AlessandroPatti Sep 26, 2021
df571eb
[Compression] Wrap output stream with decompression
AlessandroPatti Sep 28, 2021
8c77992
[Compression][Upload] Simplify Chunker with auxiliary classes
AlessandroPatti Oct 1, 2021
ef40b33
Ensure hasNext is consistent after seek
AlessandroPatti Oct 2, 2021
054ce2d
Exhaust input and compute final size
AlessandroPatti Oct 2, 2021
55affb4
Adjust test params
AlessandroPatti Oct 2, 2021
9fc9b0a
[Compression][Tests] Add test for compressed progressive uploads
AlessandroPatti Oct 2, 2021
91ba7b8
[Compression][Upload] Seek before jumping in rpc call
AlessandroPatti Oct 2, 2021
edf0f8c
Compile zstd-jni from sources
AlessandroPatti Oct 12, 2021
43bbd5f
[Compression] Support progressive reads
AlessandroPatti Oct 14, 2021
acb7fb7
Simplify compressing stream
AlessandroPatti Oct 14, 2021
327d6b9
[Compression][Tests] Test progressive compressed download
AlessandroPatti Oct 14, 2021
dfc3068
[Compression] Rename option to remote_cache_compression
AlessandroPatti Oct 14, 2021
f6e1d2e
Only check for expected committed size if we completed the upload
AlessandroPatti Oct 15, 2021
4f7771d
Ensure minimum pipe size
AlessandroPatti Oct 17, 2021
6c13594
Add simple tests for zstd streams
AlessandroPatti Oct 17, 2021
9648105
Move native patch to patch file
AlessandroPatti Oct 20, 2021
522a07f
Add mirror
AlessandroPatti Oct 20, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pkg_tar(
"@com_google_protobuf//:protobuf_java",
"@com_google_protobuf//:protobuf_java_util",
"@com_google_protobuf//:protobuf_javalite",
"@zstd-jni//:zstd-jni",
],
package_dir = "derived/jars",
strip_prefix = "external",
Expand Down
8 changes: 8 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,14 @@ dist_http_archive(
patch_cmds_win = EXPORT_WORKSPACE_IN_BUILD_FILE_WIN,
)

dist_http_archive(
name = "zstd-jni",
patch_cmds = EXPORT_WORKSPACE_IN_BUILD_BAZEL_FILE,
patch_cmds_win = EXPORT_WORKSPACE_IN_BUILD_BAZEL_FILE_WIN,
build_file = "//third_party:zstd-jni/zstd-jni.BUILD",
strip_prefix = "zstd-jni-1.5.0-4"
)

http_archive(
name = "org_snakeyaml",
build_file_content = """
Expand Down
15 changes: 15 additions & 0 deletions distdir_deps.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,21 @@ DIST_DEPS = {
"test_WORKSPACE_files",
],
},
"zstd-jni": {
"archive": "v1.5.0-4.zip",
"patch_args": ["-p1"],
"patches": [
"//third_party:zstd-jni/Native.java.patch",
],
"sha256": "d320d59b89a163c5efccbe4915ae6a49883ce653cdc670643dfa21c6063108e4",
"urls": [
"https://mirror.bazel.build/github.com/luben/zstd-jni/archive/v1.5.0-4.zip",
"https://github.com/luben/zstd-jni/archive/v1.5.0-4.zip",
Copy link
Contributor Author

@AlessandroPatti AlessandroPatti Oct 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be good to mirror this in mirror.bazel.build. @philwo could you help with that as part of the merge?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

],
"used_in": [
"additional_distfiles",
],
},
###################################################
#
# Build time dependencies for testing and packaging
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ filegroup(
"//src/main/java/com/google/devtools/build/lib/remote/merkletree:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/options:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/util:srcs",
"//src/main/java/com/google/devtools/build/lib/remote/zstd:srcs",
],
visibility = ["//src:__subpackages__"],
)
Expand Down Expand Up @@ -80,6 +81,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/remote/merkletree",
"//src/main/java/com/google/devtools/build/lib/remote/options",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/remote/zstd",
"//src/main/java/com/google/devtools/build/lib/sandbox",
"//src/main/java/com/google/devtools/build/lib/skyframe:mutable_supplier",
"//src/main/java/com/google/devtools/build/lib/skyframe:tree_artifact_value",
Expand All @@ -93,6 +95,7 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//src/main/java/com/google/devtools/common/options",
"//src/main/protobuf:failure_details_java_proto",
"//third_party:apache_commons_compress",
"//third_party:auth",
"//third_party:flogger",
"//third_party:guava",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,11 @@ boolean uploadsInProgress() {
}
}

private static String buildUploadResourceName(String instanceName, UUID uuid, Digest digest) {
String resourceName =
format("uploads/%s/blobs/%s/%d", uuid, digest.getHash(), digest.getSizeBytes());
private static String buildUploadResourceName(
String instanceName, UUID uuid, Digest digest, boolean compressed) {
String template =
compressed ? "uploads/%s/compressed-blobs/zstd/%s/%d" : "uploads/%s/blobs/%s/%d";
String resourceName = format(template, uuid, digest.getHash(), digest.getSizeBytes());
if (!Strings.isNullOrEmpty(instanceName)) {
resourceName = instanceName + "/" + resourceName;
}
Expand All @@ -325,7 +327,8 @@ private ListenableFuture<Void> startAsyncUpload(
}

UUID uploadId = UUID.randomUUID();
String resourceName = buildUploadResourceName(instanceName, uploadId, digest);
String resourceName =
buildUploadResourceName(instanceName, uploadId, digest, chunker.isCompressed());
AsyncUpload newUpload =
new AsyncUpload(
context,
Expand Down Expand Up @@ -405,7 +408,20 @@ ListenableFuture<Void> start() {
() ->
retrier.executeAsync(
() -> {
if (committedOffset.get() < chunker.getSize()) {
if (chunker.getSize() == 0) {
return Futures.immediateFuture(null);
}
try {
chunker.seek(committedOffset.get());
} catch (IOException e) {
try {
chunker.reset();
} catch (IOException resetException) {
e.addSuppressed(resetException);
}
return Futures.immediateFailedFuture(e);
}
if (chunker.hasNext()) {
return callAndQueryOnFailure(committedOffset, progressiveBackoff);
}
return Futures.immediateFuture(null);
Expand All @@ -416,13 +432,19 @@ ListenableFuture<Void> start() {
return Futures.transformAsync(
callFuture,
(result) -> {
long committedSize = committedOffset.get();
long expected = chunker.getSize();
if (committedSize != expected) {
String message =
format(
"write incomplete: committed_size %d for %d total", committedSize, expected);
return Futures.immediateFailedFuture(new IOException(message));
if (!chunker.hasNext()) {
// Only check for matching committed size if we have completed the upload.
// If another client did, they might have used a different compression
// level/algorithm, so we cannot know the expected committed offset
long committedSize = committedOffset.get();
long expected = chunker.getOffset();
if (!chunker.hasNext() && committedSize != expected) {
String message =
format(
"write incomplete: committed_size %d for %d total",
committedSize, expected);
return Futures.immediateFailedFuture(new IOException(message));
}
}
return Futures.immediateFuture(null);
},
Expand Down Expand Up @@ -517,17 +539,6 @@ private ListenableFuture<Void> call(AtomicLong committedOffset) {
.withDeadlineAfter(callTimeoutSecs, SECONDS);
call = channel.newCall(ByteStreamGrpc.getWriteMethod(), callOptions);

try {
chunker.seek(committedOffset.get());
} catch (IOException e) {
try {
chunker.reset();
} catch (IOException resetException) {
e.addSuppressed(resetException);
}
return Futures.immediateFailedFuture(e);
}

SettableFuture<Void> uploadResult = SettableFuture.create();
ClientCall.Listener<WriteResponse> callListener =
new ClientCall.Listener<WriteResponse>() {
Expand Down
110 changes: 80 additions & 30 deletions src/main/java/com/google/devtools/build/lib/remote/Chunker.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.remote.zstd.ZstdCompressingInputStream;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Supplier;
Expand All @@ -55,6 +57,10 @@ static int getDefaultChunkSize() {
return defaultChunkSize;
}

/** Returns whether this chunker compresses its data with zstd before chunking it. */
public boolean isCompressed() {
return compressed;
}

/** A piece of a byte[] blob. */
public static final class Chunk {

Expand Down Expand Up @@ -98,19 +104,22 @@ public int hashCode() {
private final int chunkSize;
private final Chunk emptyChunk;

private InputStream data;
private ChunkerInputStream data;
private long offset;
private byte[] chunkCache;

private final boolean compressed;

// Set to true on the first call to next(). This is so that the Chunker can open its data source
// lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
private boolean initialized;

Chunker(Supplier<InputStream> dataSupplier, long size, int chunkSize) {
Chunker(Supplier<InputStream> dataSupplier, long size, int chunkSize, boolean compressed) {
this.dataSupplier = checkNotNull(dataSupplier);
this.size = size;
this.chunkSize = chunkSize;
this.emptyChunk = new Chunk(ByteString.EMPTY, 0);
this.compressed = compressed;
}

public long getOffset() {
Expand All @@ -127,13 +136,9 @@ public long getSize() {
* <p>Closes any open resources (file handles, ...).
*/
public void reset() throws IOException {
if (data != null) {
data.close();
}
data = null;
close();
offset = 0;
initialized = false;
chunkCache = null;
}

/**
Expand All @@ -148,6 +153,9 @@ public void seek(long toOffset) throws IOException {
maybeInitialize();
ByteStreams.skipFully(data, toOffset - offset);
offset = toOffset;
if (data.finished()) {
close();
}
}

/**
Expand All @@ -157,6 +165,32 @@ public boolean hasNext() {
return data != null || !initialized;
}

/**
* Closes the underlying input stream, if one is open, and discards the chunk cache.
*
* @throws IOException if closing the underlying stream fails
*/
private void close() throws IOException {
if (data != null) {
data.close();
data = null;
}
chunkCache = null;
}

/**
* Reads from {@code data} into {@code chunkCache} until the buffer is full or the stream is
* exhausted, and returns the number of bytes actually stored.
*/
private int read() throws IOException {
int filled = 0;
int n;
while (filled < chunkCache.length
&& (n = data.read(chunkCache, filled, chunkCache.length - filled)) >= 0) {
filled += n;
}
return filled;
}
/**
* Returns the next {@link Chunk} or throws a {@link NoSuchElementException} if no data is left.
*
Expand All @@ -178,46 +212,40 @@ public Chunk next() throws IOException {
return emptyChunk;
}

// The cast to int is safe, because the return value is capped at chunkSize.
int bytesToRead = (int) Math.min(bytesLeft(), chunkSize);
if (bytesToRead == 0) {
if (data.finished()) {
chunkCache = null;
data = null;
throw new NoSuchElementException();
}

if (chunkCache == null) {
// If the output is compressed we can't know how many bytes there are yet to read,
// so we allocate the whole chunkSize, otherwise we try to compute the smallest possible value
// The cast to int is safe, because the return value is capped at chunkSize.
int cacheSize = compressed ? chunkSize : (int) Math.min(getSize() - getOffset(), chunkSize);
// Lazily allocate it in order to save memory on small data.
// 1) bytesToRead < chunkSize: There will only ever be one next() call.
// 2) bytesToRead == chunkSize: chunkCache will be set to its biggest possible value.
// 3) bytestoRead > chunkSize: Not possible, due to Math.min above.
chunkCache = new byte[bytesToRead];
chunkCache = new byte[cacheSize];
}

long offsetBefore = offset;
try {
ByteStreams.readFully(data, chunkCache, 0, bytesToRead);
} catch (EOFException e) {
throw new IllegalStateException("Reached EOF, but expected "
+ bytesToRead + " bytes.", e);
}
offset += bytesToRead;

ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesToRead);
int bytesRead = read();

if (bytesLeft() == 0) {
data.close();
data = null;
chunkCache = null;
ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesRead);

// This has to happen after actualSize has been updated
// or the guard in getActualSize won't work.
offset += bytesRead;
if (data.finished()) {
close();
}

return new Chunk(blob, offsetBefore);
}

public long bytesLeft() {
return getSize() - getOffset();
}

private void maybeInitialize() throws IOException {
if (initialized) {
return;
Expand All @@ -226,7 +254,9 @@ private void maybeInitialize() throws IOException {
checkState(offset == 0);
checkState(chunkCache == null);
try {
data = dataSupplier.get();
data = compressed ?
new ChunkerInputStream(new ZstdCompressingInputStream(dataSupplier.get())) :
new ChunkerInputStream(dataSupplier.get());
} catch (RuntimeException e) {
Throwables.propagateIfPossible(e.getCause(), IOException.class);
throw e;
Expand All @@ -242,6 +272,7 @@ public static Builder builder() {
public static class Builder {
private int chunkSize = getDefaultChunkSize();
private long size;
private boolean compressed;
private Supplier<InputStream> inputStream;

public Builder setInput(byte[] data) {
Expand All @@ -251,6 +282,11 @@ public Builder setInput(byte[] data) {
return this;
}

/** Sets whether the data should be zstd-compressed while chunking. */
public Builder setCompressed(boolean compressed) {
this.compressed = compressed;
return this;
}

public Builder setInput(long size, InputStream in) {
checkState(inputStream == null);
checkNotNull(in);
Expand Down Expand Up @@ -305,7 +341,21 @@ public Builder setChunkSize(int chunkSize) {

public Chunker build() {
checkNotNull(inputStream);
return new Chunker(inputStream, size, chunkSize);
return new Chunker(inputStream, size, chunkSize, compressed);
}
}

/**
* A {@link PushbackInputStream} that can report whether the wrapped stream is exhausted without
* consuming any data: {@link #finished()} reads a single byte ahead and pushes it back.
*/
static class ChunkerInputStream extends PushbackInputStream {
ChunkerInputStream(InputStream in) {
super(in);
}

/**
* Returns {@code true} if the wrapped stream has reached end-of-stream.
*
* @throws IOException if the look-ahead read fails
*/
public boolean finished() throws IOException {
int c = super.read();
// Braces added per the surrounding file's (Google Java) style.
if (c == -1) {
return true;
}
super.unread(c);
return false;
}
}
}
Loading