Fix upload_graph on v2 (#32165)
* Fix upload_graph on v2

* compilation nits

* compilation nits

* remove streaming test change, update CHANGES

* mutability fix

* Test fix

* Remove upload_graph from it
damccorm authored Aug 13, 2024
1 parent 7466803 commit b2d26b6
Showing 3 changed files with 34 additions and 1 deletion.
10 changes: 10 additions & 0 deletions CHANGES.md
@@ -83,6 +83,7 @@
## Bugfixes

* Fixed incorrect service account impersonation flow for Python pipelines using BigQuery IOs ([#32030](https://github.com/apache/beam/issues/32030)).
* Auto-disable broken and meaningless `upload_graph` feature when using Dataflow Runner V2 ([#32159](https://github.com/apache/beam/issues/32159)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
@@ -116,6 +117,10 @@
* [BigQueryIO] Fixed a bug in batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
* Fixed a logging issue where Python worker dependency installation logs sometimes were not emitted in a timely manner ([#31977](https://github.com/apache/beam/pull/31977))

## Known Issues

* Large Dataflow graphs using runner v2, or pipelines explicitly enabling the `upload_graph` experiment, will fail at construction time ([#32159](https://github.com/apache/beam/issues/32159)).

# [2.57.0] - 2024-06-26

## Highlights
@@ -167,6 +172,10 @@
jackson-2.15 has known breaking changes. An important one is it imposed a buffer limit for parser.
If your custom PTransform/DoFn are affected, refer to [#31580](https://github.com/apache/beam/pull/31580) for mitigation.

## Known Issues

* Large Dataflow graphs using runner v2, or pipelines explicitly enabling the `upload_graph` experiment, will fail at construction time ([#32159](https://github.com/apache/beam/issues/32159)).

# [2.56.0] - 2024-05-01

## Highlights
@@ -202,6 +211,7 @@

* The beam interactive runner does not correctly run on flink ([#31168](https://github.com/apache/beam/issues/31168)).
* When using the Flink runner from Python, 1.17 is not supported and 1.12/13 do not work correctly. Support for 1.17 will be added in 2.57.0, and the ability to choose 1.12/13 will be cleaned up and fully removed in 2.57.0 as well ([#31168](https://github.com/apache/beam/issues/31168)).
* Large Dataflow graphs using runner v2, or pipelines explicitly enabling the `upload_graph` experiment, will fail at construction time ([#32159](https://github.com/apache/beam/issues/32159)).

# [2.55.1] - 2024-04-08

@@ -1385,7 +1385,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {
byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
int jobGraphByteSize = jobGraphBytes.length;
    if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
-       && !hasExperiment(options, "upload_graph")) {
+       && !hasExperiment(options, "upload_graph")
+       && !useUnifiedWorker(options)) {
List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
options.setExperiments(
ImmutableList.<String>builder().addAll(experiments).add("upload_graph").build());
@@ -1396,6 +1397,15 @@ public DataflowPipelineJob run(Pipeline pipeline) {
CREATE_JOB_REQUEST_LIMIT_BYTES);
}

if (hasExperiment(options, "upload_graph") && useUnifiedWorker(options)) {
ArrayList<String> experiments = new ArrayList<>(options.getExperiments());
while (experiments.remove("upload_graph")) {}
options.setExperiments(experiments);
LOG.warn(
"The upload_graph experiment was specified, but it does not apply "
+ "to runner v2 jobs. Option has been automatically removed.");
}

// Upload the job to GCS and remove the graph object from the API call. The graph
// will be downloaded from GCS by the service.
if (hasExperiment(options, "upload_graph")) {
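Not part of the commit: a hypothetical, self-contained sketch of the two behaviors the runner diff above implements, using a plain experiments list in place of Beam's `DataflowPipelineOptions`. The class and method names are invented for illustration; only the `"upload_graph"` experiment string and the runner-v2 (unified worker) condition come from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UploadGraphGuard {

  // Mirrors the first hunk: auto-enable upload_graph when the serialized
  // job graph exceeds the create-job request limit, but only when the
  // unified worker (runner v2) is NOT in use.
  static List<String> maybeEnableUploadGraph(
      List<String> experiments, long graphBytes, long limitBytes, boolean useUnifiedWorker) {
    if (graphBytes >= limitBytes
        && !experiments.contains("upload_graph")
        && !useUnifiedWorker) {
      List<String> out = new ArrayList<>(experiments);
      out.add("upload_graph");
      return out;
    }
    return experiments;
  }

  // Mirrors the second hunk: strip every occurrence of upload_graph when
  // runner v2 is in use, since the experiment does not apply there.
  static List<String> stripUploadGraphOnV2(List<String> experiments, boolean useUnifiedWorker) {
    List<String> out = new ArrayList<>(experiments);
    if (useUnifiedWorker) {
      while (out.remove("upload_graph")) {}
    }
    return out;
  }

  public static void main(String[] args) {
    // Oversized graph on runner v1: the experiment is added automatically.
    System.out.println(
        maybeEnableUploadGraph(Arrays.asList("foo"), 20_000_000L, 10_000_000L, false));
    // Explicitly requested on runner v2: the experiment is removed.
    System.out.println(
        stripUploadGraphOnV2(Arrays.asList("upload_graph", "use_runner_v2"), true));
  }
}
```

The real change also logs a warning when it removes the option, so users who explicitly passed `upload_graph` learn why it had no effect.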
@@ -838,6 +838,19 @@ public void testUploadGraph() throws IOException {
.startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
}

@Test
public void testUploadGraphV2IsNoOp() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setExperiments(Arrays.asList("upload_graph", "use_runner_v2"));
Pipeline p = buildDataflowPipeline(options);
p.run();

ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
assertNull(jobCaptor.getValue().getStepsLocation());
}

/** Test for automatically using upload_graph when the job graph is too large (>10MB). */
@Test
public void testUploadGraphWithAutoUpload() throws IOException {
