Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-5644] Simplify TriggerDagRunOperator usage #6317

Merged
merged 2 commits into from
Oct 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ assists users migrating to a new version.

## Airflow Master

### Simplification of the TriggerDagRunOperator

The TriggerDagRunOperator now takes a `conf` argument, which accepts a dict that is passed as the conf of the triggered DagRun.
Because `conf` covers this use case, the `python_callable` argument was removed. PR: https://github.com/apache/airflow/pull/6317.

### Changes in Google Cloud Platform related hooks

The change in GCP operators implies that GCP Hooks for those operators require now keyword parameters rather
Expand Down
54 changes: 11 additions & 43 deletions airflow/example_dags/example_trigger_controller_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,26 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py)

This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
a. A python callable that decides whether or not to trigger the Target DAG
b. An optional params dict passed to the python callable to help in
evaluating whether or not to trigger the Target DAG
c. The id (name) of the Target DAG
d. The python callable can add contextual info to the DagRun created by
way of adding a Pickleable payload (e.g. dictionary of primitives). This
state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""
Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG
2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG
"""

import pprint

import airflow
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
"""This function decides whether or not to Trigger the remote DAG"""
c_p = context['params']['condition_param']
print("Controller DAG : conditionally_trigger = {}".format(c_p))
if context['params']['condition_param']:
dag_run_obj.payload = {'message': context['params']['message']}
pp.pprint(dag_run_obj.payload)
return dag_run_obj
return None


# Define the DAG
dag = DAG(
dag_id='example_trigger_controller_dag',
default_args={
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(2),
},
schedule_interval='@once',
dag_id="example_trigger_controller_dag",
default_args={"owner": "airflow", "start_date": airflow.utils.dates.days_ago(2)},
schedule_interval="@once",
)

# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(
task_id='test_trigger_dagrun',
trigger_dag_id="example_trigger_target_dag",
python_callable=conditionally_trigger,
params={'condition_param': True, 'message': 'Hello World'},
task_id="test_trigger_dagrun",
trigger_dag_id="example_trigger_target_dag", # Ensure this equals the dag_id of the DAG to trigger
conf={"message": "Hello World"},
dag=dag,
)
52 changes: 12 additions & 40 deletions airflow/example_dags/example_trigger_target_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,65 +18,37 @@
# under the License.

"""
This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
(in example_trigger_controller.py)
2. The Target DAG - DAG being triggered

This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
a. A python callable that decides whether or not to trigger the Target DAG
b. An optional params dict passed to the python callable to help in
evaluating whether or not to trigger the Target DAG
c. The id (name) of the Target DAG
d. The python callable can add contextual info to the DagRun created by
way of adding a Pickleable payload (e.g. dictionary of primitives). This
state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG
2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG
"""

import pprint

import airflow
import airflow.utils.dates
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

pp = pprint.PrettyPrinter(indent=4)

args = {
'start_date': airflow.utils.dates.days_ago(2),
'owner': 'Airflow',
}

dag = DAG(
dag_id='example_trigger_target_dag',
default_args=args,
dag_id="example_trigger_target_dag",
default_args={"start_date": airflow.utils.dates.days_ago(2), "owner": "Airflow"},
schedule_interval=None,
)


def run_this_func(**kwargs):
def run_this_func(**context):
"""
Print the payload "message" passed to the DagRun conf attribute.

:param dict kwargs: Context
:param context: The execution context
:type context: dict
"""
print("Remotely received value of {} for key=message".
format(kwargs['dag_run'].conf['message']))
print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"]))


run_this = PythonOperator(
task_id='run_this',
python_callable=run_this_func,
dag=dag,
)
run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)

# You can also access the DagRun object in templates
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: '
'{{ dag_run.conf["message"] if dag_run else "" }}" ',
bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
dag=dag,
)
85 changes: 34 additions & 51 deletions airflow/operators/dagrun_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,81 +18,64 @@
# under the License.

import datetime
import json
from typing import Callable, Dict, Optional, Union
from typing import Dict, Optional, Union

from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.models import BaseOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults


class DagRunOrder:
def __init__(self, run_id=None, payload=None):
self.run_id = run_id
self.payload = payload


class TriggerDagRunOperator(BaseOperator):
"""
Triggers a DAG run for a specified ``dag_id``

