Skip to content

Commit

Permalink
Add deferrable mode to BeamRunPythonPipelineOperator (#31471)
Browse files Browse the repository at this point in the history
* Add deferrable mode to BeamRunPythonPipelineOperator

* Fix docs

* Fix deferrable parameter in init

* fix system test
  • Loading branch information
VladaZakharova committed Jul 13, 2023
1 parent e013236 commit 44b4a37
Show file tree
Hide file tree
Showing 12 changed files with 1,023 additions and 46 deletions.
190 changes: 187 additions & 3 deletions airflow/providers/apache/beam/hooks/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@
"""This module contains a Apache Beam Hook."""
from __future__ import annotations

import asyncio
import contextlib
import copy
import functools
import json
import logging
import os
import select
import shlex
import shutil
import subprocess
import tempfile
import textwrap
from tempfile import TemporaryDirectory
from typing import Callable

from packaging.version import Version
Expand Down Expand Up @@ -222,7 +224,7 @@ def start_python_pipeline(
If a value is passed to this parameter, a new virtual environment has been created with
additional packages installed.
You could also install the apache-beam package if it is not installed on your system or you want
You could also install the apache-beam package if it is not installed on your system, or you want
to use a different version.
:param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
See virtualenv documentation for more information.
Expand Down Expand Up @@ -251,7 +253,7 @@ def start_python_pipeline(
"""
)
raise AirflowException(warning_invalid_environment)
tmp_dir = exit_stack.enter_context(TemporaryDirectory(prefix="apache-beam-venv"))
tmp_dir = exit_stack.enter_context(tempfile.TemporaryDirectory(prefix="apache-beam-venv"))
py_interpreter = prepare_virtualenv(
venv_directory=tmp_dir,
python_bin=py_interpreter,
Expand Down Expand Up @@ -381,3 +383,185 @@ def start_go_pipeline_with_binary(
command_prefix=command_prefix,
process_line_callback=process_line_callback,
)


class BeamAsyncHook(BeamHook):
"""
Asynchronous hook for Apache Beam.
:param runner: Runner type.
"""

def __init__(
self,
runner: str,
) -> None:
self.runner = runner
super().__init__(runner=self.runner)

@staticmethod
async def _create_tmp_dir(prefix: str) -> str:
"""Helper method to create temporary directory."""
# Creating separate thread to create temporary directory
loop = asyncio.get_running_loop()
partial_func = functools.partial(tempfile.mkdtemp, prefix=prefix)
tmp_dir = await loop.run_in_executor(None, partial_func)
return tmp_dir

@staticmethod
async def _cleanup_tmp_dir(tmp_dir: str) -> None:
"""
Helper method to delete temporary directory after finishing work with it.
Is uses `rmtree` method to recursively remove the temporary directory.
"""
shutil.rmtree(tmp_dir)

async def start_python_pipeline_async(
self,
variables: dict,
py_file: str,
py_options: list[str] | None = None,
py_interpreter: str = "python3",
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
):
"""
Starts Apache Beam python pipeline.
:param variables: Variables passed to the pipeline.
:param py_file: Path to the python file to execute.
:param py_options: Additional options.
:param py_interpreter: Python version of the Apache Beam pipeline.
If None, this defaults to the python3.
To track python versions supported by beam and related
issues check: https://issues.apache.org/jira/browse/BEAM-1251
:param py_requirements: Additional python package(s) to install.
If a value is passed to this parameter, a new virtual environment has been created with
additional packages installed.
You could also install the apache-beam package if it is not installed on your system, or you want
to use a different version.
:param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
See virtualenv documentation for more information.
This option is only relevant if the ``py_requirements`` parameter is not None.
"""
if "labels" in variables:
variables["labels"] = [f"{key}={value}" for key, value in variables["labels"].items()]

# Creating temporary directory
tmp_dir = await self._create_tmp_dir(prefix="apache-beam-venv")

async with contextlib.AsyncExitStack() as exit_stack:
if py_requirements is not None:
if not py_requirements and not py_system_site_packages:
warning_invalid_environment = textwrap.dedent(
"""\
Invalid method invocation. You have disabled inclusion of system packages and empty
list required for installation, so it is not possible to create a valid virtual
environment. In the virtual environment, apache-beam package must be installed for
your job to be executed.
To fix this problem:
* install apache-beam on the system, then set parameter py_system_site_packages
to True,
* add apache-beam to the list of required packages in parameter py_requirements.
"""
)
raise AirflowException(warning_invalid_environment)

# Pushing asynchronous callback to ensure the cleanup of the temporary
# directory when the asynchronous context is exited
exit_stack.push_async_callback(self._cleanup_tmp_dir, tmp_dir)

py_interpreter = prepare_virtualenv(
venv_directory=tmp_dir,
python_bin=py_interpreter,
system_site_packages=py_system_site_packages,
requirements=py_requirements,
)
command_prefix: list[str] = [py_interpreter] + (py_options or []) + [py_file]
beam_version = (
subprocess.check_output(
[py_interpreter, "-c", "import apache_beam; print(apache_beam.__version__)"]
)
.decode()
.strip()
)
self.log.info("Beam version: %s", beam_version)
impersonate_service_account = variables.get("impersonate_service_account")
if impersonate_service_account:
if Version(beam_version) < Version("2.39.0") or True:
raise AirflowException(
"The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer."
)
return_code = await self.start_pipeline_async(
variables=variables,
command_prefix=command_prefix,
)
return return_code

async def start_pipeline_async(
self,
variables: dict,
command_prefix: list[str],
working_directory: str | None = None,
) -> int:
cmd = command_prefix + [
f"--runner={self.runner}",
]
if variables:
cmd.extend(beam_options_to_args(variables))
return await self.run_beam_command_async(
cmd=cmd,
working_directory=working_directory,
log=self.log,
)

async def run_beam_command_async(
self,
cmd: list[str],
log: logging.Logger,
working_directory: str | None = None,
) -> int:
"""
Function responsible for running pipeline command in subprocess.
:param cmd: Parts of the command to be run in subprocess
:param working_directory: Working directory
:param log: logger.
"""
cmd_str_representation = " ".join(shlex.quote(c) for c in cmd)
log.info("Running command: %s", cmd_str_representation)

# Creating a separate asynchronous process
process = await asyncio.create_subprocess_shell(
cmd_str_representation,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
close_fds=True,
cwd=working_directory,
)
# Waits for Apache Beam pipeline to complete.
log.info("Start waiting for Apache Beam process to complete.")

# Creating separate threads for stdout and stderr
stdout_task = asyncio.create_task(self.read_logs(process.stdout))
stderr_task = asyncio.create_task(self.read_logs(process.stderr))

# Waiting for the both tasks to complete
await asyncio.gather(stdout_task, stderr_task)

# Wait for the process to complete and return return_code
return_code = await process.wait()
log.info("Process exited with return code: %s", return_code)

if return_code != 0:
raise AirflowException(f"Apache Beam process failed with return code {return_code}")
return return_code

async def read_logs(self, stream_reader):
while True:
line = await stream_reader.readline()
if not line:
break
decoded_line = line.decode().strip()
self.log.info(decoded_line)
Loading

0 comments on commit 44b4a37

Please sign in to comment.