Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish Java Exception Sampling #27257

Merged
merged 5 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,14 @@ private void takeSample(long currentTimeMillis, long millisSinceLastSample) {
ExecutionStateImpl current = currentStateLazy.get();
if (current != null) {
return ExecutionStateTrackerStatus.create(
current.ptransformId, current.ptransformUniqueName, thread, lastTransitionTimeMs);
current.ptransformId,
current.ptransformUniqueName,
thread,
lastTransitionTimeMs,
processBundleId.get());
} else {
return ExecutionStateTrackerStatus.create(null, null, thread, lastTransitionTimeMs);
return ExecutionStateTrackerStatus.create(
null, null, thread, lastTransitionTimeMs, processBundleId.get());
}
}

Expand Down Expand Up @@ -518,9 +523,10 @@ public static ExecutionStateTrackerStatus create(
@Nullable String ptransformId,
@Nullable String ptransformUniqueName,
Thread trackedThread,
long lastTransitionTimeMs) {
long lastTransitionTimeMs,
@Nullable String processBundleId) {
return new AutoValue_ExecutionStateSampler_ExecutionStateTrackerStatus(
ptransformId, ptransformUniqueName, trackedThread, lastTransitionTimeMs);
ptransformId, ptransformUniqueName, trackedThread, lastTransitionTimeMs, processBundleId);
}

public abstract @Nullable String getPTransformId();
Expand All @@ -530,5 +536,7 @@ public static ExecutionStateTrackerStatus create(
public abstract Thread getTrackedThread();

public abstract long getLastTransitionTimeMillis();

public abstract @Nullable String getProcessBundleId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import javax.annotation.Nullable;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.control.BundleProgressReporter;
import org.apache.beam.fn.harness.control.ExecutionStateSampler;
import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState;
import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker;
import org.apache.beam.fn.harness.control.Metrics;
Expand Down Expand Up @@ -71,16 +72,21 @@ public class PCollectionConsumerRegistry {
@SuppressWarnings({"rawtypes"})
abstract static class ConsumerAndMetadata {
public static ConsumerAndMetadata forConsumer(
FnDataReceiver consumer, String pTransformId, ExecutionState state) {
FnDataReceiver consumer,
String pTransformId,
ExecutionState state,
ExecutionStateTracker stateTracker) {
return new AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata(
consumer, pTransformId, state);
consumer, pTransformId, state, stateTracker);
}

public abstract FnDataReceiver getConsumer();

public abstract String getPTransformId();

public abstract ExecutionState getExecutionState();

public abstract ExecutionStateTracker getExecutionStateTracker();
}

private final ExecutionStateTracker stateTracker;
Expand Down Expand Up @@ -176,7 +182,7 @@ public <T> void register(
List<ConsumerAndMetadata> consumerAndMetadatas =
pCollectionIdsToConsumers.computeIfAbsent(pCollectionId, (unused) -> new ArrayList<>());
consumerAndMetadatas.add(
ConsumerAndMetadata.forConsumer(consumer, pTransformId, executionState));
ConsumerAndMetadata.forConsumer(consumer, pTransformId, executionState, stateTracker));
}

