Skip to content

Commit

Permalink
Merge pull request #15291 Restore "Default to Runner v2 for Python Streaming jobs. (#15140)"
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Oct 6, 2021
2 parents 44512e9 + 7d1ec78 commit abdd396
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
9 changes: 8 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,9 +594,16 @@ def run_pipeline(self, pipeline, options):
return result

def _maybe_add_unified_worker_missing_options(self, options):
debug_options = options.view_as(DebugOptions)
# Streaming is always portable, default to runner v2.
if (options.view_as(StandardOptions).streaming and
not options.view_as(GoogleCloudOptions).dataflow_kms_key):
if not debug_options.lookup_experiment('disable_runner_v2'):
debug_options.add_experiment('beam_fn_api')
debug_options.add_experiment('use_runner_v2')
debug_options.add_experiment('use_portable_job_submission')
# Set the default beam_fn_api experiment if the unified worker
# experiment flag is present; otherwise this is a no-op.
debug_options = options.view_as(DebugOptions)
from apache_beam.runners.dataflow.internal import apiclient
if apiclient._use_unified_worker(options):
if not debug_options.lookup_experiment('beam_fn_api'):
Expand Down
11 changes: 9 additions & 2 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def test_remote_runner_translation(self):
def test_streaming_create_translation(self):
remote_runner = DataflowRunner()
self.default_properties.append("--streaming")
self.default_properties.append("--experiments=disable_runner_v2")
with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
job_dict = json.loads(str(remote_runner.job))
Expand Down Expand Up @@ -839,15 +840,21 @@ def test_group_into_batches_translation_non_unified_worker(self):
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
True, ['--enable_streaming_engine'])
True,
['--enable_streaming_engine', '--experiments=disable_runner_v2'])

# JRH
with self.assertRaisesRegex(
ValueError,
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
True,
[
'--enable_streaming_engine',
'--experiments=beam_fn_api',
'--experiments=disable_runner_v2'
])

def test_pack_combiners(self):
class PackableCombines(beam.PTransform):
Expand Down

0 comments on commit abdd396

Please sign in to comment.