Skip to content

Commit

Permalink
Update the mtime on files stored or retrieved from the disk cache.
Browse files Browse the repository at this point in the history
This makes it possible to trim the cache to a maximum size while keeping the most recently accessed entries. For now, this operation must be carried out offline.

Progress on #5139.

PiperOrigin-RevId: 568601572
Change-Id: I6b702c4e32211fab12ce3696dce57313858d7e37
  • Loading branch information
tjgq authored and copybara-github committed Sep 26, 2023
1 parent 5b96c54 commit 7a774ff
Show file tree
Hide file tree
Showing 8 changed files with 257 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.remote.Store;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
Expand Down Expand Up @@ -161,14 +162,23 @@ private static ListenableFuture<Void> cleanupTempFileOnError(
@Override
public ListenableFuture<Void> downloadBlob(
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
if (context.getReadCachePolicy().allowDiskCache() && diskCache.contains(digest)) {
return diskCache.downloadBlob(context, digest, out);
if (context.getReadCachePolicy().allowDiskCache()) {
// Ask the disk cache for the blob directly. Only a CacheNotFoundException from that attempt
// is converted into a call to downloadBlobFromRemote.
return Futures.catchingAsync(
diskCache.downloadBlob(context, digest, out),
CacheNotFoundException.class,
(unused) -> downloadBlobFromRemote(context, digest, out),
directExecutor());
} else {
// Disk cache reads are not allowed by the read policy: go straight to the remote path.
return downloadBlobFromRemote(context, digest, out);
}
}

Path tempPath = getTempPath();
LazyFileOutputStream tempOut = new LazyFileOutputStream(tempPath);

private ListenableFuture<Void> downloadBlobFromRemote(
RemoteActionExecutionContext context, Digest digest, OutputStream out) {
if (context.getReadCachePolicy().allowRemoteCache()) {
Path tempPath = getTempPath();
LazyFileOutputStream tempOut = new LazyFileOutputStream(tempPath);

ListenableFuture<Void> download =
cleanupTempFileOnError(
remoteCache.downloadBlob(context, digest, tempOut), tempPath, tempOut);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.disk;

import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.devtools.build.lib.remote.util.DigestUtil.isOldStyleDigestFunction;

Expand All @@ -39,14 +40,25 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistryLite;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.time.Instant;
import java.util.UUID;
import javax.annotation.Nullable;

/** A on-disk store for the remote action cache. */
/**
* An on-disk store for the remote action cache.
*
* <p>Concurrent Bazel processes can safely retrieve and store entries in a shared disk cache, even
* when they collide.
*
* <p>The mtime of an entry reflects the most recent time the entry was stored *or* retrieved. This
* property may be used to trim the disk cache to the most recently used entries. However, it's not
* safe to trim the cache at the same time a Bazel process is accessing it.
*/
public class DiskCacheClient implements RemoteCacheClient {
private final Path root;
private final boolean verifyDownloads;
Expand All @@ -72,33 +84,52 @@ public DiskCacheClient(Path root, boolean verifyDownloads, DigestUtil digestUtil
this.root.createDirectoryAndParents();
}

/** Returns {@code true} if the provided {@code key} is stored in the CAS. */
public boolean contains(Digest digest) {
return toPath(digest.getHash(), Store.CAS).exists();
}

/** Returns {@link Path} into the CAS for the given {@link Digest}. */
public Path getPath(Digest digest) {
return toPath(digest.getHash(), Store.CAS);
/**
* If the given path exists, updates its mtime and returns true. Otherwise, returns false.
*
* <p>This provides a cheap way to identify candidates for deletion when trimming the cache. We
* deliberately use the mtime because the atime is more likely to be externally modified and may
* be unavailable on some filesystems.
*
* <p>Prefer calling {@link #downloadBlob} or {@link #downloadActionResult} instead, which will
* automatically update the mtime. This method should only be called by the remote worker
* implementation.
*
* @throws IOException if an I/O error other than a missing file occurs.
*/
public boolean refresh(Path path) throws IOException {
try {
// Stamp the file with the current time, expressed as milliseconds since the epoch.
path.setLastModifiedTime(Instant.now().toEpochMilli());
} catch (FileNotFoundException e) {
// A missing file is reported through the return value rather than as an error. Any other
// IOException is left to propagate to the caller.
return false;
}
return true;
}

public void captureFile(Path src, Digest digest, Store store) throws IOException {
Path target = toPath(digest.getHash(), store);
/**
* Moves an existing file into the cache.
*
* <p>The caller must ensure that the digest is correct and the file has been recently modified.
* This method should only be called by the combined cache implementation.
*/
void captureFile(Path src, Digest digest, Store store) throws IOException {
// Resolve where the entry for this digest lives inside the given store.
Path target = toPath(digest, store);
// Create the destination's parent directory (and any missing ancestors) before the rename.
target.getParentDirectory().createDirectoryAndParents();
// Move, rather than copy, the source file into its final location.
src.renameTo(target);
}

private ListenableFuture<Void> download(Digest digest, OutputStream out, Store store) {
Path p = toPath(digest.getHash(), store);
if (!p.exists()) {
return Futures.immediateFailedFuture(new CacheNotFoundException(digest));
} else {
try (InputStream in = p.getInputStream()) {
Path path = toPath(digest, store);
try {
// refresh() doubles as the existence check: it updates the mtime when the file is present
// and returns false when it is missing, which is reported as a CacheNotFoundException.
if (!refresh(path)) {
return immediateFailedFuture(new CacheNotFoundException(digest));
}
// Copy the entry's bytes to the caller's stream; the input stream is closed on exit.
try (InputStream in = path.getInputStream()) {
ByteStreams.copy(in, out);
return immediateFuture(null);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
} catch (IOException e) {
// I/O errors from either the refresh or the copy are surfaced as a failed future.
return immediateFailedFuture(e);
}
}

Expand All @@ -123,17 +154,18 @@ public ListenableFuture<Void> downloadBlob(
MoreExecutors.directExecutor());
}

private void checkDigestExists(Digest digest) throws CacheNotFoundException {
private void checkDigestExists(Digest digest) throws IOException {
// A digest with a size of zero bytes is accepted without looking at the disk.
if (digest.getSizeBytes() == 0) {
return;
}

if (!toPath(digest.getHash(), Store.CAS).exists()) {
Path path = toPath(digest, Store.CAS);
// refresh() updates the blob's mtime when it exists and returns false when it is missing.
if (!refresh(path)) {
throw new CacheNotFoundException(digest);
}
}

private void checkOutputDirectory(Directory dir) throws CacheNotFoundException {
private void checkOutputDirectory(Directory dir) throws IOException {
for (var file : dir.getFilesList()) {
checkDigestExists(file.getDigest());
}
Expand All @@ -155,7 +187,7 @@ private void checkActionResult(ActionResult actionResult) throws IOException {
var treeDigest = outputDirectory.getTreeDigest();
checkDigestExists(treeDigest);

var treePath = toPath(treeDigest.getHash(), Store.CAS);
var treePath = toPath(treeDigest, Store.CAS);
var tree =
Tree.parseFrom(treePath.getInputStream(), ExtensionRegistryLite.getEmptyRegistry());
checkOutputDirectory(tree.getRoot());
Expand Down Expand Up @@ -198,11 +230,21 @@ public ListenableFuture<CachedActionResult> downloadActionResult(
}

try {
// Verify that all of the referenced blobs exist and update their mtime.
checkActionResult(actionResult);
} catch (CacheNotFoundException e) {
// If at least one of the referenced blobs is missing, consider the action result to be
// stale. At this point we might have unnecessarily updated the mtime on some of the
// referenced blobs, but this should happen infrequently, and doing it this way avoids a
// double pass over the blobs.
return immediateFuture(null);
}

// Update the mtime for the action result itself. This ensures that blobs are older than
// the action result, so that trimming the cache in LRU order will not create dangling
// references.
var unused = refresh(toPath(actionKey.getDigest(), Store.AC));

return immediateFuture(CachedActionResult.disk(actionResult));
},
MoreExecutors.directExecutor());
Expand All @@ -212,7 +254,7 @@ public ListenableFuture<CachedActionResult> downloadActionResult(
public ListenableFuture<Void> uploadActionResult(
RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {
try (InputStream data = actionResult.toByteString().newInput()) {
saveFile(actionKey.getDigest().getHash(), data, Store.AC);
saveFile(actionKey.getDigest(), Store.AC, data);
return immediateFuture(null);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
Expand All @@ -226,7 +268,7 @@ public void close() {}
public ListenableFuture<Void> uploadFile(
RemoteActionExecutionContext context, Digest digest, Path file) {
try (InputStream in = file.getInputStream()) {
saveFile(digest.getHash(), in, Store.CAS);
saveFile(digest, Store.CAS, in);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
Expand All @@ -237,7 +279,7 @@ public ListenableFuture<Void> uploadFile(
public ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context, Digest digest, ByteString data) {
try (InputStream in = data.newInput()) {
saveFile(digest.getHash(), in, Store.CAS);
saveFile(digest, Store.CAS, in);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
Expand All @@ -256,17 +298,18 @@ Path getTempPath() {
return root.getChild(UUID.randomUUID().toString());
}

protected Path toPath(String key, Store store) {
// Create the file in a subfolder to bypass possible folder file count limits
return root.getChild(store.toString()).getChild(key.substring(0, 2)).getChild(key);
/**
* Returns the path of the entry for the given digest within the given store.
*
* <p>The path has the form {@code <root>/<store>/<first two characters of the hash>/<hash>}.
* NOTE(review): {@code substring(0, 2)} presumes the hash has at least two characters; confirm
* that callers never pass a digest with a shorter hash.
*/
public Path toPath(Digest digest, Store store) {
String hash = digest.getHash();
// Create the file in a subfolder to bypass possible folder file count limits.
return root.getChild(store.toString()).getChild(hash.substring(0, 2)).getChild(hash);
}

private void saveFile(String key, InputStream in, Store store) throws IOException {
Path target = toPath(key, store);
if (target.exists()) {
private void saveFile(Digest digest, Store store, InputStream in) throws IOException {
Path path = toPath(digest, store);

if (refresh(path)) {
return;
}
target.getParentDirectory().createDirectoryAndParents();

// Write a temporary file first, and then rename, to avoid data corruption in case of a crash.
Path temp = getTempPath();
Expand All @@ -278,7 +321,8 @@ private void saveFile(String key, InputStream in, Store store) throws IOExceptio
// crashes (the OS may reorder the writes and the rename).
out.getFD().sync();
}
temp.renameTo(target);
path.getParentDirectory().createDirectoryAndParents();
temp.renameTo(path);
} catch (IOException e) {
try {
temp.delete();
Expand Down
6 changes: 6 additions & 0 deletions src/test/java/com/google/devtools/build/lib/remote/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -235,15 +235,21 @@ java_test(
],
deps = [
"//src/main/java/com/google/devtools/build/lib:runtime",
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/authandtls/credentialhelper:credential_module",
"//src/main/java/com/google/devtools/build/lib/events",
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/build/lib/remote:store",
"//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/standalone",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//src/test/java/com/google/devtools/build/lib/buildtool/util",
"//src/test/java/com/google/devtools/build/lib/remote/util:integration_test_utils",
"//src/test/java/com/google/devtools/build/lib/testutil:TestUtils",
"//third_party:guava",
"//third_party:junit4",
"//third_party:truth",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
)
Loading

0 comments on commit 7a774ff

Please sign in to comment.