From 6723b63dc2d86a2e608cc875a4cd7b01e6bc498f Mon Sep 17 00:00:00 2001 From: Sam Rohde Date: Mon, 26 Jun 2023 20:08:09 -0700 Subject: [PATCH 1/5] Finish Java Exception Sampling --- .../control/ExecutionStateSampler.java | 16 +++- .../data/PCollectionConsumerRegistry.java | 27 +++++- .../beam/fn/harness/debug/ElementSample.java | 15 ++- .../beam/fn/harness/debug/OutputSampler.java | 93 +++++++++++++------ .../fn/harness/debug/OutputSamplerTest.java | 91 ++++++++++++++++-- .../status/BeamFnStatusClientTest.java | 2 +- 6 files changed, 198 insertions(+), 46 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java index 2c2485dd842c..c8cef8cf8616 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ExecutionStateSampler.java @@ -363,9 +363,14 @@ private void takeSample(long currentTimeMillis, long millisSinceLastSample) { ExecutionStateImpl current = currentStateLazy.get(); if (current != null) { return ExecutionStateTrackerStatus.create( - current.ptransformId, current.ptransformUniqueName, thread, lastTransitionTimeMs); + current.ptransformId, + current.ptransformUniqueName, + thread, + lastTransitionTimeMs, + processBundleId.get()); } else { - return ExecutionStateTrackerStatus.create(null, null, thread, lastTransitionTimeMs); + return ExecutionStateTrackerStatus.create( + null, null, thread, lastTransitionTimeMs, processBundleId.get()); } } @@ -518,9 +523,10 @@ public static ExecutionStateTrackerStatus create( @Nullable String ptransformId, @Nullable String ptransformUniqueName, Thread trackedThread, - long lastTransitionTimeMs) { + long lastTransitionTimeMs, + @Nullable String processBundleId) { return new AutoValue_ExecutionStateSampler_ExecutionStateTrackerStatus( - ptransformId, ptransformUniqueName, trackedThread, lastTransitionTimeMs); + ptransformId, ptransformUniqueName, trackedThread, lastTransitionTimeMs, processBundleId); } public abstract @Nullable String getPTransformId(); @@ -530,5 +536,7 @@ public static ExecutionStateTrackerStatus create( public abstract Thread getTrackedThread(); public abstract long getLastTransitionTimeMillis(); + + public abstract @Nullable String getProcessBundleId(); } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java index 150580c7f64f..a7a8766ffc7b 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java @@ -27,6 +27,7 @@ import javax.annotation.Nullable; import org.apache.beam.fn.harness.HandlesSplits; import org.apache.beam.fn.harness.control.BundleProgressReporter; +import org.apache.beam.fn.harness.control.ExecutionStateSampler; import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState; import org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker; import org.apache.beam.fn.harness.control.Metrics; @@ -71,9 +72,12 @@ public class PCollectionConsumerRegistry { @SuppressWarnings({"rawtypes"}) abstract static class ConsumerAndMetadata { public static ConsumerAndMetadata forConsumer( - FnDataReceiver consumer, String pTransformId, ExecutionState state) { + FnDataReceiver consumer, + String pTransformId, + ExecutionState state, + ExecutionStateTracker stateTracker) { return new AutoValue_PCollectionConsumerRegistry_ConsumerAndMetadata( - consumer, pTransformId, state); + consumer, pTransformId, state, stateTracker); } public abstract FnDataReceiver getConsumer(); @@ -81,6 +85,8 @@ public static ConsumerAndMetadata forConsumer( public abstract String getPTransformId(); public abstract ExecutionState getExecutionState(); + + public abstract ExecutionStateTracker getExecutionStateTracker(); } private final ExecutionStateTracker stateTracker; @@ -176,7 +182,7 @@ public void register( List consumerAndMetadatas = pCollectionIdsToConsumers.computeIfAbsent(pCollectionId, (unused) -> new ArrayList<>()); consumerAndMetadatas.add( - ConsumerAndMetadata.forConsumer(consumer, pTransformId, executionState)); + ConsumerAndMetadata.forConsumer(consumer, pTransformId, executionState, stateTracker)); } /** @@ -250,6 +256,8 @@ private class MetricTrackingFnDataReceiver implements FnDataReceiver sampledByteSizeDistribution; private final Coder coder; private final @Nullable OutputSampler outputSampler; + private final String ptransformId; + private final ExecutionStateTracker executionStateTracker; public MetricTrackingFnDataReceiver( String pCollectionId, @@ -258,6 +266,8 @@ public MetricTrackingFnDataReceiver( @Nullable OutputSampler outputSampler) { this.delegate = consumerAndMetadata.getConsumer(); this.executionState = consumerAndMetadata.getExecutionState(); + this.executionStateTracker = consumerAndMetadata.getExecutionStateTracker(); + this.ptransformId = consumerAndMetadata.getPTransformId(); HashMap labels = new HashMap<>(); labels.put(Labels.PCOLLECTION, pCollectionId); @@ -315,7 +325,10 @@ public void accept(WindowedValue input) throws Exception { this.delegate.accept(input); } catch (Exception e) { if (outputSampler != null) { - outputSampler.exception(elementSample, e); + ExecutionStateSampler.ExecutionStateTrackerStatus status = + executionStateTracker.getStatus(); + String processBundleId = status == null ? null : status.getProcessBundleId(); + outputSampler.exception(elementSample, e, ptransformId, processBundleId); } throw e; } finally { @@ -407,7 +420,11 @@ public void accept(WindowedValue input) throws Exception { consumerAndMetadata.getConsumer().accept(input); } catch (Exception e) { if (outputSampler != null) { - outputSampler.exception(elementSample, e); + ExecutionStateSampler.ExecutionStateTrackerStatus status = + consumerAndMetadata.getExecutionStateTracker().getStatus(); + String processBundleId = status == null ? null : status.getProcessBundleId(); + outputSampler.exception( + elementSample, e, consumerAndMetadata.getPTransformId(), processBundleId); } throw e; } finally { diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java index 85abd02e1d96..4ef1674c9ec6 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/ElementSample.java @@ -34,8 +34,21 @@ public class ElementSample { // The element sample to be serialized and later queried. public final WindowedValue sample; + public static class ExceptionMetadata { + ExceptionMetadata(String message, String ptransformId) { + this.message = message; + this.ptransformId = ptransformId; + } + + // The stringified exception that caused the bundle to fail. + public final String message; + + // The PTransform of where the exception occurred first. + public final String ptransformId; + } + // An optional exception to be given as metadata on the FnApi for the given sample. - @Nullable public Exception exception = null; + @Nullable public ExceptionMetadata exception = null; ElementSample(long id, WindowedValue sample) { this.id = id; diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java index a81796a239ae..1827d800967b 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java @@ -19,8 +19,10 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; @@ -44,7 +46,7 @@ public class OutputSampler { // Temporarily holds exceptional elements. These elements can also be duplicated in the main // buffer. This is in order to always track exceptional elements even if the number of samples in // the main buffer drops it. - private final List> exceptions = new ArrayList<>(); + private final Map> exceptions = new HashMap<>(); // Maximum number of elements in buffer. private final int maxElements; @@ -120,22 +122,68 @@ public ElementSample sample(WindowedValue element) { } /** - * Samples an exceptional element to be later queried. + * Samples an exceptional element to be later queried. The enforces that only one exception occurs + * per bundle. * * @param elementSample the sampled element to add an exception to. * @param e the exception. + * @param ptransformId the source of the exception. + * @param processBundleId the failing bundle. */ - public void exception(ElementSample elementSample, Exception e) { - if (elementSample == null) { + public void exception( + ElementSample elementSample, Exception e, String ptransformId, String processBundleId) { + if (elementSample == null || processBundleId == null) { return; } synchronized (this) { - elementSample.exception = e; - exceptions.add(elementSample); + exceptions.computeIfAbsent( + processBundleId, + pbId -> { + elementSample.exception = + new ElementSample.ExceptionMetadata(e.toString(), ptransformId); + return elementSample; + }); } } + /** + * Fills and returns the BeamFnApi proto. + * + * @param sample the sampled element. + * @param ByteStringOutputStream the stream to use to serialize the element. + * @param processBundleId the bundle the element belongs to. Currently only set when there is an + * exception. + */ + private BeamFnApi.SampledElement sampleToProto( + ElementSample sample, ByteStringOutputStream stream, @Nullable String processBundleId) + throws IOException { + if (valueCoder != null) { + this.valueCoder.encode(sample.sample.getValue(), stream, Coder.Context.NESTED); + } else if (windowedValueCoder != null) { + this.windowedValueCoder.encode(sample.sample, stream, Coder.Context.NESTED); + } + + BeamFnApi.SampledElement.Builder elementBuilder = + BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()); + + ElementSample.ExceptionMetadata exception = sample.exception; + if (exception != null) { + BeamFnApi.SampledElement.Exception.Builder exceptionBuilder = + BeamFnApi.SampledElement.Exception.newBuilder() + .setTransformId(exception.ptransformId) + .setError(exception.message); + + if (processBundleId != null) { + exceptionBuilder.setInstructionId(processBundleId); + } + + elementBuilder.setException(exceptionBuilder); + } + + return elementBuilder.build(); + } + /** * Clears samples at end of call. This is to help mitigate memory use. * @@ -162,39 +210,26 @@ public List samples() throws IOException { // to deduplicate samples. HashSet seen = new HashSet<>(); ByteStringOutputStream stream = new ByteStringOutputStream(); - for (int i = 0; i < bufferToSend.size(); i++) { - int index = (sampleIndex + i) % bufferToSend.size(); - ElementSample sample = bufferToSend.get(index); + for (Map.Entry> pair : exceptions.entrySet()) { + String processBundleId = pair.getKey(); + ElementSample sample = pair.getValue(); seen.add(sample.id); - if (valueCoder != null) { - this.valueCoder.encode(sample.sample.getValue(), stream, Coder.Context.NESTED); - } else if (windowedValueCoder != null) { - this.windowedValueCoder.encode(sample.sample, stream, Coder.Context.NESTED); - } - - ret.add( - BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build()); + ret.add(sampleToProto(sample, stream, processBundleId)); } + exceptions.clear(); - // TODO: set the exception metadata on the proto once that PR is merged. - for (ElementSample sample : exceptions) { + for (int i = 0; i < bufferToSend.size(); i++) { + int index = (sampleIndex + i) % bufferToSend.size(); + + ElementSample sample = bufferToSend.get(index); if (seen.contains(sample.id)) { continue; } - if (valueCoder != null) { - this.valueCoder.encode(sample.sample.getValue(), stream, Coder.Context.NESTED); - } else if (windowedValueCoder != null) { - this.windowedValueCoder.encode(sample.sample, stream, Coder.Context.NESTED); - } - - ret.add( - BeamFnApi.SampledElement.newBuilder().setElement(stream.toByteStringAndReset()).build()); + ret.add(sampleToProto(sample, stream, null)); } - exceptions.clear(); - return ret; } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java index 7946251b30f9..761f710b0baa 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/debug/OutputSamplerTest.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import javax.annotation.Nullable; import org.apache.beam.model.fnexecution.v1.BeamFnApi; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -59,6 +60,28 @@ public BeamFnApi.SampledElement encodeGlobalWindowedInt(Integer i) throws IOExce .build(); } + public BeamFnApi.SampledElement encodeException( + Integer i, String error, String ptransformId, @Nullable String processBundleId) + throws IOException { + VarIntCoder coder = VarIntCoder.of(); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + coder.encode(i, stream); + + BeamFnApi.SampledElement.Exception.Builder builder = + BeamFnApi.SampledElement.Exception.newBuilder() + .setTransformId(ptransformId) + .setError(error); + + if (processBundleId != null) { + builder.setInstructionId(processBundleId); + } + + return BeamFnApi.SampledElement.newBuilder() + .setElement(ByteString.copyFrom(stream.toByteArray())) + .setException(builder) + .build(); + } + /** * Test that the first N are always sampled. * @@ -154,11 +177,65 @@ public void testCanSampleExceptions() throws IOException { ElementSample elementSample = outputSampler.sample(windowedValue); Exception exception = new RuntimeException("Test exception"); - outputSampler.exception(elementSample, exception); + String ptransformId = "ptransform"; + String processBundleId = "processBundle"; + outputSampler.exception(elementSample, exception, ptransformId, processBundleId); + + List expected = new ArrayList<>(); + expected.add(encodeException(1, exception.toString(), ptransformId, processBundleId)); + + List samples = outputSampler.samples(); + assertThat(samples, containsInAnyOrder(expected.toArray())); + } + + /** + * Test that in the event that an exception happens multiple times in a bundle, it's only recorded + * at the source. + * + * @throws IOException when encoding fails (shouldn't happen). + */ + @Test + public void testNoDuplicateExceptions() throws IOException { + VarIntCoder coder = VarIntCoder.of(); + OutputSampler outputSampler = new OutputSampler<>(coder, 5, 20); + + ElementSample elementSampleA = + outputSampler.sample(WindowedValue.valueInGlobalWindow(1)); + ElementSample elementSampleB = + outputSampler.sample(WindowedValue.valueInGlobalWindow(2)); + + Exception exception = new RuntimeException("Test exception"); + String ptransformIdA = "ptransformA"; + String ptransformIdB = "ptransformB"; + String processBundleId = "processBundle"; + outputSampler.exception(elementSampleA, exception, ptransformIdA, processBundleId); + outputSampler.exception(elementSampleB, exception, ptransformIdB, processBundleId); + + List expected = new ArrayList<>(); + expected.add(encodeException(1, exception.toString(), ptransformIdA, processBundleId)); + expected.add(encodeInt(2)); + + List samples = outputSampler.samples(); + assertThat(samples, containsInAnyOrder(expected.toArray())); + } + + /** + * Test that exception metadata is only set if there is a process bundle. + * + * @throws IOException when encoding fails (shouldn't happen). + */ + @Test + public void testExceptionOnlySampledIfNonNullProcessBundle() throws IOException { + VarIntCoder coder = VarIntCoder.of(); + OutputSampler outputSampler = new OutputSampler<>(coder, 5, 20); + + WindowedValue windowedValue = WindowedValue.valueInGlobalWindow(1); + ElementSample elementSample = outputSampler.sample(windowedValue); + + Exception exception = new RuntimeException("Test exception"); + String ptransformId = "ptransform"; + outputSampler.exception(elementSample, exception, ptransformId, null); - // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to - // 4..9 inclusive. Then, - // the 20th element is sampled (19) and every 20 after. List expected = new ArrayList<>(); expected.add(encodeInt(1)); @@ -185,7 +262,9 @@ public void testExceptionSamplesAreNotRemoved() throws IOException { } Exception exception = new RuntimeException("Test exception"); - outputSampler.exception(elementSample, exception); + String ptransformId = "ptransform"; + String processBundleId = "processBundle"; + outputSampler.exception(elementSample, exception, ptransformId, processBundleId); // The first 10 are always sampled, but with maxSamples = 5, the first ten are downsampled to // 4..9 inclusive. Then, the 20th element is sampled (19) and every 20 after. Finally, @@ -196,7 +275,7 @@ public void testExceptionSamplesAreNotRemoved() throws IOException { expected.add(encodeInt(59)); expected.add(encodeInt(79)); expected.add(encodeInt(99)); - expected.add(encodeInt(0)); + expected.add(encodeException(0, exception.toString(), ptransformId, processBundleId)); List samples = outputSampler.samples(); assertThat(samples, containsInAnyOrder(expected.toArray())); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java index f4265b158348..7a71a58d04ca 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/status/BeamFnStatusClientTest.java @@ -75,7 +75,7 @@ public void testActiveBundleState() { when(executionStateTracker.getStatus()) .thenReturn( ExecutionStateTrackerStatus.create( - "ptransformId", "ptransformIdName", Thread.currentThread(), i * 1000)); + "ptransformId", "ptransformIdName", Thread.currentThread(), i * 1000, null)); String instruction = Integer.toString(i); when(processorCache.find(instruction)).thenReturn(processor); bundleProcessorMap.put(instruction, processor); From 0b2349cfdcd8194d5d2bcf99037d878dc465dd48 Mon Sep 17 00:00:00 2001 From: Sam Rohde Date: Tue, 27 Jun 2023 09:56:14 -0700 Subject: [PATCH 2/5] wrong param name in comment --- .../java/org/apache/beam/fn/harness/debug/OutputSampler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java index 1827d800967b..e86b2d5b5ceb 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/debug/OutputSampler.java @@ -151,7 +151,7 @@ public void exception( * Fills and returns the BeamFnApi proto. * * @param sample the sampled element. - * @param ByteStringOutputStream the stream to use to serialize the element. + * @param stream the stream to use to serialize the element. * @param processBundleId the bundle the element belongs to. Currently only set when there is an * exception. */ From 63dc40cad67785533ce5081f708c7465c6fd5148 Mon Sep 17 00:00:00 2001 From: Sam Rohde Date: Tue, 27 Jun 2023 10:48:53 -0700 Subject: [PATCH 3/5] run tests From fb67221535cda8cfc30a25d0ac9f32bfe78576e2 Mon Sep 17 00:00:00 2001 From: Sam Rohde Date: Tue, 27 Jun 2023 12:40:13 -0700 Subject: [PATCH 4/5] run tests From 7be9b56c0c2cf0cea9f5717620b0bae9306fbd7d Mon Sep 17 00:00:00 2001 From: Sam Rohde Date: Tue, 27 Jun 2023 17:22:02 -0700 Subject: [PATCH 5/5] run tests