diff --git a/airflow/operators/python.py b/airflow/operators/python.py index c03f6b575033..bb37c98dafa8 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -364,7 +364,8 @@ class PythonVirtualenvOperator(PythonOperator): string_args). In addition, one can pass stuff through op_args and op_kwargs, and one can use a return value. Note that if your virtualenv runs in a different Python major version than Airflow, - you cannot use return values, op_args, or op_kwargs. You can use string_args though. + you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to + Airflow through plugins. You can use string_args though. .. seealso:: For more information on how to use this operator, take a look at the guide: diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index efd2c5682c39..c53329d52e77 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -401,6 +401,7 @@ def integrate_macros_plugins() -> None: global plugins global macros_modules # pylint: enable=global-statement + from airflow import macros if macros_modules is not None: return @@ -423,3 +424,6 @@ def integrate_macros_plugins() -> None: if macros_module: macros_modules.append(macros_module) sys.modules[macros_module.__name__] = macros_module # pylint: disable=no-member + # Register the newly created module on airflow.macros such that it + # can be accessed when rendering templates. + setattr(macros, plugin.name, macros_module) diff --git a/airflow/utils/python_virtualenv_script.jinja2 b/airflow/utils/python_virtualenv_script.jinja2 index f2dd87525d80..95534e269519 100644 --- a/airflow/utils/python_virtualenv_script.jinja2 +++ b/airflow/utils/python_virtualenv_script.jinja2 @@ -20,6 +20,18 @@ import {{ pickling_library }} import sys +# Check whether Airflow is available in the environment. +# If it is, we'll want to ensure that we integrate any macros that are being provided +# by plugins prior to unpickling the task context. +if sys.version_info.major >= 3 and sys.version_info.minor >= 6: + try: + from airflow.plugins_manager import integrate_macros_plugins + integrate_macros_plugins() + except ImportError: + # Airflow is not available in this environment, therefore we won't + # be able to integrate any plugin macros. + pass + # Read args {% if op_args or op_kwargs %} with open(sys.argv[1], "rb") as file: diff --git a/tests/plugins/test_plugins_manager.py b/tests/plugins/test_plugins_manager.py index 2bd239f4331b..384ada0a17dd 100644 --- a/tests/plugins/test_plugins_manager.py +++ b/tests/plugins/test_plugins_manager.py @@ -15,6 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import importlib import logging import sys import unittest @@ -217,6 +218,45 @@ def test_entrypoint_plugin_errors_dont_raise_exceptions(self, caplog): assert "Failed to import plugin test-entrypoint" in received_logs assert ("test.plugins.test_plugins_manager", "my_fake_module not found") in import_errors.items() + def test_registering_plugin_macros(self, request): + """ + Tests whether macros that originate from plugins are being registered correctly. + """ + from airflow import macros + from airflow.plugins_manager import integrate_macros_plugins + + def cleanup_macros(): + """Reloads the airflow.macros module such that the symbol table is reset after the test.""" + # We're explicitly deleting the module from sys.modules and importing it again + # using import_module() as opposed to using importlib.reload() because the latter + # does not undo the changes to the airflow.macros module that are being caused by + # invoking integrate_macros_plugins() + del sys.modules['airflow.macros'] + importlib.import_module('airflow.macros') + + request.addfinalizer(cleanup_macros) + + def custom_macro(): + return 'foo' + + class MacroPlugin(AirflowPlugin): + name = 'macro_plugin' + macros = [custom_macro] + + with mock_plugin_manager(plugins=[MacroPlugin()]): + # Ensure the macros for the plugin have been integrated. + integrate_macros_plugins() + # Test whether the modules have been created as expected. + plugin_macros = importlib.import_module(f"airflow.macros.{MacroPlugin.name}") + for macro in MacroPlugin.macros: + # Verify that the macros added by the plugin are being set correctly + # on the plugin's macro module. + assert hasattr(plugin_macros, macro.__name__) + # Verify that the symbol table in airflow.macros has been updated with an entry for + # this plugin, this is necessary in order to allow the plugin's macros to be used when + # rendering templates. + assert hasattr(macros, MacroPlugin.name) + class TestPluginsDirectorySource(unittest.TestCase): def test_should_return_correct_path_name(self):