diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java index af68ddfb727957..f9801df0513959 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java @@ -30,11 +30,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; -import com.google.devtools.build.lib.actions.ActionExecutionMetadata; -import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent; -import com.google.devtools.build.lib.actions.ActionUploadStartedEvent; import com.google.devtools.build.lib.concurrent.ThreadSafety; -import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.exec.SpawnProgressEvent; import com.google.devtools.build.lib.remote.common.BulkTransferException; import com.google.devtools.build.lib.remote.common.LazyFileOutputStream; @@ -66,7 +62,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.annotation.Nullable; /** * A cache for storing artifacts (input and output) as well as the output of running an action. @@ -85,7 +80,6 @@ public class RemoteCache extends AbstractReferenceCounted { private static final ListenableFuture COMPLETED_SUCCESS = immediateFuture(null); private static final ListenableFuture EMPTY_BYTES = immediateFuture(new byte[0]); - private final ExtendedEventHandler reporter; private final CountDownLatch closeCountDownLatch = new CountDownLatch(1); protected final AsyncTaskCache.NoResult casUploadCache = AsyncTaskCache.NoResult.create(); @@ -94,11 +88,9 @@ public class RemoteCache extends AbstractReferenceCounted { protected final DigestUtil digestUtil; public RemoteCache( - ExtendedEventHandler reporter, RemoteCacheClient cacheProtocol, RemoteOptions options, DigestUtil digestUtil) { - this.reporter = reporter; this.cacheProtocol = cacheProtocol; this.options = options; this.digestUtil = digestUtil; @@ -110,23 +102,6 @@ public CachedActionResult downloadActionResult( return getFromFuture(cacheProtocol.downloadActionResult(context, actionKey, inlineOutErr)); } - private void postUploadStartedEvent(@Nullable ActionExecutionMetadata action, String resourceId) { - if (action == null) { - return; - } - - reporter.post(ActionUploadStartedEvent.create(action, resourceId)); - } - - private void postUploadFinishedEvent( - @Nullable ActionExecutionMetadata action, String resourceId) { - if (action == null) { - return; - } - - reporter.post(ActionUploadFinishedEvent.create(action, resourceId)); - } - /** * Returns a set of digests that the remote cache does not know about. The returned set is * guaranteed to be a subset of {@code digests}. @@ -143,38 +118,14 @@ public ListenableFuture> findMissingDigests( public ListenableFuture uploadActionResult( RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) { - ActionExecutionMetadata action = context.getSpawnOwner(); - Completable upload = - Completable.using( - () -> { - String resourceId = "ac/" + actionKey.getDigest().getHash(); - postUploadStartedEvent(action, resourceId); - return resourceId; - }, - resourceId -> - RxFutures.toCompletable( - () -> cacheProtocol.uploadActionResult(context, actionKey, actionResult), - directExecutor()), - resourceId -> postUploadFinishedEvent(action, resourceId)); + RxFutures.toCompletable( + () -> cacheProtocol.uploadActionResult(context, actionKey, actionResult), + directExecutor()); return RxFutures.toListenableFuture(upload); } - private Completable doUploadFile(RemoteActionExecutionContext context, Digest digest, Path file) { - ActionExecutionMetadata action = context.getSpawnOwner(); - return Completable.using( - () -> { - String resourceId = "cas/" + digest.getHash(); - postUploadStartedEvent(action, resourceId); - return resourceId; - }, - resourceId -> - RxFutures.toCompletable( - () -> cacheProtocol.uploadFile(context, digest, file), directExecutor()), - resourceId -> postUploadFinishedEvent(action, resourceId)); - } - /** * Upload a local file to the remote cache. * @@ -191,26 +142,15 @@ public final ListenableFuture uploadFile( return COMPLETED_SUCCESS; } - Completable upload = casUploadCache.executeIfNot(digest, doUploadFile(context, digest, file)); + Completable upload = + casUploadCache.executeIfNot( + digest, + RxFutures.toCompletable( + () -> cacheProtocol.uploadFile(context, digest, file), directExecutor())); return RxFutures.toListenableFuture(upload); } - private Completable doUploadBlob( - RemoteActionExecutionContext context, Digest digest, ByteString data) { - ActionExecutionMetadata action = context.getSpawnOwner(); - return Completable.using( - () -> { - String resourceId = "cas/" + digest.getHash(); - postUploadStartedEvent(action, resourceId); - return resourceId; - }, - resourceId -> - RxFutures.toCompletable( - () -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()), - resourceId -> postUploadFinishedEvent(action, resourceId)); - } - /** * Upload sequence of bytes to the remote cache. * @@ -227,7 +167,11 @@ public final ListenableFuture uploadBlob( return COMPLETED_SUCCESS; } - Completable upload = casUploadCache.executeIfNot(digest, doUploadBlob(context, digest, data)); + Completable upload = + casUploadCache.executeIfNot( + digest, + RxFutures.toCompletable( + () -> cacheProtocol.uploadBlob(context, digest, data), directExecutor())); return RxFutures.toListenableFuture(upload); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java index 86ec811be5bfab..3b59afb81af1c4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java @@ -22,7 +22,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; -import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; @@ -43,11 +42,10 @@ public class RemoteExecutionCache extends RemoteCache { public RemoteExecutionCache( - ExtendedEventHandler reporter, RemoteCacheClient protocolImpl, RemoteOptions options, DigestUtil digestUtil) { - super(reporter, protocolImpl, options, digestUtil); + super(protocolImpl, options, digestUtil); } /** diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java index 1a4b123946dc87..7722aff0fd8937 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java @@ -84,6 +84,7 @@ import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext; import com.google.devtools.build.lib.profiler.Profiler; +import com.google.devtools.build.lib.profiler.ProfilerTask; import com.google.devtools.build.lib.profiler.SilentCloseable; import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.DirectoryMetadata; import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.FileMetadata; @@ -1070,7 +1071,8 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult) Single.using( remoteCache::retain, remoteCache -> - manifest.uploadAsync(action.getRemoteActionExecutionContext(), remoteCache), + manifest.uploadAsync( + action.getRemoteActionExecutionContext(), remoteCache, reporter), RemoteCache::release) .subscribeOn(scheduler) .subscribe( @@ -1087,7 +1089,10 @@ public void onError(@NonNull Throwable e) { } }); } else { - manifest.upload(action.getRemoteActionExecutionContext(), remoteCache); + try (SilentCloseable c = + Profiler.instance().profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) { + manifest.upload(action.getRemoteActionExecutionContext(), remoteCache, reporter); + } } } catch (IOException e) { reportUploadError(e); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index d2508bda3b1d4a..6ad97fb2a1ece2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -234,8 +234,7 @@ private void initHttpAndDiskCache( handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; } - RemoteCache remoteCache = - new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil); + RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil); actionContextProvider = RemoteActionContextProvider.createForRemoteCaching( executorService, env, remoteCache, /* retryScheduler= */ null, digestUtil); @@ -573,7 +572,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { } execChannel.release(); RemoteExecutionCache remoteCache = - new RemoteExecutionCache(env.getReporter(), cacheClient, remoteOptions, digestUtil); + new RemoteExecutionCache(cacheClient, remoteOptions, digestUtil); actionContextProvider = RemoteActionContextProvider.createForRemoteExecution( executorService, @@ -609,8 +608,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { } } - RemoteCache remoteCache = - new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil); + RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil); actionContextProvider = RemoteActionContextProvider.createForRemoteCaching( executorService, env, remoteCache, retryScheduler, digestUtil); diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java index d2219dcda44f91..3b8344cfc8f569 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java @@ -198,9 +198,7 @@ public void store(SpawnResult result) throws ExecException, InterruptedException } } - try (SilentCloseable c = prof.profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) { - remoteExecutionService.uploadOutputs(action, result); - } + remoteExecutionService.uploadOutputs(action, result); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index 1d2a08f022bafe..c8e6fecaade50a 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -575,9 +575,7 @@ SpawnResult execLocallyAndUpload( } } - try (SilentCloseable c = Profiler.instance().profile(UPLOAD_TIME, "upload outputs")) { - remoteExecutionService.uploadOutputs(action, result); - } + remoteExecutionService.uploadOutputs(action, result); return result; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java index d2e7ba75bd3646..5dbbb0721c1dd2 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java +++ b/src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java @@ -28,14 +28,20 @@ import build.bazel.remote.execution.v2.Tree; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.devtools.build.lib.actions.ActionExecutionMetadata; +import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent; +import com.google.devtools.build.lib.actions.ActionUploadStartedEvent; import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.UserExecException; +import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; import com.google.devtools.build.lib.remote.common.RemotePathResolver; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; +import com.google.devtools.build.lib.remote.util.RxUtils; import com.google.devtools.build.lib.server.FailureDetails.FailureDetail; import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution; import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code; @@ -56,6 +62,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** UploadManifest adds output metadata to a {@link ActionResult}. */ @@ -341,10 +348,11 @@ ActionResult getActionResult() { } /** Uploads outputs and action result (if exit code is 0) to remote cache. */ - public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache) + public ActionResult upload( + RemoteActionExecutionContext context, RemoteCache remoteCache, ExtendedEventHandler reporter) throws IOException, InterruptedException { try { - return uploadAsync(context, remoteCache).blockingGet(); + return uploadAsync(context, remoteCache, reporter).blockingGet(); } catch (RuntimeException e) { throwIfInstanceOf(e.getCause(), InterruptedException.class); throwIfInstanceOf(e.getCause(), IOException.class); @@ -368,29 +376,91 @@ private Completable upload( return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor()); } + private static void reportUploadStarted( + ExtendedEventHandler reporter, + @Nullable ActionExecutionMetadata action, + String prefix, + Iterable digests) { + if (action != null) { + for (Digest digest : digests) { + reporter.post(ActionUploadStartedEvent.create(action, prefix + digest.getHash())); + } + } + } + + private static void reportUploadFinished( + ExtendedEventHandler reporter, + @Nullable ActionExecutionMetadata action, + String resourceIdPrefix, + Iterable digests) { + if (action != null) { + for (Digest digest : digests) { + reporter.post( + ActionUploadFinishedEvent.create(action, resourceIdPrefix + digest.getHash())); + } + } + } + /** * Returns a {@link Single} which upon subscription will upload outputs and action result (if exit * code is 0) to remote cache. */ public Single uploadAsync( - RemoteActionExecutionContext context, RemoteCache remoteCache) { + RemoteActionExecutionContext context, + RemoteCache remoteCache, + ExtendedEventHandler reporter) { Collection digests = new ArrayList<>(); digests.addAll(digestToFile.keySet()); digests.addAll(digestToBlobs.keySet()); - Completable uploadOutputs = - mergeBulkTransfer( - toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor()) - .flatMapPublisher(Flowable::fromIterable) - .flatMapSingle(digest -> toTransferResult(upload(context, remoteCache, digest)))); + ActionExecutionMetadata action = context.getSpawnOwner(); + + String outputPrefix = "cas/"; + Flowable bulkTransfers = + toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor()) + .doOnSubscribe(d -> reportUploadStarted(reporter, action, outputPrefix, digests)) + .doOnError(error -> reportUploadFinished(reporter, action, outputPrefix, digests)) + .doOnDispose(() -> reportUploadFinished(reporter, action, outputPrefix, digests)) + .doOnSuccess( + missingDigests -> { + List existedDigests = + digests.stream() + .filter(digest -> !missingDigests.contains(digest)) + .collect(Collectors.toList()); + reportUploadFinished(reporter, action, outputPrefix, existedDigests); + }) + .flatMapPublisher(Flowable::fromIterable) + .flatMapSingle( + digest -> + toTransferResult(upload(context, remoteCache, digest)) + .doFinally( + () -> + reportUploadFinished( + reporter, action, outputPrefix, ImmutableList.of(digest)))); + Completable uploadOutputs = mergeBulkTransfer(bulkTransfers); ActionResult actionResult = result.build(); Completable uploadActionResult = Completable.complete(); if (actionResult.getExitCode() == 0 && actionKey != null) { + String actionResultPrefix = "ac/"; uploadActionResult = toCompletable( - () -> remoteCache.uploadActionResult(context, actionKey, actionResult), - directExecutor()); + () -> remoteCache.uploadActionResult(context, actionKey, actionResult), + directExecutor()) + .doOnSubscribe( + d -> + reportUploadStarted( + reporter, + action, + actionResultPrefix, + ImmutableList.of(actionKey.getDigest()))) + .doFinally( + () -> + reportUploadFinished( + reporter, + action, + actionResultPrefix, + ImmutableList.of(actionKey.getDigest()))); } return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult); diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java index c4212d0aef6716..a40960447604df 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java @@ -414,7 +414,7 @@ private ByteStreamBuildEventArtifactUploader newArtifactUploader( invocationOnMock.getArgument(0), invocationOnMock.getArgument(1))) .when(cacheClient) .findMissingDigests(any(), any()); - RemoteCache remoteCache = new RemoteCache(reporter, cacheClient, remoteOptions, DIGEST_UTIL); + RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, DIGEST_UTIL); return new ByteStreamBuildEventArtifactUploader( MoreExecutors.directExecutor(), diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java index 1ec656f7f035e1..cbb22d71d5a7a0 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java @@ -51,7 +51,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Maps; -import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.actions.ActionInputHelper; @@ -61,7 +60,7 @@ import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; import com.google.devtools.build.lib.authandtls.GoogleAuthUtils; import com.google.devtools.build.lib.clock.JavaClock; -import com.google.devtools.build.lib.events.Reporter; +import com.google.devtools.build.lib.events.NullEventHandler; import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; import com.google.devtools.build.lib.remote.Retrier.Backoff; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; @@ -132,7 +131,6 @@ public class GrpcCacheClientTest { private Path execRoot; private FileOutErr outErr; private FakeActionInputFileCache fakeFileCache; - private final Reporter reporter = new Reporter(new EventBus()); private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); private final String fakeServerName = "fake server for " + getClass(); private Server fakeServer; @@ -271,7 +269,7 @@ private static byte[] downloadBlob( public void testVirtualActionInputSupport() throws Exception { RemoteOptions options = Options.getDefaults(RemoteOptions.class); RemoteExecutionCache client = - new RemoteExecutionCache(reporter, newClient(options), options, DIGEST_UTIL); + new RemoteExecutionCache(newClient(options), options, DIGEST_UTIL); PathFragment execPath = PathFragment.create("my/exec/path"); VirtualActionInput virtualActionInput = ActionsTestUtil.createVirtualActionInput(execPath, "hello"); @@ -381,7 +379,7 @@ public void testDownloadAllResults() throws Exception { // arrange RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); GrpcCacheClient client = newClient(remoteOptions); - RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL); + RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL); Digest fooDigest = DIGEST_UTIL.computeAsUtf8("foo-contents"); Digest barDigest = DIGEST_UTIL.computeAsUtf8("bar-contents"); @@ -404,7 +402,7 @@ public void testDownloadAllResults() throws Exception { public void testUploadDirectory() throws Exception { RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); GrpcCacheClient client = newClient(remoteOptions); - RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL); + RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL); final Digest fooDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz"); @@ -468,7 +466,7 @@ public void updateActionResult( public void testUploadDirectoryEmpty() throws Exception { RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); GrpcCacheClient client = newClient(remoteOptions); - RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL); + RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL); final Digest barDigest = fakeFileCache.createScratchInputDirectory( @@ -507,7 +505,7 @@ public void updateActionResult( public void testUploadDirectoryNested() throws Exception { RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); GrpcCacheClient client = newClient(remoteOptions); - RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL); + RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL); final Digest wobbleDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar/test/wobble"), "xyz"); @@ -583,7 +581,7 @@ private ActionResult upload( outputs, outErr, 0); - return uploadManifest.upload(context, remoteCache); + return uploadManifest.upload(context, remoteCache, NullEventHandler.INSTANCE); } private ActionResult uploadDirectory(RemoteCache remoteCache, List outputs) @@ -658,7 +656,7 @@ public void getActionResult( serviceRegistry.addService(ServerInterceptors.intercept(actionCache, interceptor)); GrpcCacheClient client = newClient(remoteOptions); - RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL); + RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL); remoteCache.downloadActionResult( context, DIGEST_UTIL.asActionKey(DIGEST_UTIL.computeAsUtf8("key")), @@ -669,7 +667,7 @@ public void getActionResult( public void testUpload() throws Exception { RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); GrpcCacheClient client = newClient(remoteOptions); - RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL); + RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL); final Digest fooDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz"); @@ -745,7 +743,7 @@ public void testUploadSplitMissingDigestsCall() throws Exception { RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); remoteOptions.maxOutboundMessageSize = 80; // Enough for one digest, but not two. GrpcCacheClient client = newClient(remoteOptions); - RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL); + RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL); final Digest fooDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz"); @@ -810,7 +808,7 @@ public void updateActionResult( public void testUploadCacheMissesWithRetries() throws Exception { RemoteOptions remoteOptions = Options.getDefaults(RemoteOptions.class); GrpcCacheClient client = newClient(remoteOptions); - RemoteCache remoteCache = new RemoteCache(reporter, client, remoteOptions, DIGEST_UTIL); + RemoteCache remoteCache = new RemoteCache(client, remoteOptions, DIGEST_UTIL); final Digest fooDigest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz"); diff --git a/src/test/java/com/google/devtools/build/lib/remote/InMemoryRemoteCache.java b/src/test/java/com/google/devtools/build/lib/remote/InMemoryRemoteCache.java index 9dcaf364ad1950..2c332180a9593a 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/InMemoryRemoteCache.java +++ b/src/test/java/com/google/devtools/build/lib/remote/InMemoryRemoteCache.java @@ -16,7 +16,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import build.bazel.remote.execution.v2.Digest; -import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; @@ -30,15 +29,14 @@ class InMemoryRemoteCache extends RemoteExecutionCache { InMemoryRemoteCache( - Reporter reporter, Map casEntries, RemoteOptions options, DigestUtil digestUtil) { - super(reporter, new InMemoryCacheClient(casEntries), options, digestUtil); + super(new InMemoryCacheClient(casEntries), options, digestUtil); } - InMemoryRemoteCache(Reporter reporter, RemoteOptions options, DigestUtil digestUtil) { - super(reporter, new InMemoryCacheClient(), options, digestUtil); + InMemoryRemoteCache(RemoteOptions options, DigestUtil digestUtil) { + super(new InMemoryCacheClient(), options, digestUtil); } Digest addContents(RemoteActionExecutionContext context, String txt) diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java index 1dc2494cc9d957..9223028ba887ed 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; -import com.google.common.eventbus.EventBus; import com.google.common.hash.HashCode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.SettableFuture; @@ -37,7 +36,6 @@ import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.actions.util.ActionsTestUtil; import com.google.devtools.build.lib.clock.JavaClock; -import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.remote.common.BulkTransferException; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; @@ -67,7 +65,6 @@ public class RemoteActionInputFetcherTest { private static final DigestHashFunction HASH_FUNCTION = DigestHashFunction.SHA256; - private final Reporter reporter = new Reporter(new EventBus()); private Path execRoot; private ArtifactRoot artifactRoot; private RemoteOptions options; @@ -395,7 +392,6 @@ private RemoteCache newCache( for (Map.Entry entry : cacheEntries.entrySet()) { cacheEntriesByteArray.put(entry.getKey(), entry.getValue().toByteArray()); } - return new RemoteCache( - reporter, new InMemoryCacheClient(cacheEntriesByteArray), options, digestUtil); + return new RemoteCache(new InMemoryCacheClient(cacheEntriesByteArray), options, digestUtil); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java index cec16dcf5fc565..2983c8545c7ae8 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheTest.java @@ -16,19 +16,15 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; -import build.bazel.remote.execution.v2.Action; import build.bazel.remote.execution.v2.ActionResult; import build.bazel.remote.execution.v2.Digest; import build.bazel.remote.execution.v2.RequestMetadata; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.actions.ActionInputHelper; -import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent; -import com.google.devtools.build.lib.actions.ActionUploadStartedEvent; import com.google.devtools.build.lib.actions.ArtifactRoot; import com.google.devtools.build.lib.actions.ArtifactRoot.RootType; import com.google.devtools.build.lib.actions.ResourceSet; @@ -37,11 +33,8 @@ import com.google.devtools.build.lib.clock.JavaClock; import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder; import com.google.devtools.build.lib.collect.nestedset.Order; -import com.google.devtools.build.lib.events.Reporter; -import com.google.devtools.build.lib.events.StoredEventHandler; import com.google.devtools.build.lib.exec.util.FakeOwner; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; -import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; @@ -71,8 +64,6 @@ @RunWith(JUnit4.class) public class RemoteCacheTest { - private final Reporter reporter = new Reporter(new EventBus()); - private final StoredEventHandler eventHandler = new StoredEventHandler(); private RemoteActionExecutionContext context; private FileSystem fs; private Path execRoot; @@ -84,8 +75,6 @@ public class RemoteCacheTest { @Before public void setUp() throws Exception { - reporter.addHandler(eventHandler); - MockitoAnnotations.initMocks(this); RequestMetadata metadata = TracingMetadataUtils.buildMetadata("none", "none", "action-id", null); @@ -170,7 +159,7 @@ public void testDownloadFileWithSymlinkTemplate() throws Exception { Path file = fs.getPath("/execroot/symlink-to-file"); RemoteOptions options = Options.getDefaults(RemoteOptions.class); options.remoteDownloadSymlinkTemplate = "/home/alice/cas/{hash}-{size_bytes}"; - RemoteCache remoteCache = new InMemoryRemoteCache(reporter, cas, options, digestUtil); + RemoteCache remoteCache = new InMemoryRemoteCache(cas, options, digestUtil); // act getFromFuture(remoteCache.downloadFile(context, file, helloDigest)); @@ -199,53 +188,8 @@ public void upload_emptyBlobAndFile_doNotPerformUpload() throws Exception { .containsExactly(emptyDigest); } - @Test - public void uploadActionResult_firesUploadEvents() throws Exception { - InMemoryRemoteCache remoteCache = newRemoteCache(); - ActionKey actionKey = new ActionKey(digestUtil.compute(Action.getDefaultInstance())); - ActionResult actionResult = ActionResult.getDefaultInstance(); - - getFromFuture(remoteCache.uploadActionResult(context, actionKey, actionResult)); - - String resourceId = "ac/" + actionKey.getDigest().getHash(); - assertThat(eventHandler.getPosts()) - .containsExactly( - ActionUploadStartedEvent.create(context.getSpawn().getResourceOwner(), resourceId), - ActionUploadFinishedEvent.create(context.getSpawn().getResourceOwner(), resourceId)); - } - - @Test - public void uploadBlob_firesUploadEvents() throws Exception { - InMemoryRemoteCache remoteCache = newRemoteCache(); - ByteString content = ByteString.copyFromUtf8("content"); - Digest digest = digestUtil.compute(content.toByteArray()); - - getFromFuture(remoteCache.uploadBlob(context, digest, content)); - - String resourceId = "cas/" + digest.getHash(); - assertThat(eventHandler.getPosts()) - .containsExactly( - ActionUploadStartedEvent.create(context.getSpawn().getResourceOwner(), resourceId), - ActionUploadFinishedEvent.create(context.getSpawn().getResourceOwner(), resourceId)); - } - - @Test - public void uploadFile_firesUploadEvents() throws Exception { - InMemoryRemoteCache remoteCache = newRemoteCache(); - Digest digest = fakeFileCache.createScratchInput(ActionInputHelper.fromPath("file"), "content"); - Path file = execRoot.getRelative("file"); - - getFromFuture(remoteCache.uploadFile(context, digest, file)); - - String resourceId = "cas/" + digest.getHash(); - assertThat(eventHandler.getPosts()) - .containsExactly( - ActionUploadStartedEvent.create(context.getSpawn().getResourceOwner(), resourceId), - ActionUploadFinishedEvent.create(context.getSpawn().getResourceOwner(), resourceId)); - } - private InMemoryRemoteCache newRemoteCache() { RemoteOptions options = Options.getDefaults(RemoteOptions.class); - return new InMemoryRemoteCache(reporter, options, digestUtil); + return new InMemoryRemoteCache(options, digestUtil); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java index 89eb7e77149d00..03d8fc70fb9fd6 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteExecutionServiceTest.java @@ -52,6 +52,8 @@ import com.google.common.util.concurrent.Futures; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputHelper; +import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent; +import com.google.devtools.build.lib.actions.ActionUploadStartedEvent; import com.google.devtools.build.lib.actions.Artifact; import com.google.devtools.build.lib.actions.Artifact.SpecialArtifact; import com.google.devtools.build.lib.actions.Artifact.TreeFileArtifact; @@ -154,7 +156,7 @@ public final void setUp() throws Exception { checkNotNull(stderr.getParentDirectory()).createDirectoryAndParents(); outErr = new FileOutErr(stdout, stderr); - cache = spy(new InMemoryRemoteCache(reporter, remoteOptions, digestUtil)); + cache = spy(new InMemoryRemoteCache(remoteOptions, digestUtil)); executor = mock(RemoteExecutionClient.class); RequestMetadata metadata = @@ -1314,6 +1316,34 @@ public void uploadOutputs_uploadFails_printWarning() throws Exception { assertThat(evt.getMessage()).contains("cache down"); } + @Test + public void uploadOutputs_firesUploadEvents() throws Exception { + Digest digest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("outputs/file"), "content"); + Path file = execRoot.getRelative("outputs/file"); + Artifact outputFile = ActionsTestUtil.createArtifact(artifactRoot, file); + RemoteExecutionService service = newRemoteExecutionService(); + Spawn spawn = newSpawn(ImmutableMap.of(), ImmutableSet.of(outputFile)); + FakeSpawnExecutionContext context = newSpawnExecutionContext(spawn); + RemoteAction action = service.buildRemoteAction(spawn, context); + SpawnResult spawnResult = + new SpawnResult.Builder() + .setExitCode(0) + .setStatus(SpawnResult.Status.SUCCESS) + .setRunnerName("test") + .build(); + + service.uploadOutputs(action, spawnResult); + + assertThat(eventHandler.getPosts()) + .containsAtLeast( + ActionUploadStartedEvent.create(spawn.getResourceOwner(), "cas/" + digest.getHash()), + ActionUploadFinishedEvent.create(spawn.getResourceOwner(), "cas/" + digest.getHash()), + ActionUploadStartedEvent.create(spawn.getResourceOwner(), "ac/" + action.getActionId()), + ActionUploadFinishedEvent.create( + spawn.getResourceOwner(), "ac/" + action.getActionId())); + } + @Test public void uploadInputsIfNotPresent_deduplicateFindMissingBlobCalls() throws Exception { int taskCount = 100; diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java index 5f5bfefd188017..5529421cae8a65 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java @@ -299,7 +299,7 @@ public int maxConcurrency() { DIGEST_UTIL, uploader); RemoteExecutionCache remoteCache = - new RemoteExecutionCache(reporter, cacheProtocol, remoteOptions, DIGEST_UTIL); + new RemoteExecutionCache(cacheProtocol, remoteOptions, DIGEST_UTIL); RemoteExecutionService remoteExecutionService = new RemoteExecutionService( directExecutor(), diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java index 6d0117495de6c6..f034870abb02f4 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/ExecutionServer.java @@ -35,6 +35,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.actions.ExecException; +import com.google.devtools.build.lib.events.NullEventHandler; import com.google.devtools.build.lib.remote.ExecutionStatusException; import com.google.devtools.build.lib.remote.UploadManifest; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; @@ -356,7 +357,7 @@ private ActionResult execute( outputs, outErr, exitCode); - result = manifest.upload(context, cache); + result = manifest.upload(context, cache, NullEventHandler.INSTANCE); } catch (ExecException e) { if (errStatus == null) { errStatus = diff --git a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java index fc81d43fb471c2..cbf15860b2d021 100644 --- a/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java +++ b/src/tools/remote/src/main/java/com/google/devtools/build/remote/worker/OnDiskBlobStoreCache.java @@ -19,8 +19,6 @@ import build.bazel.remote.execution.v2.Directory; import build.bazel.remote.execution.v2.DirectoryNode; import build.bazel.remote.execution.v2.FileNode; -import com.google.devtools.build.lib.events.Event; -import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.remote.RemoteCache; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.disk.DiskCacheClient; @@ -34,17 +32,6 @@ class OnDiskBlobStoreCache extends RemoteCache { public OnDiskBlobStoreCache(RemoteOptions options, Path cacheDir, DigestUtil digestUtil) { super( - new ExtendedEventHandler() { - @Override - public void post(Postable obj) { - // do nothing - } - - @Override - public void handle(Event event) { - // do nothing - } - }, new DiskCacheClient(cacheDir, /* verifyDownloads= */ true, digestUtil), options, digestUtil);