Skip to content

Commit

Permalink
Fix plugin macros not being exposed through airflow.macros (#12788)
Browse files Browse the repository at this point in the history
In order to allow a plugin-provided macro to be used at templating time,
it needs to be exposed through the airflow.macros module.

* Add cleanup logic to test_registering_plugin_macros

This test-case has side-effects in the sense that the symbol table of
the airflow.macros module is altered when integrate_macros_plugins() is
invoked. This commit adds a finalizer to the test case that ensures that
that module is being reloaded completely in order to prevent impact on
other tests.


* Integrate plugin-provided macros in subprocesses

When Airflow is available in a virtual environment, and when this
environment runs at least Python 3, then plugin-provided macros should
be made available to the Python callable that is being executed in this
environment.

* Document macros limitation

Plugin-provided macros can not be used on Python 2 when using
PythonVirtualenvOperator any longer.
  • Loading branch information
RikHeijdens committed Dec 7, 2020
1 parent f66a46d commit 29d7848
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 1 deletion.
3 changes: 2 additions & 1 deletion airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ class PythonVirtualenvOperator(PythonOperator):
string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
can use a return value.
Note that if your virtualenv runs in a different Python major version than Airflow,
you cannot use return values, op_args, or op_kwargs. You can use string_args though.
you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
Airflow through plugins. You can use string_args though.
.. seealso::
For more information on how to use this operator, take a look at the guide:
Expand Down
4 changes: 4 additions & 0 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ def integrate_macros_plugins() -> None:
global plugins
global macros_modules
# pylint: enable=global-statement
from airflow import macros

if macros_modules is not None:
return
Expand All @@ -423,3 +424,6 @@ def integrate_macros_plugins() -> None:
if macros_module:
macros_modules.append(macros_module)
sys.modules[macros_module.__name__] = macros_module # pylint: disable=no-member
# Register the newly created module on airflow.macros such that it
# can be accessed when rendering templates.
setattr(macros, plugin.name, macros_module)
12 changes: 12 additions & 0 deletions airflow/utils/python_virtualenv_script.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@
import {{ pickling_library }}
import sys

# Check whether Airflow is available in the environment.
# If it is, we'll want to ensure that we integrate any macros that are being provided
# by plugins prior to unpickling the task context.
if sys.version_info.major >= 3 and sys.version_info.minor >= 6:
try:
from airflow.plugins_manager import integrate_macros_plugins
integrate_macros_plugins()
except ImportError:
# Airflow is not available in this environment, therefore we won't
# be able to integrate any plugin macros.
pass

# Read args
{% if op_args or op_kwargs %}
with open(sys.argv[1], "rb") as file:
Expand Down
40 changes: 40 additions & 0 deletions tests/plugins/test_plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import importlib
import logging
import sys
import unittest
Expand Down Expand Up @@ -217,6 +218,45 @@ def test_entrypoint_plugin_errors_dont_raise_exceptions(self, caplog):
assert "Failed to import plugin test-entrypoint" in received_logs
assert ("test.plugins.test_plugins_manager", "my_fake_module not found") in import_errors.items()

def test_registering_plugin_macros(self, request):
"""
Tests whether macros that originate from plugins are being registered correctly.
"""
from airflow import macros
from airflow.plugins_manager import integrate_macros_plugins

def cleanup_macros():
"""Reloads the airflow.macros module such that the symbol table is reset after the test."""
# We're explicitly deleting the module from sys.modules and importing it again
# using import_module() as opposed to using importlib.reload() because the latter
# does not undo the changes to the airflow.macros module that are being caused by
# invoking integrate_macros_plugins()
del sys.modules['airflow.macros']
importlib.import_module('airflow.macros')

request.addfinalizer(cleanup_macros)

def custom_macro():
return 'foo'

class MacroPlugin(AirflowPlugin):
name = 'macro_plugin'
macros = [custom_macro]

with mock_plugin_manager(plugins=[MacroPlugin()]):
# Ensure the macros for the plugin have been integrated.
integrate_macros_plugins()
# Test whether the modules have been created as expected.
plugin_macros = importlib.import_module(f"airflow.macros.{MacroPlugin.name}")
for macro in MacroPlugin.macros:
# Verify that the macros added by the plugin are being set correctly
# on the plugin's macro module.
assert hasattr(plugin_macros, macro.__name__)
# Verify that the symbol table in airflow.macros has been updated with an entry for
# this plugin, this is necessary in order to allow the plugin's macros to be used when
# rendering templates.
assert hasattr(macros, MacroPlugin.name)


class TestPluginsDirectorySource(unittest.TestCase):
def test_should_return_correct_path_name(self):
Expand Down

0 comments on commit 29d7848

Please sign in to comment.