diff --git a/CHANGES.md b/CHANGES.md
index 49b4fdfe89b0..a97035fdc297 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -71,6 +71,7 @@ should handle this. ([#25252](https://github.com/apache/beam/issues/25252)).
 ## New Features / Improvements
 
 * X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* `upload_graph` as one of the Experiments options for DataflowRunner is no longer required when the graph is larger than 10MB for Java SDK ([PR#28621](https://github.com/apache/beam/pull/28621)).
 
 ## Breaking Changes
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 891b4c0454c9..80b4e4cfd8b5 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -168,7 +168,6 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Utf8;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -1330,15 +1329,26 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
     }
 
+    // Enable upload_graph when the serialized job graph is too large.
+    byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
+    int jobGraphByteSize = jobGraphBytes.length;
+    if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
+        && !hasExperiment(options, "upload_graph")) {
+      List<String> experiments = firstNonNull(options.getExperiments(), new ArrayList<>());
+      experiments.add("upload_graph");
+      options.setExperiments(ImmutableList.copyOf(experiments));
+      LOG.info(
+          "The job graph size ({} bytes) is larger than {}. Automatically adding "
+              + "the upload_graph option to experiments.",
+          jobGraphByteSize,
+          CREATE_JOB_REQUEST_LIMIT_BYTES);
+    }
+
     // Upload the job to GCS and remove the graph object from the API call. The graph
     // will be downloaded from GCS by the service.
     if (hasExperiment(options, "upload_graph")) {
       DataflowPackage stagedGraph =
-          options
-              .getStager()
-              .stageToFile(
-                  DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8),
-                  DATAFLOW_GRAPH_FILE_NAME);
+          options.getStager().stageToFile(jobGraphBytes, DATAFLOW_GRAPH_FILE_NAME);
       newJob.getSteps().clear();
       newJob.setStepsLocation(stagedGraph.getLocation());
     }
@@ -1398,7 +1408,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     } catch (GoogleJsonResponseException e) {
       String errorMessages = "Unexpected errors";
       if (e.getDetails() != null) {
-        if (Utf8.encodedLength(newJob.toString()) >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
+        if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
           errorMessages =
               "The size of the serialized JSON representation of the pipeline "
                   + "exceeds the allowable limit. "
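For reference, a minimal standalone sketch of the check introduced above, assuming the 10MB limit referenced in CHANGES.md corresponds to `CREATE_JOB_REQUEST_LIMIT_BYTES`; the `GraphSizeCheckSketch` class and the `needsUploadGraph` helper are illustrative and not part of the patch.

```java
import static java.nio.charset.StandardCharsets.UTF_8;

/**
 * Illustrative only: a standalone restatement of the auto-enable check added in this PR.
 * The real logic lives in DataflowRunner.run(); the constant below is assumed to mirror
 * the runner's 10MB create-job request limit, and needsUploadGraph is a hypothetical helper.
 */
public class GraphSizeCheckSketch {

  // Assumed value of the 10MB create-job request limit.
  static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;

  // True when the serialized job graph is too large to inline in the CreateJob request
  // and the upload_graph experiment has not already been set by the user.
  static boolean needsUploadGraph(String serializedJobJson, boolean uploadGraphAlreadySet) {
    int jobGraphByteSize = serializedJobJson.getBytes(UTF_8).length;
    return jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES && !uploadGraphAlreadySet;
  }
}
```

Serializing the job once into `jobGraphBytes` also lets the later error-message branch reuse `jobGraphByteSize` instead of re-encoding the job with `Utf8.encodedLength`, which is why that Guava import is removed.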
" diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 078f25e0e38e..bcdea03dba2c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -242,7 +242,7 @@ public void setUp() throws IOException { mockJobs = mock(Dataflow.Projects.Locations.Jobs.class); } - private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) { + private static Pipeline buildDataflowPipeline(DataflowPipelineOptions options) { options.setStableUniqueNames(CheckEnabled.ERROR); options.setRunner(DataflowRunner.class); Pipeline p = Pipeline.create(options); @@ -256,6 +256,22 @@ private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) { return p; } + private static Pipeline buildDataflowPipelineWithLargeGraph(DataflowPipelineOptions options) { + options.setStableUniqueNames(CheckEnabled.ERROR); + options.setRunner(DataflowRunner.class); + Pipeline p = Pipeline.create(options); + + for (int i = 0; i < 100; i++) { + p.apply("ReadMyFile_" + i, TextIO.read().from("gs://bucket/object")) + .apply("WriteMyFile_" + i, TextIO.write().to("gs://bucket/object")); + } + + // Enable the FileSystems API to know about gs:// URIs in this test. + FileSystems.setDefaultPipelineOptions(options); + + return p; + } + private static Dataflow buildMockDataflow(Dataflow.Projects.Locations.Jobs mockJobs) throws IOException { Dataflow mockDataflowClient = mock(Dataflow.class); @@ -824,6 +840,24 @@ public void testUploadGraph() throws IOException { .startsWith("gs://valid-bucket/temp/staging/dataflow_graph")); } + /** Test for automatically using upload_graph when the job graph is too large (>10MB). */ + @Test + public void testUploadGraphWithAutoUpload() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + Pipeline p = buildDataflowPipelineWithLargeGraph(options); + p.run(); + + ArgumentCaptor jobCaptor = ArgumentCaptor.forClass(Job.class); + Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture()); + assertValidJob(jobCaptor.getValue()); + assertTrue(jobCaptor.getValue().getSteps().isEmpty()); + assertTrue( + jobCaptor + .getValue() + .getStepsLocation() + .startsWith("gs://valid-bucket/temp/staging/dataflow_graph")); + } + @Test public void testUpdateNonExistentPipeline() throws IOException { thrown.expect(IllegalArgumentException.class);