Skip to content

Commit

Permalink
Revert "Implementing lull reporting at bundle level processing (#29882)…
Browse files Browse the repository at this point in the history
…" (#30648) (#30664)

This reverts commit ffe2dba.

Co-authored-by: Arvind Ram <arvindram03@gmail.com>
  • Loading branch information
Abacn and arvindram03 authored Mar 18, 2024
1 parent d9897b0 commit 0dc330e
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 353 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
new ConcurrentHashMap<>();

private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
private static final long BUNDLE_LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(10);
private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> SAMPLING_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, "sampling");

Expand Down Expand Up @@ -140,17 +139,8 @@ public String getDescription() {
*/
private volatile long millisSinceLastTransition = 0;

/**
* The number of milliseconds since the {@link ExecutionStateTracker} initial state.
*
* <p>This variable is updated by the Sampling thread, and read by the Progress Reporting thread,
* thus it being marked volatile.
*/
private volatile long millisSinceBundleStart = 0;

private long transitionsAtLastSample = 0;
private long nextLullReportMs = LULL_REPORT_MS;
private long nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;

public ExecutionStateTracker(ExecutionStateSampler sampler) {
this.sampler = sampler;
Expand All @@ -165,10 +155,8 @@ public synchronized void reset() {
currentState = null;
numTransitions = 0;
millisSinceLastTransition = 0;
millisSinceBundleStart = 0;
transitionsAtLastSample = 0;
nextLullReportMs = LULL_REPORT_MS;
nextBundleLullReportMs = BUNDLE_LULL_REPORT_MS;
}

@VisibleForTesting
Expand Down Expand Up @@ -347,19 +335,6 @@ protected void takeSampleOnce(long millisSinceLastSample) {
transitionsAtLastSample = transitionsAtThisSample;
}
updateMillisSinceLastTransition(millisSinceLastSample, state);
updateMillisSinceBundleStart(millisSinceLastSample);
}

// Override this to implement bundle level lull reporting.
protected void reportBundleLull(long millisSinceBundleStart) {}

@SuppressWarnings("NonAtomicVolatileUpdate")
private void updateMillisSinceBundleStart(long millisSinceLastSample) {
millisSinceBundleStart += millisSinceLastSample;
if (millisSinceBundleStart > nextBundleLullReportMs) {
reportBundleLull(millisSinceBundleStart);
nextBundleLullReportMs += BUNDLE_LULL_REPORT_MS;
}
}

@SuppressWarnings("NonAtomicVolatileUpdate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.SideInputInfo;
import java.io.Closeable;
import java.io.IOException;
Expand All @@ -30,8 +29,6 @@
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.core.NullSideInputReader;
Expand All @@ -40,30 +37,22 @@
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingHandler;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementExecutionTracker;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Closer;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.DateTimeUtils.MillisProvider;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Execution context for the Dataflow worker. */
@SuppressWarnings({
Expand Down Expand Up @@ -271,59 +260,23 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker
@Nullable
private ActiveMessageMetadata activeMessageMetadata = null;

/** Clock used to either provide real system time or mocked to virtualize time for testing. */
private final Clock clock;
private final MillisProvider clock = System::currentTimeMillis;

@GuardedBy("this")
private final Map<String, IntSummaryStatistics> processingTimesByStep = new HashMap<>();

/** Last milliseconds since epoch when a full thread dump was performed. */
private long lastFullThreadDumpMillis = 0;

/** The minimum lull duration in milliseconds to perform a full thread dump. */
private static final long LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000;

private static final Logger LOG = LoggerFactory.getLogger(DataflowExecutionStateTracker.class);

private static final PeriodFormatter DURATION_FORMATTER =
new PeriodFormatterBuilder()
.appendDays()
.appendSuffix("d")
.minimumPrintedDigits(2)
.appendHours()
.appendSuffix("h")
.printZeroAlways()
.appendMinutes()
.appendSuffix("m")
.appendSeconds()
.appendSuffix("s")
.toFormatter();

public DataflowExecutionStateTracker(
ExecutionStateSampler sampler,
DataflowOperationContext.DataflowExecutionState otherState,
CounterFactory counterFactory,
PipelineOptions options,
String workItemId) {
this(sampler, otherState, counterFactory, options, workItemId, Clock.SYSTEM);
}

@VisibleForTesting
public DataflowExecutionStateTracker(
ExecutionStateSampler sampler,
DataflowOperationContext.DataflowExecutionState otherState,
CounterFactory counterFactory,
PipelineOptions options,
String workItemId,
Clock clock) {
super(sampler);
this.elementExecutionTracker =
DataflowElementExecutionTracker.create(counterFactory, options);
this.otherState = otherState;
this.workItemId = workItemId;
this.contextActivationObserverRegistry = ContextActivationObserverRegistry.createDefault();
this.clock = clock;
DataflowWorkerLoggingInitializer.initialize();
}

@Override
Expand All @@ -348,76 +301,12 @@ public Closeable activate() {
}
}

private boolean shouldLogFullThreadDumpForBundle(Duration lullDuration) {
if (lullDuration.getMillis() < LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS) {
return false;
}
long now = clock.currentTimeMillis();
if (lastFullThreadDumpMillis + LOG_BUNDLE_LULL_FULL_THREAD_DUMP_LULL_MS < now) {
lastFullThreadDumpMillis = now;
return true;
}
return false;
}

private String getBundleLullMessage(Duration lullDuration) {
StringBuilder message = new StringBuilder();
message
.append("Operation ongoing in bundle for at least ")
.append(DURATION_FORMATTER.print(lullDuration.toPeriod()))
.append(" without completing")
.append("\n");
synchronized (this) {
if (this.activeMessageMetadata != null) {
message.append(
"Current user step name: " + getActiveMessageMetadata().get().userStepName() + "\n");
message.append(
"Time spent in this step(millis): "
+ (clock.currentTimeMillis() - getActiveMessageMetadata().get().startTime())
+ "\n");
}
message.append("Processing times in each step(millis)\n");
for (Map.Entry<String, IntSummaryStatistics> entry :
this.processingTimesByStep.entrySet()) {
message.append("Step name: " + entry.getKey() + "\n");
message.append("Time spent in this step: " + entry.getValue().toString() + "\n");
}
}

return message.toString();
}

@Override
protected void takeSampleOnce(long millisSinceLastSample) {
elementExecutionTracker.takeSample(millisSinceLastSample);
super.takeSampleOnce(millisSinceLastSample);
}

@Override
protected void reportBundleLull(long millisElapsedSinceBundleStart) {
// If we're not logging warnings, nothing to report.
if (!LOG.isWarnEnabled()) {
return;
}

Duration lullDuration = Duration.millis(millisElapsedSinceBundleStart);

// Since the lull reporting executes in the sampler thread, it won't automatically inherit the
// context of the current step. To ensure things are logged correctly, we get the currently
// registered DataflowWorkerLoggingHandler and log directly in the desired context.
LogRecord logRecord = new LogRecord(Level.WARNING, getBundleLullMessage(lullDuration));
logRecord.setLoggerName(DataflowExecutionStateTracker.LOG.getName());

// Publish directly in the context of this specific ExecutionState.
DataflowWorkerLoggingHandler dataflowLoggingHandler =
DataflowWorkerLoggingInitializer.getLoggingHandler();
dataflowLoggingHandler.publish(logRecord);

if (shouldLogFullThreadDumpForBundle(lullDuration)) {
StackTraceUtil.logAllStackTraces();
}
}

/**
* Enter a new state on the tracker. If the new state is a Dataflow processing state, tracks the
* activeMessageMetadata with the start time of the new state.
Expand All @@ -434,7 +323,7 @@ public Closeable enterState(ExecutionState newState) {
synchronized (this) {
this.activeMessageMetadata =
ActiveMessageMetadata.create(
newDFState.getStepName().userName(), clock.currentTimeMillis());
newDFState.getStepName().userName(), clock.getMillis());
}
}
elementExecutionTracker.enter(newDFState.getStepName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@

import static org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor.longToSplitInt;

import com.google.api.client.util.Clock;
import com.google.api.services.dataflow.model.CounterMetadata;
import com.google.api.services.dataflow.model.CounterStructuredName;
import com.google.api.services.dataflow.model.CounterStructuredNameAndMetadata;
import com.google.api.services.dataflow.model.CounterUpdate;
import java.io.Closeable;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import org.apache.beam.runners.core.SimpleDoFnRunner;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.apache.beam.runners.dataflow.worker.MetricsToCounterUpdateConverter.Kind;
Expand All @@ -39,6 +42,7 @@
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.format.PeriodFormatter;
Expand Down Expand Up @@ -181,19 +185,41 @@ public abstract static class DataflowExecutionState extends ExecutionState {
private final ProfileScope profileScope;
private final @Nullable MetricsContainer metricsContainer;

/** Clock used to either provide real system time or mocked to virtualize time for testing. */
private final Clock clock;

public DataflowExecutionState(
NameContext nameContext,
String stateName,
@Nullable String requestingStepName,
@Nullable Integer inputIndex,
@Nullable MetricsContainer metricsContainer,
ProfileScope profileScope) {
this(
nameContext,
stateName,
requestingStepName,
inputIndex,
metricsContainer,
profileScope,
Clock.SYSTEM);
}

public DataflowExecutionState(
NameContext nameContext,
String stateName,
@Nullable String requestingStepName,
@Nullable Integer inputIndex,
@Nullable MetricsContainer metricsContainer,
ProfileScope profileScope,
Clock clock) {
super(stateName);
this.stepName = nameContext;
this.requestingStepName = requestingStepName;
this.inputIndex = inputIndex;
this.profileScope = Preconditions.checkNotNull(profileScope);
this.metricsContainer = metricsContainer;
this.clock = clock;
}

/**
Expand Down Expand Up @@ -225,6 +251,9 @@ public String getDescription() {
return description.toString();
}

private static final ImmutableSet<String> FRAMEWORK_CLASSES =
ImmutableSet.of(SimpleDoFnRunner.class.getName(), DoFnInstanceManagers.class.getName());

protected String getLullMessage(Thread trackedThread, Duration lullDuration) {
StringBuilder message = new StringBuilder();
message.append("Operation ongoing");
Expand All @@ -243,7 +272,7 @@ protected String getLullMessage(Thread trackedThread, Duration lullDuration) {

message.append("\n");

message.append(StackTraceUtil.getStackTraceForLullMessage(trackedThread.getStackTrace()));
message.append(getStackTraceForLullMessage(trackedThread.getStackTrace()));
return message.toString();
}

Expand All @@ -267,6 +296,55 @@ public void reportLull(Thread trackedThread, long millis) {
DataflowWorkerLoggingHandler dataflowLoggingHandler =
DataflowWorkerLoggingInitializer.getLoggingHandler();
dataflowLoggingHandler.publish(this, logRecord);

if (shouldLogFullThreadDump(lullDuration)) {
Map<Thread, StackTraceElement[]> threadSet = Thread.getAllStackTraces();
for (Map.Entry<Thread, StackTraceElement[]> entry : threadSet.entrySet()) {
Thread thread = entry.getKey();
StackTraceElement[] stackTrace = entry.getValue();
StringBuilder message = new StringBuilder();
message.append(thread.toString()).append(":\n");
message.append(getStackTraceForLullMessage(stackTrace));
logRecord = new LogRecord(Level.INFO, message.toString());
logRecord.setLoggerName(DataflowOperationContext.LOG.getName());
dataflowLoggingHandler.publish(this, logRecord);
}
}
}

/**
* The time interval between two full thread dump. (A full thread dump is performed at most once
* every 20 minutes.)
*/
private static final long LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS = 20 * 60 * 1000;

/** The minimum lull duration to perform a full thread dump. */
private static final long LOG_LULL_FULL_THREAD_DUMP_LULL_MS = 20 * 60 * 1000;

/** Last time when a full thread dump was performed. */
private long lastFullThreadDumpMillis = 0;

private boolean shouldLogFullThreadDump(Duration lullDuration) {
if (lullDuration.getMillis() < LOG_LULL_FULL_THREAD_DUMP_LULL_MS) {
return false;
}
long now = clock.currentTimeMillis();
if (lastFullThreadDumpMillis + LOG_LULL_FULL_THREAD_DUMP_INTERVAL_MS < now) {
lastFullThreadDumpMillis = now;
return true;
}
return false;
}

private String getStackTraceForLullMessage(StackTraceElement[] stackTrace) {
StringBuilder message = new StringBuilder();
for (StackTraceElement e : stackTrace) {
if (FRAMEWORK_CLASSES.contains(e.getClassName())) {
break;
}
message.append(" at ").append(e).append("\n");
}
return message.toString();
}

public @Nullable MetricsContainer getMetricsContainer() {
Expand Down
Loading

0 comments on commit 0dc330e

Please sign in to comment.