diff --git a/airflow/providers/apache/beam/hooks/beam.py b/airflow/providers/apache/beam/hooks/beam.py index 76f9524c2bfac..28a5abc0c6e3e 100644 --- a/airflow/providers/apache/beam/hooks/beam.py +++ b/airflow/providers/apache/beam/hooks/beam.py @@ -18,6 +18,7 @@ """This module contains a Apache Beam Hook.""" from __future__ import annotations +import contextlib import json import os import select @@ -28,6 +29,8 @@ from tempfile import TemporaryDirectory from typing import Callable +from packaging.version import Version + from airflow.exceptions import AirflowConfigException, AirflowException from airflow.hooks.base import BaseHook from airflow.providers.google.go_module_utils import init_module, install_dependencies @@ -226,37 +229,47 @@ def start_python_pipeline( if "labels" in variables: variables["labels"] = [f"{key}={value}" for key, value in variables["labels"].items()] - if py_requirements is not None: - if not py_requirements and not py_system_site_packages: - warning_invalid_environment = textwrap.dedent( - """\ - Invalid method invocation. You have disabled inclusion of system packages and empty list - required for installation, so it is not possible to create a valid virtual environment. - In the virtual environment, apache-beam package must be installed for your job to be \ - executed. To fix this problem: - * install apache-beam on the system, then set parameter py_system_site_packages to True, - * add apache-beam to the list of required packages in parameter py_requirements. - """ - ) - raise AirflowException(warning_invalid_environment) - - with TemporaryDirectory(prefix="apache-beam-venv") as tmp_dir: + with contextlib.ExitStack() as exit_stack: + if py_requirements is not None: + if not py_requirements and not py_system_site_packages: + warning_invalid_environment = textwrap.dedent( + """\ + Invalid method invocation. You have disabled inclusion of system packages and empty + list required for installation, so it is not possible to create a valid virtual + environment. In the virtual environment, apache-beam package must be installed for + your job to be executed. + + To fix this problem: + * install apache-beam on the system, then set parameter py_system_site_packages + to True, + * add apache-beam to the list of required packages in parameter py_requirements. + """ + ) + raise AirflowException(warning_invalid_environment) + tmp_dir = exit_stack.enter_context(TemporaryDirectory(prefix="apache-beam-venv")) py_interpreter = prepare_virtualenv( venv_directory=tmp_dir, python_bin=py_interpreter, system_site_packages=py_system_site_packages, requirements=py_requirements, ) - command_prefix = [py_interpreter] + py_options + [py_file] - self._start_pipeline( - variables=variables, - command_prefix=command_prefix, - process_line_callback=process_line_callback, - ) - else: command_prefix = [py_interpreter] + py_options + [py_file] + beam_version = ( + subprocess.check_output( + [py_interpreter, "-c", "import apache_beam; print(apache_beam.__version__)"] + ) + .decode() + .strip() + ) + self.log.info("Beam version: %s", beam_version) + impersonate_service_account = variables.get("impersonate_service_account") + if impersonate_service_account: + if Version(beam_version) < Version("2.39.0") or True: + raise AirflowException( + "The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer." + ) self._start_pipeline( variables=variables, command_prefix=command_prefix, diff --git a/airflow/providers/apache/beam/provider.yaml b/airflow/providers/apache/beam/provider.yaml index c7655bafb833b..96d76057d2e78 100644 --- a/airflow/providers/apache/beam/provider.yaml +++ b/airflow/providers/apache/beam/provider.yaml @@ -36,7 +36,7 @@ versions: dependencies: - apache-airflow>=2.3.0 - - apache-beam>=2.39.0 + - apache-beam>=2.33.0 integrations: - integration-name: Apache Beam diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py index 7cfab6bbff598..b7dc2b149ced9 100644 --- a/airflow/providers/google/cloud/operators/dataflow.py +++ b/airflow/providers/google/cloud/operators/dataflow.py @@ -80,6 +80,11 @@ class DataflowConfiguration: If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + + .. warning:: + + This option requires Apache Beam 2.39.0 or newer. + :param drain_pipeline: Optional, set to True if want to stop streaming job by draining it instead of canceling during killing task instance. See: https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index e408ef7e37a35..247377759240d 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -45,7 +45,7 @@ "apache.beam": { "deps": [ "apache-airflow>=2.3.0", - "apache-beam>=2.39.0" + "apache-beam>=2.33.0" ], "cross-providers-deps": [ "google" diff --git a/tests/providers/apache/beam/hooks/test_beam.py b/tests/providers/apache/beam/hooks/test_beam.py index b4c4ddd27f794..bc9e8187f008b 100644 --- a/tests/providers/apache/beam/hooks/test_beam.py +++ b/tests/providers/apache/beam/hooks/test_beam.py @@ -18,11 +18,13 @@ import copy import os +import re import subprocess import unittest from unittest import mock from unittest.mock import MagicMock +import pytest from parameterized import parameterized from airflow.exceptions import AirflowException @@ -58,7 +60,8 @@ class TestBeamHook(unittest.TestCase): @mock.patch(BEAM_STRING.format("BeamCommandRunner")) - def test_start_python_pipeline(self, mock_runner): + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") + def test_start_python_pipeline(self, mock_check_output, mock_runner): hook = BeamHook(runner=DEFAULT_RUNNER) wait_for_done = mock_runner.return_value.wait_for_done process_line_callback = MagicMock() @@ -83,6 +86,26 @@ def test_start_python_pipeline(self, mock_runner): ) wait_for_done.assert_called_once_with() + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.35.0") + def test_start_python_pipeline_unsupported_option(self, mock_check_output): + hook = BeamHook(runner=DEFAULT_RUNNER) + + with pytest.raises( + AirflowException, + match=re.escape("The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer."), + ): + hook.start_python_pipeline( + variables={ + "impersonate_service_account": "test@impersonation.com", + }, + py_file="/tmp/file.py", + py_options=["-m"], + py_interpreter="python3", + py_requirements=None, + py_system_site_packages=False, + process_line_callback=MagicMock(), + ) + @parameterized.expand( [ ("default_to_python3", "python3"), @@ -92,7 +115,10 @@ def test_start_python_pipeline(self, mock_runner): ] ) @mock.patch(BEAM_STRING.format("BeamCommandRunner")) - def test_start_python_pipeline_with_custom_interpreter(self, _, py_interpreter, mock_runner): + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") + def test_start_python_pipeline_with_custom_interpreter( + self, _, py_interpreter, mock_check_output, mock_runner + ): hook = BeamHook(runner=DEFAULT_RUNNER) wait_for_done = mock_runner.return_value.wait_for_done process_line_callback = MagicMock() @@ -127,8 +153,14 @@ def test_start_python_pipeline_with_custom_interpreter(self, _, py_interpreter, ) @mock.patch(BEAM_STRING.format("prepare_virtualenv")) @mock.patch(BEAM_STRING.format("BeamCommandRunner")) + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system_packages( - self, current_py_requirements, current_py_system_site_packages, mock_runner, mock_virtualenv + self, + current_py_requirements, + current_py_system_site_packages, + mock_check_output, + mock_runner, + mock_virtualenv, ): hook = BeamHook(runner=DEFAULT_RUNNER) wait_for_done = mock_runner.return_value.wait_for_done @@ -164,7 +196,10 @@ def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system ) @mock.patch(BEAM_STRING.format("BeamCommandRunner")) - def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages(self, mock_runner): + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") + def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages( + self, mock_check_output, mock_runner + ): hook = BeamHook(runner=DEFAULT_RUNNER) wait_for_done = mock_runner.return_value.wait_for_done process_line_callback = MagicMock()