From b2d26b6b5f376db079679d620a812af25c4a90f8 Mon Sep 17 00:00:00 2001
From: Danny McCormick
Date: Tue, 13 Aug 2024 17:43:30 +0200
Subject: [PATCH] Fix upload_graph on v2 (#32165)

* Fix upload_graph on v2

* compilation nits

* compilation nits

* remove streaming test change, update CHANGES

* mutability fix

* Test fix

* Remove upload_graph from it

---
 CHANGES.md                                        | 10 ++++++++++
 .../beam/runners/dataflow/DataflowRunner.java     | 12 +++++++++++-
 .../beam/runners/dataflow/DataflowRunnerTest.java | 13 +++++++++++++
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index 950abc694488..fce3aa26a72b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -83,6 +83,7 @@
 ## Bugfixes
 
 * Fixed incorrect service account impersonation flow for Python pipelines using BigQuery IOs ([#32030](https://github.com/apache/beam/issues/32030)).
+* Auto-disable broken and meaningless `upload_graph` feature when using Dataflow Runner V2 ([#32159](https://github.com/apache/beam/issues/32159)).
 
 ## Security Fixes
 * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
@@ -116,6 +117,10 @@
 * [BigQueryIO] Fixed a bug in batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
 * Fixed a logging issue where Python worker dependency installation logs sometimes were not emitted in a timely manner ([#31977](https://github.com/apache/beam/pull/31977))
 
+## Known Issues
+
+* Large Dataflow graphs using runner v2, or pipelines explicitly enabling the `upload_graph` experiment, will fail at construction time ([#32159](https://github.com/apache/beam/issues/32159)).
+
 # [2.57.0] - 2024-06-26
 
 ## Highlights
@@ -167,6 +172,10 @@
 jackson-2.15 has known breaking changes. An important one is it imposed a buffer limit for parser. If your custom PTransform/DoFn are affected, refer to [#31580](https://github.com/apache/beam/pull/31580) for mitigation.
 
+## Known Issues
+
+* Large Dataflow graphs using runner v2, or pipelines explicitly enabling the `upload_graph` experiment, will fail at construction time ([#32159](https://github.com/apache/beam/issues/32159)).
+
 # [2.56.0] - 2024-05-01
 
 ## Highlights
 
@@ -202,6 +211,7 @@
 
 * The beam interactive runner does not correctly run on flink ([#31168](https://github.com/apache/beam/issues/31168)).
 * When using the Flink runner from Python, 1.17 is not supported and 1.12/13 do not work correctly. Support for 1.17 will be added in 2.57.0, and the ability to choose 1.12/13 will be cleaned up and fully removed in 2.57.0 as well ([#31168](https://github.com/apache/beam/issues/31168)).
+* Large Dataflow graphs using runner v2, or pipelines explicitly enabling the `upload_graph` experiment, will fail at construction time ([#32159](https://github.com/apache/beam/issues/32159)).
 
 # [2.55.1] - 2024-04-08

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 708c63413268..abe7d0d364d3 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1385,7 +1385,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
       int jobGraphByteSize = jobGraphBytes.length;
       if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
-          && !hasExperiment(options, "upload_graph")) {
+          && !hasExperiment(options, "upload_graph")
+          && !useUnifiedWorker(options)) {
         List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
         options.setExperiments(
             ImmutableList.<String>builder().addAll(experiments).add("upload_graph").build());
@@ -1396,6 +1397,15 @@ public DataflowPipelineJob run(Pipeline pipeline) {
             CREATE_JOB_REQUEST_LIMIT_BYTES);
       }
 
+      if (hasExperiment(options, "upload_graph") && useUnifiedWorker(options)) {
+        ArrayList<String> experiments = new ArrayList<>(options.getExperiments());
+        while (experiments.remove("upload_graph")) {}
+        options.setExperiments(experiments);
+        LOG.warn(
+            "The upload_graph experiment was specified, but it does not apply "
+                + "to runner v2 jobs. Option has been automatically removed.");
+      }
+
       // Upload the job to GCS and remove the graph object from the API call. The graph
       // will be downloaded from GCS by the service.
       if (hasExperiment(options, "upload_graph")) {
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index cf1066e41d25..37c20c61ad8e 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -838,6 +838,19 @@ public void testUploadGraph() throws IOException {
             .startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
   }
 
+  @Test
+  public void testUploadGraphV2IsNoOp() throws IOException {
+    DataflowPipelineOptions options = buildPipelineOptions();
+    options.setExperiments(Arrays.asList("upload_graph", "use_runner_v2"));
+    Pipeline p = buildDataflowPipeline(options);
+    p.run();
+
+    ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
+    Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
+    assertValidJob(jobCaptor.getValue());
+    assertNull(jobCaptor.getValue().getStepsLocation());
+  }
+
   /** Test for automatically using upload_graph when the job graph is too large (>10MB). */
   @Test
   public void testUploadGraphWithAutoUpload() throws IOException {
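
For context, a minimal sketch of the kind of pipeline this patch affects: a Java pipeline submitted to Dataflow Runner v2 with `upload_graph` explicitly enabled. The project, region, and bucket values are placeholders, not taken from the patch; with this change, the runner drops `upload_graph` at submission time and logs a warning instead of producing a job the service cannot run.

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UploadGraphV2Example {
  public static void main(String[] args) {
    // Placeholder project/region/bucket values; substitute your own.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=DataflowRunner",
                "--project=my-project",
                "--region=us-central1",
                "--tempLocation=gs://my-bucket/temp",
                // Before this patch, combining these two experiments produced a
                // broken job; now upload_graph is silently stripped with a warning.
                "--experiments=upload_graph,use_runner_v2")
            .as(DataflowPipelineOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    pipeline.run();
  }
}
```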
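The removal logic in `DataflowRunner` is small but has two subtleties that the commit history ("mutability fix") hints at: `getExperiments()` may hand back an immutable list, and `List.remove(Object)` deletes only the first match. Below is a standalone sketch of the same idiom using a hypothetical helper name; the real code mutates the pipeline options directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExperimentFilter {
  // Hypothetical helper mirroring the patch's removal idiom.
  static List<String> withoutUploadGraph(List<String> experiments) {
    // Copy defensively: the incoming list may be immutable (e.g. an
    // ImmutableList built earlier in run()), so in-place removal could throw.
    ArrayList<String> copy = new ArrayList<>(experiments);
    // remove(Object) deletes only the first occurrence; loop until none remain.
    while (copy.remove("upload_graph")) {}
    return copy;
  }

  public static void main(String[] args) {
    System.out.println(
        withoutUploadGraph(Arrays.asList("upload_graph", "use_runner_v2", "upload_graph")));
    // Prints: [use_runner_v2]
  }
}
```

The new test exercises the same path end to end: it submits a pipeline with both experiments and then asserts `getStepsLocation()` is null, i.e. the graph stayed inline in the create-job request rather than being uploaded to GCS.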