From 39a4e88f07ab8bd4b1fe2e2c6fa9ec6a45b9e61f Mon Sep 17 00:00:00 2001 From: Rik Heijdens Date: Thu, 3 Dec 2020 18:23:23 +0100 Subject: [PATCH 1/9] Add a test that reproduces airflow#12785 Added a test case to reproduce the issue reported in https://github.com/apache/airflow/issues/12785 --- tests/plugins/test_plugins_manager.py | 29 +++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tests/plugins/test_plugins_manager.py b/tests/plugins/test_plugins_manager.py index 117df989c4c23..c0a4ba03e7f15 100644 --- a/tests/plugins/test_plugins_manager.py +++ b/tests/plugins/test_plugins_manager.py @@ -15,6 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import importlib import logging import unittest from unittest import mock @@ -213,6 +214,34 @@ def test_entrypoint_plugin_errors_dont_raise_exceptions(self, caplog): assert "Failed to import plugin test-entrypoint" in received_logs assert ("test.plugins.test_plugins_manager", "my_fake_module not found") in import_errors.items() + def test_registering_plugin_macros(self): + """ + Tests whether macros that originate from plugins are being registered correctly. + """ + from airflow import macros + from airflow.plugins_manager import integrate_macros_plugins + + def custom_macro(): + return 'foo' + + class MacroPlugin(AirflowPlugin): + name = 'macro_plugin' + macros = [custom_macro] + + with mock_plugin_manager(plugins=[MacroPlugin()]): + # Ensure the macros for the plugin have been integrated. + integrate_macros_plugins() + # Test whether the modules have been created as expected. + plugin_macros = importlib.import_module(f"airflow.macros.{MacroPlugin.name}") + for macro in MacroPlugin.macros: + # Verify that the macros added by the plugin are being set correctly + # on the plugin's macro module. 
+ assert hasattr(plugin_macros, macro.__name__) + # Verify that the symbol table in airflow.macros has been updated with an entry for + # this plugin, this is necessary in order to allow the plugin's macros to be used when + # rendering templates. + assert hasattr(macros, MacroPlugin.name) + class TestPluginsDirectorySource(unittest.TestCase): def test_should_return_correct_path_name(self): From a6eec0c56992eee1ec003ab8be7b12820a5d570b Mon Sep 17 00:00:00 2001 From: Rik Heijdens Date: Thu, 3 Dec 2020 18:34:19 +0100 Subject: [PATCH 2/9] Fix plugin-provided macros not being exposed on airflow.macros In order to allow a plugin-provided macro to be used at templating time, it needs to be exposed through the airflow.macros module. This commit fixes GitHub issue airflow#12785. --- airflow/plugins_manager.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 3286f52a9c223..235e2ed482243 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -398,6 +398,7 @@ def integrate_macros_plugins() -> None: global plugins global macros_modules # pylint: enable=global-statement + from airflow import macros if macros_modules is not None: return @@ -419,4 +420,9 @@ def integrate_macros_plugins() -> None: if macros_module: macros_modules.append(macros_module) - sys.modules[macros_module.__name__] = macros_module # pylint: disable=no-member + # pylint: disable=no-member + sys.modules[macros_module.__name__] = macros_module + # Register the newly created module on airflow.macros such that it + # can be accessed when rendering templates. 
+ setattr(macros, macros_module.__name__.split('.')[-1], macros_module) + # pylint: enable=no-member From 6ba9f40dcc5995cc7bb43e0176b24fbfc2c18523 Mon Sep 17 00:00:00 2001 From: Rik Heijdens Date: Thu, 3 Dec 2020 20:00:44 +0100 Subject: [PATCH 3/9] Use plugin.name instead of splitting the module name --- airflow/plugins_manager.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 235e2ed482243..9478f7e24c961 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -420,9 +420,7 @@ def integrate_macros_plugins() -> None: if macros_module: macros_modules.append(macros_module) - # pylint: disable=no-member - sys.modules[macros_module.__name__] = macros_module + sys.modules[macros_module.__name__] = macros_module # pylint: disable=no-member # Register the newly created module on airflow.macros such that it # can be accessed when rendering templates. - setattr(macros, macros_module.__name__.split('.')[-1], macros_module) - # pylint: enable=no-member + setattr(macros, plugin.name, macros_module) From 36af9167e72975b08c119a7858bba6784d352f5c Mon Sep 17 00:00:00 2001 From: Rik Heijdens Date: Thu, 3 Dec 2020 20:21:15 +0100 Subject: [PATCH 4/9] Add cleanup logic to test_registering_plugin_macros This test-case has side-effects in the sense that the symbol table of the airflow.macros module is altered when integrate_macros_plugins() is invoked. This commit adds a finalizer to the test case that ensures that that module is being reloaded completely in order to prevent impact on other tests. --- tests/plugins/test_plugins_manager.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/plugins/test_plugins_manager.py b/tests/plugins/test_plugins_manager.py index c0a4ba03e7f15..5ed0b91ee2b7d 100644 --- a/tests/plugins/test_plugins_manager.py +++ b/tests/plugins/test_plugins_manager.py @@ -17,6 +17,7 @@ # under the License. 
import importlib import logging +import sys import unittest from unittest import mock @@ -214,13 +215,24 @@ def test_entrypoint_plugin_errors_dont_raise_exceptions(self, caplog): assert "Failed to import plugin test-entrypoint" in received_logs assert ("test.plugins.test_plugins_manager", "my_fake_module not found") in import_errors.items() - def test_registering_plugin_macros(self): + def test_registering_plugin_macros(self, request): """ Tests whether macros that originate from plugins are being registered correctly. """ from airflow import macros from airflow.plugins_manager import integrate_macros_plugins + def cleanup_macros(): + """Reloads the airflow.macros module such that the symbol table is reset after the test.""" + # We're explicitly deleting the module from sys.modules and importing it again + # using import_module() as opposed to using importlib.reload() because the latter + # does not undo the changes to the airflow.macros module that are being caused by + # invoking integrate_macros_plugins() + del sys.modules['airflow.macros'] + importlib.import_module('airflow.macros') + + request.addfinalizer(cleanup_macros) + def custom_macro(): return 'foo' From 087c74f4a563bd61e3a91dd3b23565e349c95c51 Mon Sep 17 00:00:00 2001 From: Rik Heijdens Date: Fri, 4 Dec 2020 14:29:17 +0100 Subject: [PATCH 5/9] Remove access to airflow.macros for subprocesses The process that runs in the virtual environment may not be able to access macros that have been provided by plugins, e.g. in cases where the virtual environment uses a different major version of Python than Airflow itself. In order to prevent this from being an issue, 'macros' has been removed from the serializable context. 
--- airflow/operators/python.py | 2 +- tests/operators/test_python.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/operators/python.py b/airflow/operators/python.py index c03f6b5750337..fd36a1d476fad 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -431,7 +431,7 @@ class PythonVirtualenvOperator(PythonOperator): 'prev_execution_date_success', 'prev_start_date_success', } - AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {'macros', 'conf', 'dag', 'dag_run', 'task'} + AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {'conf', 'dag', 'dag_run', 'task'} @apply_defaults def __init__( # pylint: disable=too-many-arguments diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index a6300c030754a..83bf050ef7a35 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -1266,7 +1266,6 @@ def f( prev_execution_date_success, prev_start_date_success, # airflow-specific - macros, conf, dag, dag_run, From 348ffba0fee158e943e73bc513af22a11e41dba3 Mon Sep 17 00:00:00 2001 From: Rik Heijdens Date: Mon, 7 Dec 2020 09:26:37 +0100 Subject: [PATCH 6/9] Revert "Remove access to airflow.macros for subprocesses" This reverts commit 5a6c3cfd19c584a2b5ab2220ee580f8fd85ddb17. 
--- airflow/operators/python.py | 2 +- tests/operators/test_python.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/operators/python.py b/airflow/operators/python.py index fd36a1d476fad..c03f6b5750337 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -431,7 +431,7 @@ class PythonVirtualenvOperator(PythonOperator): 'prev_execution_date_success', 'prev_start_date_success', } - AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {'conf', 'dag', 'dag_run', 'task'} + AIRFLOW_SERIALIZABLE_CONTEXT_KEYS = {'macros', 'conf', 'dag', 'dag_run', 'task'} @apply_defaults def __init__( # pylint: disable=too-many-arguments diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py index 83bf050ef7a35..a6300c030754a 100644 --- a/tests/operators/test_python.py +++ b/tests/operators/test_python.py @@ -1266,6 +1266,7 @@ def f( prev_execution_date_success, prev_start_date_success, # airflow-specific + macros, conf, dag, dag_run, From b7713705682759d0e0ba4d4b8da0b12348143256 Mon Sep 17 00:00:00 2001 From: Rik Heijdens Date: Mon, 7 Dec 2020 10:04:43 +0100 Subject: [PATCH 7/9] Integrate plugin-provided macros in subprocesses When Airflow is available in a virtual environment, and when this environment runs at least Python 3, then plugin-provided macros should be made available to the Python callable that is being executed in this environment. --- airflow/utils/python_virtualenv_script.jinja2 | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/airflow/utils/python_virtualenv_script.jinja2 b/airflow/utils/python_virtualenv_script.jinja2 index f2dd87525d807..f32527af51b49 100644 --- a/airflow/utils/python_virtualenv_script.jinja2 +++ b/airflow/utils/python_virtualenv_script.jinja2 @@ -20,6 +20,18 @@ import {{ pickling_library }} import sys +# Check whether Airflow is available in the environment. 
+# If it is, we'll want to ensure that we integrate any macros that are being provided +# by plugins prior to unpickling the task context. +if int(sys.version[0]) >= 3: + try: + from airflow.plugins_manager import integrate_macros_plugins + integrate_macros_plugins() + except ImportError: + # Airflow is not available in this environment, therefore we won't + # be able to integrate any plugin macros. + pass + # Read args {% if op_args or op_kwargs %} with open(sys.argv[1], "rb") as file: From cee7686ab15b5acdc4967ed775b00af48cd5990c Mon Sep 17 00:00:00 2001 From: Rik Heijdens Date: Mon, 7 Dec 2020 10:31:47 +0100 Subject: [PATCH 8/9] Implement a more strict version-check Compare sys.version_info against the (3, 6) tuple so that every interpreter at or above Python 3.6 is accepted, including a later major version whose minor number is below 6. --- airflow/utils/python_virtualenv_script.jinja2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/utils/python_virtualenv_script.jinja2 b/airflow/utils/python_virtualenv_script.jinja2 index f32527af51b49..95534e2695197 100644 --- a/airflow/utils/python_virtualenv_script.jinja2 +++ b/airflow/utils/python_virtualenv_script.jinja2 @@ -23,7 +23,7 @@ import sys # Check whether Airflow is available in the environment. # If it is, we'll want to ensure that we integrate any macros that are being provided # by plugins prior to unpickling the task context. -if int(sys.version[0]) >= 3: +if sys.version_info >= (3, 6): try: from airflow.plugins_manager import integrate_macros_plugins integrate_macros_plugins() From cb64e9683e834293df3f003e30d88b415689581b Mon Sep 17 00:00:00 2001 From: Rik Heijdens Date: Mon, 7 Dec 2020 10:38:12 +0100 Subject: [PATCH 9/9] Document macros limitation Plugin-provided macros can no longer be used on Python 2 when using PythonVirtualenvOperator. 
--- airflow/operators/python.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airflow/operators/python.py b/airflow/operators/python.py index c03f6b5750337..bb37c98dafa88 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -364,7 +364,8 @@ class PythonVirtualenvOperator(PythonOperator): string_args). In addition, one can pass stuff through op_args and op_kwargs, and one can use a return value. Note that if your virtualenv runs in a different Python major version than Airflow, - you cannot use return values, op_args, or op_kwargs. You can use string_args though. + you cannot use return values, op_args, op_kwargs, or any macros that are being provided to + Airflow through plugins. You can use string_args though. .. seealso:: For more information on how to use this operator, take a look at the guide: