Skip to content

Commit

Permalink
Ensure NamedSetOfFiles BEP event appears before event referencing the…
Browse files Browse the repository at this point in the history
… file_set.

Previously, when a `NestedSet<Artifact>` was referenced by multiple BuildEvents
(perhaps transitively), there was a race in `BuildEventStreamer`, which receives all
BuildEvents in an `@AllowConcurrentEvents` EventBus handler. If the BuildEvents were
posted by different threads, the first would acquire the name for the
`NamedSetOfFiles` and prepare the protobuf message from the NestedSet. In the
meantime, the second thread would see that the NestedSet's name was already acquired
and could proceed to write its BuildEvent referencing the `named_set` before the
first thread had written the `NamedSetOfFiles` event itself. This led to a
violation of the expected ordering (a `NamedSetOfFiles` event precedes every event
that references it).

Note that the ordering expectation here cannot be guaranteed by parent-child event
relationships, as a given `NamedSetOfFiles` cannot 'know' all of the events
that will reference it in advance. This is why `NamedSetOfFiles` events are children
of `Progress` events.

RELNOTES: Users consuming BEP may assume that a `named_set_of_files` event will
appear before any event referencing that `named_set` by ID. This allows consumers
to process the files for such events (e.g. `TargetCompleted`) immediately.
PiperOrigin-RevId: 361822335
  • Loading branch information
michaeledgar authored and copybara-github committed Mar 9, 2021
1 parent a1c8a44 commit fcf9dd5
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.pkgcache.TargetParsingCompleteEvent;
import com.google.devtools.build.lib.runtime.CountingArtifactGroupNamer.LatchedGroupName;
import com.google.devtools.build.lib.util.Pair;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayList;
Expand Down Expand Up @@ -370,27 +371,27 @@ private synchronized void close(@Nullable AbortReason reason) {
}

