From af7aa71bd10c623269e25404dcd5ec95fa8e5337 Mon Sep 17 00:00:00 2001 From: Kanishk Karanawat Date: Wed, 6 May 2020 10:03:36 -0400 Subject: [PATCH] Enable metric extraction for batch dataflow worker (#1) Co-authored-by: steve Co-authored-by: Kanishk Karanawat --- .../worker/legacy-worker/build.gradle | 2 +- .../dataflow/worker/BatchDataflowWorker.java | 28 +++++++++++++++++-- .../worker/BatchModeExecutionContext.java | 12 ++++++++ .../dataflow/worker/WorkItemStatusClient.java | 24 ++++++++++++++++ 4 files changed, 63 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/legacy-worker/build.gradle b/runners/google-cloud-dataflow-java/worker/legacy-worker/build.gradle index 379ed4a38673..9fdffe422f41 100644 --- a/runners/google-cloud-dataflow-java/worker/legacy-worker/build.gradle +++ b/runners/google-cloud-dataflow-java/worker/legacy-worker/build.gradle @@ -29,7 +29,7 @@ plugins { id 'org.apache.beam.module' } // by adding -Pdataflow.version= in Gradle command. Otherwise, // 'google_clients_version' defined in BeamModulePlugin will be used as default. def DATAFLOW_VERSION = "dataflow.version" -def DATAFLOW_WORKER_REV = "20200203" +def DATAFLOW_WORKER_REV = "20200504" // Get full dependency of 'com.google.apis:google-api-services-dataflow' def google_api_services_dataflow = project.hasProperty(DATAFLOW_VERSION) ? "com.google.apis:google-api-services-dataflow:" + getProperty(DATAFLOW_VERSION) : library.java.google_api_services_dataflow diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java index 9031ce089d6e..0021fc911d91 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker; +import static org.apache.beam.runners.dataflow.worker.DataflowSystemMetrics.StreamingSystemCounterNames.*; + import com.google.api.services.dataflow.model.MapTask; import com.google.api.services.dataflow.model.WorkItem; import java.io.Closeable; @@ -30,6 +32,8 @@ import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.SdkHarnessRegistry.SdkWorkerHarness; import org.apache.beam.runners.dataflow.worker.apiary.FixMultiOutputInfosOnParDoInstructions; +import org.apache.beam.runners.dataflow.worker.counters.Counter; +import org.apache.beam.runners.dataflow.worker.counters.CounterName; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.graph.CloneAmbiguousFlattensFunction; import org.apache.beam.runners.dataflow.worker.graph.CreateExecutableStageNodeFunction; @@ -144,9 +148,15 @@ public class BatchDataflowWorker implements Closeable { private final SdkHarnessRegistry sdkHarnessRegistry; private final Function> mapTaskToNetwork; + private final CounterSet memoryCounter; private final MemoryMonitor memoryMonitor; private final Thread memoryMonitorThread; + private static final CounterName BATCH_WORK_ITEM_SUCCESS_COUNTER_NAME = + CounterName.named("work_item_success"); + private static final CounterName BATCH_WORK_ITEM_FAILURE_COUNTER_NAME = + CounterName.named("work_item_failure"); + /** * Returns a {@link BatchDataflowWorker} configured to execute user functions via intrinsic Java * execution. @@ -209,7 +219,12 @@ protected BatchDataflowWorker( .concurrencyLevel(CACHE_CONCURRENCY_LEVEL) .build(); - this.memoryMonitor = MemoryMonitor.fromOptions(options); + this.memoryCounter = new CounterSet(); + this.memoryMonitor = + MemoryMonitor.fromOptions( + options, + memoryCounter.longSum(MEMORY_MONITOR_NUM_PUSHBACKS.counterName()), + memoryCounter.longSum(MEMORY_MONITOR_IS_THRASHING.counterName())); this.statusPages = WorkerStatusPages.create( DEFAULT_STATUS_PORT, this.memoryMonitor, sdkHarnessRegistry::sdkHarnessesAreHealthy); @@ -324,6 +339,11 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr DataflowWorkExecutor worker = null; SdkWorkerHarness sdkWorkerHarness = sdkHarnessRegistry.getAvailableWorkerAndAssignWork(); + CounterSet counterSet = new CounterSet(); + Counter workItemsReceived = counterSet.longSum(WORK_ITEMS_RECEIVED.counterName()); + Counter workItemSuccess = counterSet.longSum(BATCH_WORK_ITEM_SUCCESS_COUNTER_NAME); + Counter workItemFailure = counterSet.longSum(BATCH_WORK_ITEM_FAILURE_COUNTER_NAME); + try { // Populate PipelineOptions with data from work unit. options.setProject(workItem.getProjectId()); @@ -337,10 +357,10 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr throw new RuntimeException("Unknown kind of work item: " + workItem.toString()); } - CounterSet counterSet = new CounterSet(); BatchModeExecutionContext executionContext = BatchModeExecutionContext.create( counterSet, + this.memoryCounter, sideInputDataCache, sideInputWeakReferenceCache, readerRegistry, @@ -383,11 +403,15 @@ boolean doWork(WorkItem workItem, WorkItemStatusClient workItemStatusClient) thr DataflowWorkProgressUpdater progressUpdater = new DataflowWorkProgressUpdater(workItemStatusClient, workItem, worker); + + workItemsReceived.addValue(1L); executeWork(worker, progressUpdater); + workItemSuccess.addValue(1L); workItemStatusClient.reportSuccess(); return true; } catch (Throwable e) { + workItemFailure.addValue(1L); workItemStatusClient.reportError(e); return false; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java index 7e836ff2cae8..d0ba6a502ce5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java @@ -34,6 +34,8 @@ import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; +import org.apache.beam.runners.dataflow.worker.counters.CounterSet; +import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; import org.apache.beam.runners.dataflow.worker.counters.NameContext; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.ProfileScope; @@ -61,6 +63,7 @@ public class BatchModeExecutionContext protected final Cache logicalReferenceCache; protected final PipelineOptions options; protected final ReaderFactory readerFactory; + private final CounterSet memoryDeltaCounter; private Object key; private final MetricsContainerRegistry containerRegistry; @@ -76,6 +79,7 @@ public class BatchModeExecutionContext private BatchModeExecutionContext( CounterFactory counterFactory, + CounterSet memoryDeltaCounter, Cache> dataCache, Cache logicalReferenceCache, ReaderFactory readerFactory, @@ -88,6 +92,7 @@ private BatchModeExecutionContext( executionStateTracker, executionStateRegistry, Long.MAX_VALUE); + this.memoryDeltaCounter = memoryDeltaCounter; this.logicalReferenceCache = logicalReferenceCache; this.readerFactory = readerFactory; this.options = options; @@ -110,6 +115,7 @@ public static BatchModeExecutionContext forTesting( BatchModeExecutionStateRegistry stateRegistry = new BatchModeExecutionStateRegistry(); return new BatchModeExecutionContext( counterFactory, + new CounterSet(), CacheBuilder.newBuilder() .maximumWeight(1_000_000) // weights are in bytes .weigher(Weighers.fixedWeightKeys(8)) @@ -220,6 +226,7 @@ protected DataflowOperationContext.DataflowExecutionState createState( public static BatchModeExecutionContext create( CounterFactory counterFactory, + CounterSet memoryCounter, Cache> dataCache, Cache logicalReferenceCache, ReaderFactory readerFactory, @@ -229,6 +236,7 @@ public static BatchModeExecutionContext create( BatchModeExecutionStateRegistry executionStateRegistry = new BatchModeExecutionStateRegistry(); return new BatchModeExecutionContext( counterFactory, + memoryCounter, dataCache, logicalReferenceCache, readerFactory, @@ -320,6 +328,10 @@ public Cache getLogicalReferenceCache() { return rval; } + public Iterable extractMemoryCounters() { + return memoryDeltaCounter.extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE); + } + /** {@link DataflowStepContext} used in batch mode. */ public class StepContext extends DataflowExecutionContext.DataflowStepContext { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java index 42446d1c8371..11fc5f9f77a0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkItemStatusClient.java @@ -40,6 +40,7 @@ import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; +import org.apache.beam.runners.dataflow.WorkerMetricsReceiver; import org.apache.beam.runners.dataflow.util.TimeUtil; import org.apache.beam.runners.dataflow.worker.counters.CounterSet; import org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor; @@ -48,6 +49,7 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.DynamicSplitResult; import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader.Progress; import org.apache.beam.sdk.util.UserCodeException; +import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; @@ -69,6 +71,7 @@ public class WorkItemStatusClient { private final WorkUnitClient workUnitClient; private @Nullable DataflowWorkExecutor worker; private Long nextReportIndex; + private final Iterable workerMetricReceivers; private transient String uniqueWorkId = null; private boolean finalStateSent = false; @@ -88,6 +91,7 @@ public WorkItemStatusClient(WorkUnitClient workUnitClient, WorkItem workItem) { this.workItem = workItem; this.nextReportIndex = checkNotNull(workItem.getInitialReportIndex(), "WorkItem missing initial report index"); + this.workerMetricReceivers = ReflectHelpers.loadServicesOrdered(WorkerMetricsReceiver.class); } public String uniqueWorkId() { @@ -302,6 +306,9 @@ synchronized void populateCounterUpdates(WorkItemStatus status) { // MSec counters reported in worker extractMsecCounters(isFinalUpdate).forEach(appendCounterUpdate); + // Extract memory metrics in worker + extractMemoryCounters().forEach(appendCounterUpdate); + // Metrics reported in SDK runner. // This includes all different kinds of metrics coming from SDK. // Keep in mind that these metrics might contain different types of counter names: @@ -309,6 +316,17 @@ synchronized void populateCounterUpdates(WorkItemStatus status) { worker.extractMetricUpdates().forEach(appendCounterUpdate); status.setCounterUpdates(ImmutableList.copyOf(counterUpdatesMap.values())); + publishCounterUpdates(ImmutableList.copyOf(counterUpdatesMap.values())); + } + + private void publishCounterUpdates(List updates) { + try { + for (WorkerMetricsReceiver receiver : workerMetricReceivers) { + receiver.receiverCounterUpdates(updates); + } + } catch (Exception e) { + LOG.error("Error publishing counter updates", e); + } } private synchronized Iterable extractCounters(@Nullable CounterSet counters) { @@ -343,6 +361,12 @@ public Iterable extractMsecCounters(boolean isFinalUpdate) { : executionContext.extractMsecCounters(isFinalUpdate); } + public Iterable extractMemoryCounters() { + return executionContext == null + ? Collections.emptyList() + : executionContext.extractMemoryCounters(); + } + public long extractThrottleTime() { return executionContext == null ? 0L : executionContext.extractThrottleTime(); }