/**
Expand Down Expand Up @@ -250,6 +256,8 @@ private class MetricTrackingFnDataReceiver<T> implements FnDataReceiver<Windowed
private final SampleByteSizeDistribution<T> sampledByteSizeDistribution;
private final Coder<T> coder;
private final @Nullable OutputSampler<T> outputSampler;
private final String ptransformId;
private final ExecutionStateTracker executionStateTracker;

public MetricTrackingFnDataReceiver(
String pCollectionId,
Expand All @@ -258,6 +266,8 @@ public MetricTrackingFnDataReceiver(
@Nullable OutputSampler<T> outputSampler) {
this.delegate = consumerAndMetadata.getConsumer();
this.executionState = consumerAndMetadata.getExecutionState();
this.executionStateTracker = consumerAndMetadata.getExecutionStateTracker();
this.ptransformId = consumerAndMetadata.getPTransformId();

HashMap<String, String> labels = new HashMap<>();
labels.put(Labels.PCOLLECTION, pCollectionId);
Expand Down Expand Up @@ -315,7 +325,10 @@ public void accept(WindowedValue<T> input) throws Exception {
this.delegate.accept(input);
} catch (Exception e) {
if (outputSampler != null) {
outputSampler.exception(elementSample, e);
ExecutionStateSampler.ExecutionStateTrackerStatus status =
executionStateTracker.getStatus();
String processBundleId = status == null ? null : status.getProcessBundleId();
outputSampler.exception(elementSample, e, ptransformId, processBundleId);
}
throw e;
} finally {
Expand Down Expand Up @@ -407,7 +420,11 @@ public void accept(WindowedValue<T> input) throws Exception {
consumerAndMetadata.getConsumer().accept(input);
} catch (Exception e) {
if (outputSampler != null) {
outputSampler.exception(elementSample, e);
ExecutionStateSampler.ExecutionStateTrackerStatus status =
consumerAndMetadata.getExecutionStateTracker().getStatus();
String processBundleId = status == null ? null : status.getProcessBundleId();
outputSampler.exception(
elementSample, e, consumerAndMetadata.getPTransformId(), processBundleId);
}
throw e;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,21 @@ public class ElementSample<T> {
// The element sample to be serialized and later queried.
public final WindowedValue<T> sample;

public static class ExceptionMetadata {
ExceptionMetadata(String message, String ptransformId) {
this.message = message;
this.ptransformId = ptransformId;
}

// The stringified exception that caused the bundle to fail.
public final String message;

// The PTransform of where the exception occurred first.
public final String ptransformId;
}

// An optional exception to be given as metadata on the FnApi for the given sample.
@Nullable public Exception exception = null;
@Nullable public ExceptionMetadata exception = null;

ElementSample(long id, WindowedValue<T> sample) {
this.id = id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
Expand All @@ -44,7 +46,7 @@ public class OutputSampler<T> {
// Temporarily holds exceptional elements. These elements can also be duplicated in the main
// buffer. This is in order to always track exceptional elements even if the number of samples in
// the main buffer drops it.
private final List<ElementSample<T>> exceptions = new ArrayList<>();
private final Map<String, ElementSample<T>> exceptions = new HashMap<>();

// Maximum number of elements in buffer.
private final int maxElements;
Expand Down Expand Up @@ -120,22 +122,68 @@ public ElementSample<T> sample(WindowedValue<T> element) {
}

/**
* Samples an exceptional element to be later queried.
* Samples an exceptional element to be later queried. The enforces that only one exception occurs
* per bundle.
*
* @param elementSample the sampled element to add an exception to.
* @param e the exception.
* @param ptransformId the source of the exception.
* @param processBundleId the failing bundle.
*/
public void exception(ElementSample<T> elementSample, Exception e) {
if (elementSample == null) {
public void exception(
ElementSample<T> elementSample, Exception e, String ptransformId, String processBundleId) {
if (elementSample == null || processBundleId == null) {
return;
}

synchronized (this) {
elementSample.exception = e;
exceptions.add(elementSample);
exceptions.computeIfAbsent(
processBundleId,
pbId -> {
elementSample.exception =
new ElementSample.ExceptionMetadata(e.toString(), ptransformId);
return elementSample;
});
}
}

/**
* Fills and returns the BeamFnApi proto.
*
* @param sample the sampled element.
* @param stream the stream to use to serialize the element.
* @param processBundleId the bundle the element belongs to. Currently only set when there is an
* exception.
*/
private BeamFnApi.SampledElement sampleToProto(
ElementSample<T> sample, ByteStringOutputStream stream, @Nullable String processBundleId)
throws IOException {
if (valueCoder != null) {
this.valueCoder.encode(sample.sample.getValue(), stream, Coder.Context.NESTED);
} else if (windowedValueCoder != null) {
this.windowedValueCoder.encode(sample.sample, stream, Coder.Context.NESTED);
}

BeamFnApi.SampledElement.Builder elementBuilder =
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset());

ElementSample.ExceptionMetadata exception = sample.exception;
if (exception != null) {
BeamFnApi.SampledElement.Exception.Builder exceptionBuilder =
BeamFnApi.SampledElement.Exception.newBuilder()
.setTransformId(exception.ptransformId)
.setError(exception.message);

if (processBundleId != null) {
exceptionBuilder.setInstructionId(processBundleId);
}

elementBuilder.setException(exceptionBuilder);
}

return elementBuilder.build();
}

/**
* Clears samples at end of call. This is to help mitigate memory use.
*
Expand All @@ -162,39 +210,26 @@ public List<BeamFnApi.SampledElement> samples() throws IOException {
// to deduplicate samples.
HashSet<Long> seen = new HashSet<>();
ByteStringOutputStream stream = new ByteStringOutputStream();
for (int i = 0; i < bufferToSend.size(); i++) {
int index = (sampleIndex + i) % bufferToSend.size();
ElementSample<T> sample = bufferToSend.get(index);
for (Map.Entry<String, ElementSample<T>> pair : exceptions.entrySet()) {
String processBundleId = pair.getKey();
ElementSample<T> sample = pair.getValue();
seen.add(sample.id);

if (valueCoder != null) {
this.valueCoder.encode(sample.sample.getValue(), stream, Coder.Context.NESTED);
} else if (windowedValueCoder != null) {
this.windowedValueCoder.encode(sample.sample, stream, Coder.Context.NESTED);
}

ret.add(
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
ret.add(sampleToProto(sample, stream, processBundleId));
}
exceptions.clear();

// TODO: set the exception metadata on the proto once that PR is merged.
for (ElementSample<T> sample : exceptions) {
for (int i = 0; i < bufferToSend.size(); i++) {
int index = (sampleIndex + i) % bufferToSend.size();

ElementSample<T> sample = bufferToSend.get(index);
if (seen.contains(sample.id)) {
continue;
}

if (valueCoder != null) {
this.valueCoder.encode(sample.sample.getValue(), stream, Coder.Context.NESTED);
} else if (windowedValueCoder != null) {
this.windowedValueCoder.encode(sample.sample, stream, Coder.Context.NESTED);
}

ret.add(
BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build());
ret.add(sampleToProto(sample, stream, null));
}

exceptions.clear();

return ret;
}
}
Loading