:param trigger_dag_id: the dag_id to trigger (templated)
:type trigger_dag_id: str
:param python_callable: a reference to a python function that will be
called while passing it the ``context`` object and a placeholder
object ``obj`` for your callable to fill and return if you want
a DagRun created. This ``obj`` object contains a ``run_id`` and
``payload`` attribute that you can modify in your function.
The ``run_id`` should be a unique identifier for that DAG run, and
the payload has to be a picklable object that will be made available
to your tasks while executing that DAG run. Your function header
should look like ``def foo(context, dag_run_obj):``
:type python_callable: python callable
:param conf: Configuration for the DAG run
:type conf: dict
:param execution_date: Execution date for the dag (templated)
:type execution_date: str or datetime.datetime
"""
template_fields = ('trigger_dag_id', 'execution_date')
ui_color = '#ffefeb'

template_fields = ("trigger_dag_id", "execution_date", "conf")
ui_color = "#ffefeb"

@apply_defaults
def __init__(
self,
trigger_dag_id: str,
python_callable: Optional[Callable[[Dict, DagRunOrder], DagRunOrder]] = None,
execution_date: Optional[Union[str, datetime.datetime]] = None,
*args, **kwargs) -> None:
self,
trigger_dag_id: str,
conf: Optional[Dict] = None,
execution_date: Optional[Union[str, datetime.datetime]] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.python_callable = python_callable
self.trigger_dag_id = trigger_dag_id
self.conf = conf

self.execution_date = None # type: Optional[Union[str, datetime.datetime]]
if isinstance(execution_date, datetime.datetime):
self.execution_date = execution_date.isoformat()
elif isinstance(execution_date, str):
if execution_date is None or isinstance(execution_date, (str, datetime.datetime)):
self.execution_date = execution_date
elif execution_date is None:
self.execution_date = None
else:
raise TypeError(
'Expected str or datetime.datetime type '
'for execution_date. Got {}'.format(
type(execution_date)))
"Expected str or datetime.datetime type for execution_date. "
"Got {}".format(type(execution_date))
)

def execute(self, context):
if self.execution_date is not None:
run_id = 'trig__{}'.format(self.execution_date)
self.execution_date = timezone.parse(self.execution_date)
def execute(self, context: Dict):
if isinstance(self.execution_date, datetime.datetime):
run_id = "trig__{}".format(self.execution_date.isoformat())
elif isinstance(self.execution_date, str):
run_id = "trig__{}".format(self.execution_date)
self.execution_date = timezone.parse(self.execution_date) # trigger_dag() expects datetime
Copy link
Member

@feluelle feluelle Oct 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to set the execution date on line 58 where you are setting it the first time?

if isinstance(execution_date, str):
    self.execution_date = timezone.parse(execution_date)
else:
    self.execution_date = execution_date

IMO, self.execution_date = timezone.parse(self.execution_date) is a kind of validation, so it should be done in the constructor, even though the constructor is called more often than execute().

then in the execute you can just do

if self.execution_date is None:
    self.execution_date = timezone.utcnow()
run_id = "trig__{}".format(self.execution_date.isoformat())

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to use your suggestion, as it is much shorter and simpler.

However, the execution_date can be templated. For example, the unrendered string {{ execution_date }} would fail in timezone.parse() if we parsed it in the constructor. So we have to store the raw value first, wait until execute() is called (by which point all templated fields have been rendered), and only then can we call timezone.parse() on the execution_date :(

else:
run_id = 'trig__' + timezone.utcnow().isoformat()
dro = DagRunOrder(run_id=run_id)
if self.python_callable is not None:
dro = self.python_callable(context, dro)
if dro:
trigger_dag(dag_id=self.trigger_dag_id,
run_id=dro.run_id,
conf=json.dumps(dro.payload),
execution_date=self.execution_date,
replace_microseconds=False)
else:
self.log.info("Criteria not met, moving on")
run_id = "trig__{}".format(timezone.utcnow().isoformat())

# Ignore MyPy type for self.execution_date because it doesn't pick up the timezone.parse() for strings
trigger_dag( # type: ignore
dag_id=self.trigger_dag_id,
run_id=run_id,
conf=self.conf,
execution_date=self.execution_date,
replace_microseconds=False,
)
2 changes: 1 addition & 1 deletion airflow/utils/dates.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def date_range(start_date, end_date=None, num=None, delta=None):
delta = abs(delta)
dates = []
if end_date:
if timezone.is_naive(start_date):
if timezone.is_naive(start_date) and not timezone.is_naive(end_date):
end_date = timezone.make_naive(end_date, tz)
while start_date <= end_date:
if timezone.is_naive(start_date):
Expand Down
67 changes: 0 additions & 67 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
)
from airflow.operators.bash_operator import BashOperator
from airflow.operators.check_operator import CheckOperator, ValueCheckOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.settings import Session
Expand Down Expand Up @@ -511,18 +510,6 @@ def check_failure(context, test_case=self):
start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
self.assertTrue(data['called'])

def test_trigger_dagrun(self):
def trigga(_, obj):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I liked this name 😅

if True:
return obj

t = TriggerDagRunOperator(
task_id='test_trigger_dagrun',
trigger_dag_id='example_bash_operator',
python_callable=trigga,
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

def test_dryrun(self):
t = BashOperator(
task_id='test_dryrun',
Expand Down Expand Up @@ -968,60 +955,6 @@ def test_run_command(self):
self.assertEqual(run_command('echo "foo bar"'), 'foo bar\n')
self.assertRaises(AirflowConfigException, run_command, 'bash -c "exit 1"')

def test_trigger_dagrun_with_execution_date(self):
utc_now = timezone.utcnow()
run_id = 'trig__' + utc_now.isoformat()

def payload_generator(context, object): # pylint: disable=unused-argument
object.run_id = run_id
return object

task = TriggerDagRunOperator(task_id='test_trigger_dagrun_with_execution_date',
trigger_dag_id='example_bash_operator',
python_callable=payload_generator,
execution_date=utc_now,
dag=self.dag)
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
dag_runs = DagRun.find(dag_id='example_bash_operator', run_id=run_id)
self.assertEqual(len(dag_runs), 1)
dag_run = dag_runs[0]
self.assertEqual(dag_run.execution_date, utc_now)

def test_trigger_dagrun_with_str_execution_date(self):
utc_now_str = timezone.utcnow().isoformat()
self.assertIsInstance(utc_now_str, (str,))
run_id = 'trig__' + utc_now_str

def payload_generator(context, object): # pylint: disable=unused-argument
object.run_id = run_id
return object

task = TriggerDagRunOperator(
task_id='test_trigger_dagrun_with_str_execution_date',
trigger_dag_id='example_bash_operator',
python_callable=payload_generator,
execution_date=utc_now_str,
dag=self.dag)
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
dag_runs = DagRun.find(dag_id='example_bash_operator', run_id=run_id)
self.assertEqual(len(dag_runs), 1)
dag_run = dag_runs[0]
self.assertEqual(dag_run.execution_date.isoformat(), utc_now_str)

def test_trigger_dagrun_with_templated_execution_date(self):
task = TriggerDagRunOperator(
task_id='test_trigger_dagrun_with_str_execution_date',
trigger_dag_id='example_bash_operator',
execution_date='{{ execution_date }}',
dag=self.dag)

self.assertTrue(isinstance(task.execution_date, str))
self.assertEqual(task.execution_date, '{{ execution_date }}')

ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.render_templates()
self.assertEqual(timezone.parse(task.execution_date), DEFAULT_DATE)

def test_externally_triggered_dagrun(self):
TI = TaskInstance

Expand Down
Loading