Fix upload_graph on v2 #32165

Merged: 7 commits, Aug 13, 2024
CHANGES.md: 10 additions & 0 deletions
@@ -83,6 +83,7 @@
## Bugfixes

* Fixed incorrect service account impersonation flow for Python pipelines using BigQuery IOs ([#32030](https://github.com/apache/beam/issues/32030)).
* Auto-disable broken and meaningless `upload_graph` feature when using Dataflow Runner V2 ([#32159](https://github.com/apache/beam/issues/32159)).

## Security Fixes
* Fixed [CVE-YYYY-NNNN](https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN) (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
@@ -116,6 +117,10 @@
* [BigQueryIO] Fixed a bug in batch Storage Write API that frequently exhausted concurrent connections quota ([#31710](https://github.com/apache/beam/pull/31710))
* Fixed a logging issue where Python worker dependency installation logs sometimes were not emitted in a timely manner ([#31977](https://github.com/apache/beam/pull/31977))

## Known Issues

* Large Dataflow graphs using runner v2, or pipelines explicitly enabling the `upload_graph` experiment, will fail at construction time ([#32159](https://github.com/apache/beam/issues/32159)).

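The known issue above stems from the runner's graph-size check, which previously auto-enabled `upload_graph` even on Runner V2, where it does not work. A minimal, dependency-free Java sketch of the patched condition (the method name is illustrative, and the 10 MB value is assumed from Dataflow's CreateJob request size limit; not Beam's actual API):

```java
public class UploadGraphDecision {
    // Assumed value of the runner's CREATE_JOB_REQUEST_LIMIT_BYTES constant:
    // the Dataflow CreateJob request size cap, taken here as 10 MB.
    static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;

    // Mirrors the patched condition: auto-enable upload_graph only when the
    // serialized graph is oversized, the experiment is not already set, and
    // the job is NOT using the unified worker (Runner V2).
    static boolean shouldAutoEnableUploadGraph(
            int jobGraphByteSize, boolean hasUploadGraph, boolean useUnifiedWorker) {
        return jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
                && !hasUploadGraph
                && !useUnifiedWorker;
    }

    public static void main(String[] args) {
        // A 12 MB graph on Runner V2: with the fix, upload_graph stays off.
        System.out.println(shouldAutoEnableUploadGraph(12 * 1024 * 1024, false, true));
    }
}
```

Before the fix, the `!useUnifiedWorker` clause was absent, so oversized V2 graphs silently opted into the broken experiment and failed at construction time.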
# [2.57.0] - 2024-06-26

## Highlights
@@ -167,6 +172,10 @@
jackson-2.15 has known breaking changes. An important one is it imposed a buffer limit for parser.
If your custom PTransform/DoFn are affected, refer to [#31580](https://github.com/apache/beam/pull/31580) for mitigation.

## Known Issues

* Large Dataflow graphs using runner v2, or pipelines explicitly enabling the `upload_graph` experiment, will fail at construction time ([#32159](https://github.com/apache/beam/issues/32159)).

# [2.56.0] - 2024-05-01

## Highlights
@@ -202,6 +211,7 @@

* The beam interactive runner does not correctly run on flink ([#31168](https://github.com/apache/beam/issues/31168)).
* When using the Flink runner from Python, 1.17 is not supported and 1.12/13 do not work correctly. Support for 1.17 will be added in 2.57.0, and the ability to choose 1.12/13 will be cleaned up and fully removed in 2.57.0 as well ([#31168](https://github.com/apache/beam/issues/31168)).
* Large Dataflow graphs using runner v2, or pipelines explicitly enabling the `upload_graph` experiment, will fail at construction time ([#32159](https://github.com/apache/beam/issues/32159)).

# [2.55.1] - 2024-04-08

DataflowRunner.java
@@ -1385,7 +1385,8 @@ public DataflowPipelineJob run(Pipeline pipeline) {
byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
int jobGraphByteSize = jobGraphBytes.length;
if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
&& !hasExperiment(options, "upload_graph")) {
&& !hasExperiment(options, "upload_graph")
&& !useUnifiedWorker(options)) {
List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
options.setExperiments(
ImmutableList.<String>builder().addAll(experiments).add("upload_graph").build());
@@ -1396,6 +1397,15 @@ public DataflowPipelineJob run(Pipeline pipeline) {
CREATE_JOB_REQUEST_LIMIT_BYTES);
}

if (hasExperiment(options, "upload_graph") && useUnifiedWorker(options)) {
ArrayList<String> experiments = new ArrayList<>(options.getExperiments());
while (experiments.remove("upload_graph")) {}
options.setExperiments(experiments);
LOG.warn(
"The upload_graph experiment was specified, but it does not apply "
+ "to runner v2 jobs. Option has been automatically removed.");
}

// Upload the job to GCS and remove the graph object from the API call. The graph
// will be downloaded from GCS by the service.
if (hasExperiment(options, "upload_graph")) {
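The `while (experiments.remove("upload_graph")) {}` loop in the patch relies on `List.remove(Object)` removing one occurrence per call and returning `true` until no match remains, so duplicates are stripped too. A standalone sketch of that pattern (helper name is hypothetical, not Beam code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RemoveExperimentSketch {
    // Returns a copy of the experiments list with every occurrence of the
    // named experiment removed, mirroring the patch's while-remove loop.
    static List<String> withoutExperiment(List<String> experiments, String name) {
        ArrayList<String> copy = new ArrayList<>(experiments);
        while (copy.remove(name)) {} // remove() returns false once no match remains
        return copy;
    }

    public static void main(String[] args) {
        List<String> experiments =
                Arrays.asList("upload_graph", "use_runner_v2", "upload_graph");
        System.out.println(withoutExperiment(experiments, "upload_graph"));
    }
}
```

Copying into a fresh `ArrayList` also matters in the original: `options.getExperiments()` may return an immutable list, so mutating a copy and calling `setExperiments` is the safe route.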
DataflowRunnerTest.java
@@ -838,6 +838,19 @@ public void testUploadGraph() throws IOException {
.startsWith("gs://valid-bucket/temp/staging/dataflow_graph"));
}

@Test
public void testUploadGraphV2IsNoOp() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setExperiments(Arrays.asList("upload_graph", "use_runner_v2"));
Pipeline p = buildDataflowPipeline(options);
p.run();

ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
Mockito.verify(mockJobs).create(eq(PROJECT_ID), eq(REGION_ID), jobCaptor.capture());
assertValidJob(jobCaptor.getValue());
assertNull(jobCaptor.getValue().getStepsLocation());
}

/** Test for automatically using upload_graph when the job graph is too large (>10MB). */
@Test
public void testUploadGraphWithAutoUpload() throws IOException {