Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix plugin macros not being exposed through airflow.macros #12788

Merged
merged 9 commits into from
Dec 7, 2020
3 changes: 2 additions & 1 deletion airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ class PythonVirtualenvOperator(PythonOperator):
string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
can use a return value.
Note that if your virtualenv runs in a different Python major version than Airflow,
you cannot use return values, op_args, or op_kwargs. You can use string_args though.
you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
Airflow through plugins. You can use string_args though.

.. seealso::
For more information on how to use this operator, take a look at the guide:
Expand Down
4 changes: 4 additions & 0 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ def integrate_macros_plugins() -> None:
global plugins
global macros_modules
# pylint: enable=global-statement
from airflow import macros

if macros_modules is not None:
return
Expand All @@ -420,3 +421,6 @@ def integrate_macros_plugins() -> None:
if macros_module:
macros_modules.append(macros_module)
sys.modules[macros_module.__name__] = macros_module # pylint: disable=no-member
# Register the newly created module on airflow.macros such that it
# can be accessed when rendering templates.
setattr(macros, plugin.name, macros_module)
12 changes: 12 additions & 0 deletions airflow/utils/python_virtualenv_script.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@
import {{ pickling_library }}
import sys

# Check whether Airflow is available in the environment.
# If it is, we'll want to ensure that we integrate any macros that are being provided
# by plugins prior to unpickling the task context.
if sys.version_info.major >= 3 and sys.version_info.minor >= 6:
try:
from airflow.plugins_manager import integrate_macros_plugins
integrate_macros_plugins()
except ImportError:
# Airflow is not available in this environment, therefore we won't
# be able to integrate any plugin macros.
pass

# Read args
{% if op_args or op_kwargs %}
with open(sys.argv[1], "rb") as file:
Expand Down
41 changes: 41 additions & 0 deletions tests/plugins/test_plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import importlib
import logging
import sys
import unittest
from unittest import mock

Expand Down Expand Up @@ -213,6 +215,45 @@ def test_entrypoint_plugin_errors_dont_raise_exceptions(self, caplog):
assert "Failed to import plugin test-entrypoint" in received_logs
assert ("test.plugins.test_plugins_manager", "my_fake_module not found") in import_errors.items()

def test_registering_plugin_macros(self, request):
"""
Tests whether macros that originate from plugins are being registered correctly.
"""
from airflow import macros
from airflow.plugins_manager import integrate_macros_plugins

def cleanup_macros():
"""Reloads the airflow.macros module such that the symbol table is reset after the test."""
# We're explicitly deleting the module from sys.modules and importing it again
# using import_module() as opposed to using importlib.reload() because the latter
# does not undo the changes to the airflow.macros module that are being caused by
# invoking integrate_macros_plugins()
del sys.modules['airflow.macros']
importlib.import_module('airflow.macros')

request.addfinalizer(cleanup_macros)

def custom_macro():
return 'foo'

class MacroPlugin(AirflowPlugin):
name = 'macro_plugin'
macros = [custom_macro]

with mock_plugin_manager(plugins=[MacroPlugin()]):
# Ensure the macros for the plugin have been integrated.
integrate_macros_plugins()
# Test whether the modules have been created as expected.
plugin_macros = importlib.import_module(f"airflow.macros.{MacroPlugin.name}")
for macro in MacroPlugin.macros:
# Verify that the macros added by the plugin are being set correctly
# on the plugin's macro module.
assert hasattr(plugin_macros, macro.__name__)
# Verify that the symbol table in airflow.macros has been updated with an entry for
# this plugin, this is necessary in order to allow the plugin's macros to be used when
# rendering templates.
assert hasattr(macros, MacroPlugin.name)
Copy link
Contributor Author

@RikHeijdens RikHeijdens Dec 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

integrate_macros_plugins() has the side-effect that it modifies the airflow.macros module. Do we need to add additional cleanup logic to reset the contents of airflow.macros through a finalizer here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will importlib.reload help?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simply reloading the module using importlib.reload() will not be sufficient as the module's symbol table (dictionary) is retained.

When a module is reloaded, its dictionary (containing the module’s global variables) is retained. Redefinitions of names will override the old definitions, so this is generally not a problem. If the new version of a module does not define a name that was defined by the old version, the old definition remains.

I think we'll have to delete the module from sys.modules and import it again in order to properly remove the entries that are being added by this test case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've implemented this in c2b5354 -- I'm curious to your thoughts.

Copy link
Member

@mik-laj mik-laj Dec 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love this change.😻 I'm just wondering if it's worth moving this code to the context manager mock_plugins_manager. This allows us to limit the side effects in other tests as well. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered doing this, but the __doc__ comment on the mock_plugin_manager seems to suggest that the scope of that mock is limited to the airflow.plugins module.

While that mock does in fact clear out the macros_modules variable in airflow.plugins, it does not actually attempt to reverse any (side) effects that are caused by invoking integrate_macros_plugins(). I did a bit of searching through the code base, and it doesn't really appear that there is any test coverage for this function beyond the test I've just added.

Because I want to avoid scope creep for the mock_plugin_manager fixture, and this is the only test case that actually appears to have to deal with side effects cause by calling integrate_macros_plugins(), I'm a bit hesitant to make changes beyond what's being proposed here.



class TestPluginsDirectorySource(unittest.TestCase):
def test_should_return_correct_path_name(self):
Expand Down