private void maybeReportArtifactSet(CompletionContext ctx, NestedSet<?> set) {
String name = artifactGroupNamer.maybeName(set);
if (name == null) {
return;
}

set = NamedArtifactGroup.expandSet(ctx, set);
// Invariant: all leaf successors ("direct elements") of set are ExpandedArtifacts.

// We only split if the max number of entries is at least 2 (it must be at least a binary tree).
// The method throws for smaller values.
if (besOptions.maxNamedSetEntries >= 2) {
// We only split the event after naming it to avoid splitting the same node multiple times.
// Note that the artifactGroupNames keeps references to the individual pieces, so this can
// double the memory consumption of large nested sets.
set = set.splitIfExceedsMaximumSize(besOptions.maxNamedSetEntries);
}
try (LatchedGroupName lockedName = artifactGroupNamer.maybeName(set)) {
if (lockedName == null) {
return;
}
set = NamedArtifactGroup.expandSet(ctx, set);
// Invariant: all leaf successors ("direct elements") of set are ExpandedArtifacts.

// We only split if the max number of entries is at least 2 (it must be at least a binary
// tree). The method throws for smaller values.
if (besOptions.maxNamedSetEntries >= 2) {
// We only split the event after naming it to avoid splitting the same node multiple times.
// Note that the artifactGroupNames keeps references to the individual pieces, so this can
// double the memory consumption of large nested sets.
set = set.splitIfExceedsMaximumSize(besOptions.maxNamedSetEntries);
}

for (NestedSet<?> succ : set.getNonLeaves()) {
maybeReportArtifactSet(ctx, succ);
for (NestedSet<?> succ : set.getNonLeaves()) {
maybeReportArtifactSet(ctx, succ);
}
post(new NamedArtifactGroup(lockedName.getName(), ctx, set));
}
post(new NamedArtifactGroup(name, ctx, set));
}

private void maybeReportConfiguration(BuildEvent configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,76 @@
// limitations under the License.
package com.google.devtools.build.lib.runtime;

import com.google.common.util.concurrent.Uninterruptibles;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.BuildEventId.NamedSetOfFilesId;
import com.google.devtools.build.lib.collect.nestedset.NestedSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import javax.annotation.concurrent.ThreadSafe;

/** Assigns compact, unique integer names to {@link NestedSet} nodes for use as named-set IDs. */
@ThreadSafe
public class CountingArtifactGroupNamer implements ArtifactGroupNamer {

private final Map<NestedSet.Node, Integer> nodeNames = new HashMap<>();
private final ConcurrentMap<NestedSet.Node, LatchedGroupName> nodeNames =
new ConcurrentHashMap<>();

@Override
public NamedSetOfFilesId apply(NestedSet.Node id) {
Integer name;
synchronized (this) {
name = nodeNames.get(id);
}
LatchedGroupName name = nodeNames.get(id);
if (name == null) {
return null;
}
return NamedSetOfFilesId.newBuilder().setId(name.toString()).build();
return NamedSetOfFilesId.newBuilder().setId(name.getName()).build();
}

/**
* If the {@link NestedSet} has no name already, return a new name for it. Return null otherwise.
*/
public synchronized String maybeName(NestedSet<?> set) {
public LatchedGroupName maybeName(NestedSet<?> set) {
NestedSet.Node id = set.toNode();
if (nodeNames.containsKey(id)) {
LatchedGroupName existingGroupName;
LatchedGroupName newGroupName;
// synchronized necessary only to ensure node names are chosen uniquely and compactly.
// TODO(adgar): consider dropping compactness and unconditionally increment an AtomicLong to
// pick unique node names.
synchronized (this) {
newGroupName = new LatchedGroupName(nodeNames.size());
existingGroupName = nodeNames.putIfAbsent(id, newGroupName);
}
if (existingGroupName != null) {
existingGroupName.waitUntilWritten();
return null;
}
Integer name = nodeNames.size();
nodeNames.put(id, name);
return name.toString();
return newGroupName;
}

/**
 * The name reserved for a {@code NestedSet<?>}, bundled with a one-shot latch that signals when
 * the set has been written.
 *
 * <p>Whoever created the instance must {@link #close()} it once the set is written (typically via
 * try-with-resources); every other consumer can then return from {@link #waitUntilWritten()}.
 */
public static class LatchedGroupName implements AutoCloseable {
  /** Opened by {@link #close()}; starts with a count of one. */
  private final CountDownLatch written = new CountDownLatch(1);

  /** Numeric form of the name; rendered as a decimal string by {@link #getName()}. */
  private final int id;

  public LatchedGroupName(int name) {
    id = name;
  }

  /** Releases the latch so that threads blocked in {@link #waitUntilWritten()} may proceed. */
  @Override
  public void close() {
    written.countDown();
  }

  /** Returns the name as a decimal string. */
  String getName() {
    return String.valueOf(id);
  }

  /** Blocks, without being cut short by interruption, until {@link #close()} has been called. */
  private void waitUntilWritten() {
    Uninterruptibles.awaitUninterruptibly(written);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import com.google.devtools.build.lib.buildtool.buildevent.NoAnalyzeEvent;
import com.google.devtools.build.lib.collect.nestedset.NestedSet;
import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
import com.google.devtools.build.lib.collect.nestedset.Order;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
import com.google.devtools.build.lib.server.FailureDetails.Spawn;
import com.google.devtools.build.lib.server.FailureDetails.Spawn.Code;
Expand All @@ -86,6 +87,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -711,6 +713,110 @@ public void testReportedArtifacts() {
assertThat(reportedArtifactSets.get(0)).isEqualTo(eventProtos.get(4).getId().getNamedSet());
}

@Test
public void testArtifactSetsPrecedeReportingEvent() throws InterruptedException {
  // Verify that reported artifacts appear as named_set_of_files before their ID is referenced by
  // a reporting event.
  BuildEvent startEvent =
      new GenericBuildEvent(
          testId("Initial"), ImmutableSet.of(ProgressEvent.INITIAL_PROGRESS_UPDATE));

  // Prepare a dense NestedSet DAG with lots of shared references: every set at depth k + 1
  // combines set i and set (i + 1) of depth k, so neighbouring sets share a child.
  List<NestedSet<Artifact>> baseSets = new ArrayList<>();
  baseSets.add(NestedSetBuilder.create(Order.STABLE_ORDER, makeArtifact("path/a")));
  baseSets.add(NestedSetBuilder.create(Order.STABLE_ORDER, makeArtifact("path/b")));
  baseSets.add(NestedSetBuilder.create(Order.STABLE_ORDER, makeArtifact("path/c")));
  baseSets.add(NestedSetBuilder.create(Order.STABLE_ORDER, makeArtifact("path/d")));
  List<NestedSet<Artifact>> depth2Sets = new ArrayList<>();
  for (int i = 0; i < baseSets.size(); i++) {
    depth2Sets.add(
        NestedSetBuilder.<Artifact>stableOrder()
            .addTransitive(baseSets.get(i))
            .addTransitive(baseSets.get((i + 1) % baseSets.size()))
            .build());
  }
  List<NestedSet<Artifact>> depth3Sets = new ArrayList<>();
  for (int i = 0; i < depth2Sets.size(); i++) {
    depth3Sets.add(
        NestedSetBuilder.<Artifact>stableOrder()
            .addTransitive(depth2Sets.get(i))
            .addTransitive(depth2Sets.get((i + 1) % depth2Sets.size()))
            .build());
  }
  List<NestedSet<Artifact>> depth4Sets = new ArrayList<>();
  for (int i = 0; i < depth3Sets.size(); i++) {
    depth4Sets.add(
        NestedSetBuilder.<Artifact>stableOrder()
            .addTransitive(depth3Sets.get(i))
            .addTransitive(depth3Sets.get((i + 1) % depth3Sets.size()))
            .build());
  }
  int numEvents = 20;
  List<BuildEvent> eventsToPost = new ArrayList<>();
  for (int i = 0; i < numEvents; i++) {
    eventsToPost.add(
        new GenericArtifactReportingEvent(
            testId("reporting" + i), ImmutableSet.of(depth4Sets.get(i % depth4Sets.size()))));
  }

  streamer.buildEvent(startEvent);
  // Publish `numEvents` different events, each reporting one of the depth-4 sets (which share
  // transitive members with one another), on `numEvents` different threads. Use latches to
  // ensure:
  //
  // 1. all threads have started, before:
  // 2. all threads send their event, before:
  // 3. verifying the recorded events.
  CountDownLatch readyToPublishLatch = new CountDownLatch(numEvents);
  CountDownLatch startPublishingLatch = new CountDownLatch(1);
  CountDownLatch donePublishingLatch = new CountDownLatch(numEvents);
  for (int i = 0; i < numEvents; i++) {
    int num = i;
    new Thread(
            () -> {
              try {
                BuildEvent reportingArtifacts = eventsToPost.get(num);
                readyToPublishLatch.countDown();
                startPublishingLatch.await();
                streamer.buildEvent(reportingArtifacts);
              } catch (InterruptedException e) {
                throw new RuntimeException(e);
              } finally {
                // Always release the latch, even when publishing fails: otherwise the main
                // thread would block forever in donePublishingLatch.await() and the test would
                // hang instead of failing on the assertions below.
                donePublishingLatch.countDown();
              }
            })
        .start();
  }
  readyToPublishLatch.await();
  startPublishingLatch.countDown();
  donePublishingLatch.await();

  assertThat(streamer.isClosed()).isFalse();
  List<BuildEvent> allEventsSeen = transport.getEvents();
  List<BuildEventStreamProtos.BuildEvent> eventProtos = transport.getEventProtos();
  // Each GenericArtifactReportingEvent and NamedArtifactGroup event has a corresponding Progress
  // event posted immediately before.
  assertThat(allEventsSeen)
      .hasSize(1 + ((numEvents + baseSets.size() + depth2Sets.size() + depth3Sets.size()) * 2));
  assertThat(allEventsSeen.get(0).getEventId()).isEqualTo(startEvent.getEventId());
  // Verify that each named_set_of_files event is sent before all of the events that report that
  // named_set.
  Set<String> seenFileSets = new HashSet<>();
  for (int i = 1; i < eventProtos.size(); i++) {
    BuildEventStreamProtos.BuildEvent buildEvent = eventProtos.get(i);
    if (buildEvent.getId().hasNamedSet()) {
      // These are the separately-posted contents of reported artifacts.
      seenFileSets.add(buildEvent.getId().getNamedSet().getId());
      for (NamedSetOfFilesId nestedSetId : buildEvent.getNamedSetOfFiles().getFileSetsList()) {
        assertThat(seenFileSets).contains(nestedSetId.getId());
      }
    } else if (buildEvent.getId().hasUnknown()) {
      // These are the GenericArtifactReportingEvent that report artifacts.
      for (NamedSetOfFilesId nestedSetId : buildEvent.getNamedSetOfFiles().getFileSetsList()) {
        assertThat(seenFileSets).contains(nestedSetId.getId());
      }
    }
  }
}

@Test
public void testStdoutReported() {
// Verify that stdout and stderr are reported in the build-event stream on progress
Expand Down

0 comments on commit fcf9dd5

Please sign in to comment.