diff --git a/UPDATING.md b/UPDATING.md index 938d8d758ac19..fd7d4fc9b8df1 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -41,6 +41,11 @@ assists users migrating to a new version. ## Airflow Master +### Simplification of the TriggerDagRunOperator + +The TriggerDagRunOperator now takes a `conf` argument to which a dict can be provided as conf for the DagRun. +As a result, the `python_callable` argument was removed. PR: https://github.com/apache/airflow/pull/6317. + ### Changes in Google Cloud Platform related hooks The change in GCP operators implies that GCP Hooks for those operators require now keyword parameters rather diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index 1012f994d392a..f879b2e6b133f 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -16,58 +16,26 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""This example illustrates the use of the TriggerDagRunOperator. There are 2 -entities at work in this scenario: -1. The Controller DAG - the DAG that conditionally executes the trigger -2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py) -This example illustrates the following features : -1. A TriggerDagRunOperator that takes: - a. A python callable that decides whether or not to trigger the Target DAG - b. An optional params dict passed to the python callable to help in - evaluating whether or not to trigger the Target DAG - c. The id (name) of the Target DAG - d. The python callable can add contextual info to the DagRun created by - way of adding a Pickleable payload (e.g. dictionary of primitives). This - state is then made available to the TargetDag -2. A Target DAG : c.f. example_trigger_target_dag.py +""" +Example usage of the TriggerDagRunOperator. This example holds 2 DAGs: +1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG +2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG """ -import pprint - -import airflow +import airflow.utils.dates from airflow import DAG from airflow.operators.dagrun_operator import TriggerDagRunOperator -pp = pprint.PrettyPrinter(indent=4) - - -def conditionally_trigger(context, dag_run_obj): - """This function decides whether or not to Trigger the remote DAG""" - c_p = context['params']['condition_param'] - print("Controller DAG : conditionally_trigger = {}".format(c_p)) - if context['params']['condition_param']: - dag_run_obj.payload = {'message': context['params']['message']} - pp.pprint(dag_run_obj.payload) - return dag_run_obj - return None - - -# Define the DAG dag = DAG( - dag_id='example_trigger_controller_dag', - default_args={ - "owner": "airflow", - "start_date": airflow.utils.dates.days_ago(2), - }, - schedule_interval='@once', + dag_id="example_trigger_controller_dag", + default_args={"owner": "airflow", "start_date": airflow.utils.dates.days_ago(2)}, + schedule_interval="@once", ) -# Define the single task in this controller example DAG trigger = TriggerDagRunOperator( - task_id='test_trigger_dagrun', - trigger_dag_id="example_trigger_target_dag", - python_callable=conditionally_trigger, - params={'condition_param': True, 'message': 'Hello World'}, + task_id="test_trigger_dagrun", + trigger_dag_id="example_trigger_target_dag", # Ensure this equals the dag_id of the DAG to trigger + conf={"message": "Hello World"}, dag=dag, ) diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index 32255103d804e..15e3e0c63766b 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -18,65 +18,37 @@ # under the License. """ -This example illustrates the use of the TriggerDagRunOperator. There are 2 -entities at work in this scenario: -1. The Controller DAG - the DAG that conditionally executes the trigger - (in example_trigger_controller.py) -2. The Target DAG - DAG being triggered - -This example illustrates the following features : -1. A TriggerDagRunOperator that takes: - a. A python callable that decides whether or not to trigger the Target DAG - b. An optional params dict passed to the python callable to help in - evaluating whether or not to trigger the Target DAG - c. The id (name) of the Target DAG - d. The python callable can add contextual info to the DagRun created by - way of adding a Pickleable payload (e.g. dictionary of primitives). This - state is then made available to the TargetDag -2. A Target DAG : c.f. example_trigger_target_dag.py +Example usage of the TriggerDagRunOperator. This example holds 2 DAGs: +1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG +2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG """ -import pprint - -import airflow +import airflow.utils.dates from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator -pp = pprint.PrettyPrinter(indent=4) - -args = { - 'start_date': airflow.utils.dates.days_ago(2), - 'owner': 'Airflow', -} - dag = DAG( - dag_id='example_trigger_target_dag', - default_args=args, + dag_id="example_trigger_target_dag", + default_args={"start_date": airflow.utils.dates.days_ago(2), "owner": "Airflow"}, schedule_interval=None, ) -def run_this_func(**kwargs): +def run_this_func(**context): """ Print the payload "message" passed to the DagRun conf attribute. - :param dict kwargs: Context + :param context: The execution context + :type context: dict """ - print("Remotely received value of {} for key=message". - format(kwargs['dag_run'].conf['message'])) + print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"])) -run_this = PythonOperator( - task_id='run_this', - python_callable=run_this_func, - dag=dag, -) +run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag) -# You can also access the DagRun object in templates bash_task = BashOperator( task_id="bash_task", - bash_command='echo "Here is the message: ' - '{{ dag_run.conf["message"] if dag_run else "" }}" ', + bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"', dag=dag, ) diff --git a/airflow/operators/dagrun_operator.py b/airflow/operators/dagrun_operator.py index 7d082fab85f3d..f1c37b270ef59 100644 --- a/airflow/operators/dagrun_operator.py +++ b/airflow/operators/dagrun_operator.py @@ -18,8 +18,7 @@ # under the License. import datetime -import json -from typing import Callable, Dict, Optional, Union +from typing import Dict, Optional, Union from airflow.api.common.experimental.trigger_dag import trigger_dag from airflow.models import BaseOperator @@ -27,72 +26,56 @@ from airflow.utils.decorators import apply_defaults -class DagRunOrder: - def __init__(self, run_id=None, payload=None): - self.run_id = run_id - self.payload = payload - - class TriggerDagRunOperator(BaseOperator): """ Triggers a DAG run for a specified ``dag_id`` :param trigger_dag_id: the dag_id to trigger (templated) :type trigger_dag_id: str - :param python_callable: a reference to a python function that will be - called while passing it the ``context`` object and a placeholder - object ``obj`` for your callable to fill and return if you want - a DagRun created. This ``obj`` object contains a ``run_id`` and - ``payload`` attribute that you can modify in your function. - The ``run_id`` should be a unique identifier for that DAG run, and - the payload has to be a picklable object that will be made available - to your tasks while executing that DAG run. Your function header - should look like ``def foo(context, dag_run_obj):`` - :type python_callable: python callable + :param conf: Configuration for the DAG run + :type conf: dict :param execution_date: Execution date for the dag (templated) :type execution_date: str or datetime.datetime """ - template_fields = ('trigger_dag_id', 'execution_date') - ui_color = '#ffefeb' + + template_fields = ("trigger_dag_id", "execution_date", "conf") + ui_color = "#ffefeb" @apply_defaults def __init__( - self, - trigger_dag_id: str, - python_callable: Optional[Callable[[Dict, DagRunOrder], DagRunOrder]] = None, - execution_date: Optional[Union[str, datetime.datetime]] = None, - *args, **kwargs) -> None: + self, + trigger_dag_id: str, + conf: Optional[Dict] = None, + execution_date: Optional[Union[str, datetime.datetime]] = None, + *args, + **kwargs + ) -> None: super().__init__(*args, **kwargs) - self.python_callable = python_callable self.trigger_dag_id = trigger_dag_id + self.conf = conf - self.execution_date = None # type: Optional[Union[str, datetime.datetime]] - if isinstance(execution_date, datetime.datetime): - self.execution_date = execution_date.isoformat() - elif isinstance(execution_date, str): + if execution_date is None or isinstance(execution_date, (str, datetime.datetime)): self.execution_date = execution_date - elif execution_date is None: - self.execution_date = None else: raise TypeError( - 'Expected str or datetime.datetime type ' - 'for execution_date. Got {}'.format( - type(execution_date))) + "Expected str or datetime.datetime type for execution_date. " + "Got {}".format(type(execution_date)) + ) - def execute(self, context): - if self.execution_date is not None: - run_id = 'trig__{}'.format(self.execution_date) - self.execution_date = timezone.parse(self.execution_date) + def execute(self, context: Dict): + if isinstance(self.execution_date, datetime.datetime): + run_id = "trig__{}".format(self.execution_date.isoformat()) + elif isinstance(self.execution_date, str): + run_id = "trig__{}".format(self.execution_date) + self.execution_date = timezone.parse(self.execution_date) # trigger_dag() expects datetime else: - run_id = 'trig__' + timezone.utcnow().isoformat() - dro = DagRunOrder(run_id=run_id) - if self.python_callable is not None: - dro = self.python_callable(context, dro) - if dro: - trigger_dag(dag_id=self.trigger_dag_id, - run_id=dro.run_id, - conf=json.dumps(dro.payload), - execution_date=self.execution_date, - replace_microseconds=False) - else: - self.log.info("Criteria not met, moving on") + run_id = "trig__{}".format(timezone.utcnow().isoformat()) + + # Ignore MyPy type for self.execution_date because it doesn't pick up the timezone.parse() for strings + trigger_dag( # type: ignore + dag_id=self.trigger_dag_id, + run_id=run_id, + conf=self.conf, + execution_date=self.execution_date, + replace_microseconds=False, + ) diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index 89e67db73adfc..6ebbeb6a1ece5 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -79,7 +79,7 @@ def date_range(start_date, end_date=None, num=None, delta=None): delta = abs(delta) dates = [] if end_date: - if timezone.is_naive(start_date): + if timezone.is_naive(start_date) and not timezone.is_naive(end_date): end_date = timezone.make_naive(end_date, tz) while start_date <= end_date: if timezone.is_naive(start_date): diff --git a/tests/core.py b/tests/core.py index 7207b01436530..b7046b0a553e2 100644 --- a/tests/core.py +++ b/tests/core.py @@ -55,7 +55,6 @@ ) from airflow.operators.bash_operator import BashOperator from airflow.operators.check_operator import CheckOperator, ValueCheckOperator -from airflow.operators.dagrun_operator import TriggerDagRunOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator from airflow.settings import Session @@ -511,18 +510,6 @@ def check_failure(context, test_case=self): start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) self.assertTrue(data['called']) - def test_trigger_dagrun(self): - def trigga(_, obj): - if True: - return obj - - t = TriggerDagRunOperator( - task_id='test_trigger_dagrun', - trigger_dag_id='example_bash_operator', - python_callable=trigga, - dag=self.dag) - t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - def test_dryrun(self): t = BashOperator( task_id='test_dryrun', @@ -968,60 +955,6 @@ def test_run_command(self): self.assertEqual(run_command('echo "foo bar"'), 'foo bar\n') self.assertRaises(AirflowConfigException, run_command, 'bash -c "exit 1"') - def test_trigger_dagrun_with_execution_date(self): - utc_now = timezone.utcnow() - run_id = 'trig__' + utc_now.isoformat() - - def payload_generator(context, object): # pylint: disable=unused-argument - object.run_id = run_id - return object - - task = TriggerDagRunOperator(task_id='test_trigger_dagrun_with_execution_date', - trigger_dag_id='example_bash_operator', - python_callable=payload_generator, - execution_date=utc_now, - dag=self.dag) - task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - dag_runs = DagRun.find(dag_id='example_bash_operator', run_id=run_id) - self.assertEqual(len(dag_runs), 1) - dag_run = dag_runs[0] - self.assertEqual(dag_run.execution_date, utc_now) - - def test_trigger_dagrun_with_str_execution_date(self): - utc_now_str = timezone.utcnow().isoformat() - self.assertIsInstance(utc_now_str, (str,)) - run_id = 'trig__' + utc_now_str - - def payload_generator(context, object): # pylint: disable=unused-argument - object.run_id = run_id - return object - - task = TriggerDagRunOperator( - task_id='test_trigger_dagrun_with_str_execution_date', - trigger_dag_id='example_bash_operator', - python_callable=payload_generator, - execution_date=utc_now_str, - dag=self.dag) - task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) - dag_runs = DagRun.find(dag_id='example_bash_operator', run_id=run_id) - self.assertEqual(len(dag_runs), 1) - dag_run = dag_runs[0] - self.assertEqual(dag_run.execution_date.isoformat(), utc_now_str) - - def test_trigger_dagrun_with_templated_execution_date(self): - task = TriggerDagRunOperator( - task_id='test_trigger_dagrun_with_str_execution_date', - trigger_dag_id='example_bash_operator', - execution_date='{{ execution_date }}', - dag=self.dag) - - self.assertTrue(isinstance(task.execution_date, str)) - self.assertEqual(task.execution_date, '{{ execution_date }}') - - ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) - ti.render_templates() - self.assertEqual(timezone.parse(task.execution_date), DEFAULT_DATE) - def test_externally_triggered_dagrun(self): TI = TaskInstance diff --git a/tests/operators/test_dagrun_operator.py b/tests/operators/test_dagrun_operator.py new file mode 100644 index 0000000000000..586701dc63c75 --- /dev/null +++ b/tests/operators/test_dagrun_operator.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pathlib +import tempfile +from datetime import datetime +from unittest import TestCase + +from airflow.models import DAG, DagModel, DagRun, Log, TaskInstance +from airflow.operators.dagrun_operator import TriggerDagRunOperator +from airflow.utils import timezone +from airflow.utils.db import create_session + +DEFAULT_DATE = datetime(2019, 1, 1, tzinfo=timezone.utc) +TEST_DAG_ID = "testdag" +TRIGGERED_DAG_ID = "triggerdag" +DAG_SCRIPT = ( + "from datetime import datetime\n\n" + "from airflow.models import DAG\n" + "from airflow.operators.dummy_operator import DummyOperator\n\n" + "dag = DAG(\n" + 'dag_id="{dag_id}", \n' + 'default_args={{"start_date": datetime(2019, 1, 1)}}, \n' + "schedule_interval=None,\n" + ")\n" + 'task = DummyOperator(task_id="test", dag=dag)' +).format(dag_id=TRIGGERED_DAG_ID) + + +class TestDagRunOperator(TestCase): + def setUp(self): + # Airflow relies on reading the DAG from disk when triggering it. + # Therefore write a temp file holding the DAG to trigger. + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + self._tmpfile = f.name + f.write(DAG_SCRIPT) + f.flush() + + with create_session() as session: + session.add(DagModel(dag_id=TRIGGERED_DAG_ID, fileloc=self._tmpfile)) + session.commit() + + self.dag = DAG(TEST_DAG_ID, default_args={"owner": "airflow", "start_date": DEFAULT_DATE}) + + def tearDown(self): + """Cleanup state after testing in DB.""" + with create_session() as session: + session.query(Log).filter(Log.dag_id == TEST_DAG_ID).delete(synchronize_session=False) + for dbmodel in [DagModel, DagRun, TaskInstance]: + session.query(dbmodel).filter(dbmodel.dag_id == TRIGGERED_DAG_ID).delete( + synchronize_session=False + ) + + pathlib.Path(self._tmpfile).unlink() + + def test_trigger_dagrun(self): + """Test TriggerDagRunOperator.""" + task = TriggerDagRunOperator(task_id="test_task", trigger_dag_id=TRIGGERED_DAG_ID, dag=self.dag) + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + with create_session() as session: + dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() + self.assertEqual(len(dagruns), 1) + self.assertTrue(dagruns[0].external_trigger) + + def test_trigger_dagrun_with_execution_date(self): + """Test TriggerDagRunOperator with custom execution_date.""" + utc_now = timezone.utcnow() + task = TriggerDagRunOperator( + task_id="test_trigger_dagrun_with_execution_date", + trigger_dag_id=TRIGGERED_DAG_ID, + execution_date=utc_now, + dag=self.dag, + ) + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + with create_session() as session: + dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() + self.assertEqual(len(dagruns), 1) + self.assertTrue(dagruns[0].external_trigger) + self.assertEqual(dagruns[0].execution_date, utc_now) + + def test_trigger_dagrun_with_templated_execution_date(self): + """Test TriggerDagRunOperator with templated execution_date.""" + task = TriggerDagRunOperator( + task_id="test_trigger_dagrun_with_str_execution_date", + trigger_dag_id=TRIGGERED_DAG_ID, + execution_date="{{ execution_date }}", + dag=self.dag, + ) + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + with create_session() as session: + dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() + self.assertEqual(len(dagruns), 1) + self.assertTrue(dagruns[0].external_trigger) + self.assertEqual(dagruns[0].execution_date, DEFAULT_DATE) + + def test_trigger_dagrun_operator_conf(self): + """Test passing conf to the triggered DagRun.""" + task = TriggerDagRunOperator( + task_id="test_trigger_dagrun_with_str_execution_date", + trigger_dag_id=TRIGGERED_DAG_ID, + conf={"foo": "bar"}, + dag=self.dag, + ) + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + with create_session() as session: + dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() + self.assertEqual(len(dagruns), 1) + self.assertTrue(dagruns[0].conf, {"foo": "bar"}) + + def test_trigger_dagrun_operator_templated_conf(self): + """Test passing a templated conf to the triggered DagRun.""" + task = TriggerDagRunOperator( + task_id="test_trigger_dagrun_with_str_execution_date", + trigger_dag_id=TRIGGERED_DAG_ID, + conf={"foo": "{{ dag.dag_id }}"}, + dag=self.dag, + ) + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + with create_session() as session: + dagruns = session.query(DagRun).filter(DagRun.dag_id == TRIGGERED_DAG_ID).all() + self.assertEqual(len(dagruns), 1) + self.assertTrue(dagruns[0].conf, {"foo": TEST_DAG_ID})