
[AIRFLOW-5644] Simplify TriggerDagRunOperator usage #6317

Merged: 2 commits merged into apache:master on Oct 23, 2019
Conversation

@BasPH (Contributor) commented Oct 12, 2019:

Make sure you have checked all steps below.

Jira

  • My PR addresses the following Airflow Jira issues and references them in the PR title. For example, "[AIRFLOW-XXX] My Airflow PR"
    • https://issues.apache.org/jira/browse/AIRFLOW-5644
    • In case you are fixing a typo in the documentation, you can prepend your commit with [AIRFLOW-XXX]; code changes always need a Jira issue.
    • In case you are proposing a fundamental code change, you need to create an Airflow Improvement Proposal (AIP).
    • In case you are adding a dependency, check if the license complies with the ASF 3rd Party License Policy.

Description

  • Here are some details about my PR, including screenshots of any UI changes:

This PR refactors the TriggerDagRunOperator to provide much more intuitive behaviour: it now has a conf argument to which a dict can be passed as the configuration for the triggered DagRun.

Before:

def _trigger_task(context, dag_run_obj):
    # The callable receives a DagRunOrder object; setting its payload defines
    # the conf passed to the triggered DAG, and returning it confirms the trigger.
    dag_run_obj.payload = {"message": "Hello world"}
    return dag_run_obj

trigger_task = TriggerDagRunOperator(
    task_id='test_trigger_dagrun',
    trigger_dag_id="example_trigger_target_dag",
    python_callable=_trigger_task,
    dag=dag,
)

After:

trigger_task = TriggerDagRunOperator(
    task_id='test_trigger_dagrun',
    trigger_dag_id="example_trigger_target_dag",
    conf={"message": "Hello world"},
    dag=dag,
)

It removes the python_callable argument and is thus not backwards compatible, so it should be merged in Airflow 2.0. Also, I think people might have "abused" the odd DagRunOrder class to set their DagRun id; this PR removes that possibility.

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

TriggerDagRunOperator tests were extracted from core.py and placed in a dedicated test_dagrun_operator.py file. I added additional tests for validating correct behaviour.

These tests were a bit tricky because they rely on passing state via the database, while the triggered DAG file is also read from disk elsewhere in the code. To make the tests idempotent and independent of external files (i.e. the example DAGs), setUp() writes a small DAG to a temporary file, which is used throughout the tests, and tearDown() removes all state from the DB, as sketched below.
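For illustration, a minimal sketch of that setup/teardown shape (a sketch only, assuming Airflow 1.10-era imports; the class name, the temp-file handling, and the exact cleanup queries are assumptions, not the actual test file):

import os
import tempfile
import textwrap
from unittest import TestCase

from airflow.models import DagRun, TaskInstance
from airflow.settings import Session

class TestDagRunOperator(TestCase):
    def setUp(self):
        # Write a trivial target DAG to a temporary file so the tests do not
        # depend on external example DAG files.
        fd, self.dag_file = tempfile.mkstemp(suffix=".py")
        with os.fdopen(fd, "w") as f:
            f.write(textwrap.dedent("""\
                from datetime import datetime
                from airflow.models import DAG
                from airflow.operators.dummy_operator import DummyOperator

                dag = DAG("trigger_target", start_date=datetime(2019, 1, 1), schedule_interval=None)
                DummyOperator(task_id="noop", dag=dag)
            """))

    def tearDown(self):
        # Remove all DagRun and TaskInstance state written to the DB during
        # the test, then delete the temporary DAG file.
        session = Session()
        session.query(DagRun).delete()
        session.query(TaskInstance).delete()
        session.commit()
        session.close()
        os.remove(self.dag_file)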

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain docstrings that explain what they do
    • If you implement backwards incompatible changes, please leave a note in Updating.md so we can assign it to an appropriate release

@BasPH BasPH marked this pull request as ready for review October 12, 2019 16:44
@codecov-io commented Oct 12, 2019:

Codecov Report

Merging #6317 into master will decrease coverage by 0.26%.
The diff coverage is 95%.


@@            Coverage Diff            @@
##           master   #6317      +/-   ##
=========================================
- Coverage   80.57%   80.3%   -0.27%     
=========================================
  Files         626     626              
  Lines       36237   36223      -14     
=========================================
- Hits        29198   29090     -108     
- Misses       7039    7133      +94
Impacted Files Coverage Δ
...low/example_dags/example_trigger_controller_dag.py 100% <100%> (+43.75%) ⬆️
airflow/operators/dagrun_operator.py 96% <100%> (+1.26%) ⬆️
airflow/utils/dates.py 82.6% <100%> (ø) ⬆️
airflow/example_dags/example_trigger_target_dag.py 90% <75%> (-2.31%) ⬇️
airflow/kubernetes/volume_mount.py 44.44% <0%> (-55.56%) ⬇️
airflow/kubernetes/volume.py 52.94% <0%> (-47.06%) ⬇️
airflow/kubernetes/pod_launcher.py 45.25% <0%> (-46.72%) ⬇️
airflow/kubernetes/kube_client.py 33.33% <0%> (-41.67%) ⬇️
...rflow/contrib/operators/kubernetes_pod_operator.py 70.14% <0%> (-28.36%) ⬇️
airflow/models/taskinstance.py 93.28% <0%> (-0.51%) ⬇️
... and 4 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 3cfe4a1...467dfcc.

A review comment on the test code removed from tests/core.py:

@@ -511,18 +510,6 @@ def check_failure(context, test_case=self):
         start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
     self.assertTrue(data['called'])

-    def test_trigger_dagrun(self):
-        def trigga(_, obj):

A member commented: I liked this name 😅
@turbaszek (Member) commented:

Overall, I like the proposed simplification ✅

@Fokko (Contributor) left a comment:

LGTM

A review comment on the following code in airflow/operators/dagrun_operator.py:
run_id = "trig__{}".format(self.execution_date.isoformat())
elif isinstance(self.execution_date, str):
run_id = "trig__{}".format(self.execution_date)
self.execution_date = timezone.parse(self.execution_date) # trigger_dag() expects datetime
@feluelle (Member) commented Oct 19, 2019:
Wouldn't it be better to set the execution date on line 58 where you are setting it the first time?

if isinstance(execution_date, str):
    self.execution_date = timezone.parse(execution_date)
else:
    self.execution_date = execution_date

IMO self.execution_date = timezone.parse(self.execution_date) is a kind of validation, so it should be done in the constructor, even if it then runs more often than it would in execute().

then in the execute you can just do

if self.execution_date is None:
    self.execution_date = timezone.utcnow()
run_id = "trig__{}".format(self.execution_date.isoformat())

WDYT?

@BasPH (Contributor, Author) replied:

I would like to use it, as it is much shorter and simpler.

However, execution_date can be templated. For example, {{ execution_date }} would fail in timezone.parse(). So we have to save it first, wait for execute() to be called and all template variables to be rendered, and only then can we call timezone.parse() on the execution_date :(
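A minimal sketch of the resulting pattern (abridged and assumed, not the exact merged code):

from airflow.models import BaseOperator
from airflow.utils import timezone

class TriggerDagRunOperator(BaseOperator):
    # execution_date is a template field, so a value like "{{ execution_date }}"
    # is only rendered just before execute() runs.
    template_fields = ("trigger_dag_id", "execution_date", "conf")

    def __init__(self, trigger_dag_id, conf=None, execution_date=None, **kwargs):
        super().__init__(**kwargs)
        self.trigger_dag_id = trigger_dag_id
        self.conf = conf
        self.execution_date = execution_date  # may still be a raw template string here

    def execute(self, context):
        # Templates have been rendered by now, so parsing is safe.
        if self.execution_date is None:
            self.execution_date = timezone.utcnow()
        elif isinstance(self.execution_date, str):
            self.execution_date = timezone.parse(self.execution_date)
        run_id = "trig__{}".format(self.execution_date.isoformat())
        # ...then trigger_dag() is called with this run_id and self.conf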

@BasPH (Contributor, Author) commented Oct 23, 2019:

@Fokko @feluelle WDYT?

@Fokko Fokko merged commit f3c3812 into apache:master Oct 23, 2019
@BasPH BasPH deleted the bash-fix-triggerdagrunoperator branch October 23, 2019 19:49
@Sharadh commented Mar 7, 2020:

Greetings! I'm curious to know whether you folks knew this change reduced functionality. Specifically, we have workflows where the python_callable was useful for two things:

  1. Dynamically generate the conf required for the trigger_dag call
  2. Return a false-y value so the trigger_dag call does not take place

I am not sure how this can be done after the change.

In general, having the convenience of an arbitrary python callable to hook into and modify behavior based on incoming conf is very valuable. For a practical example, this task would trigger a dag only if a flag was set in the conf; this flag can vary between dag runs, but the same dag can model both behaviors:

step1 = SomeOperator()

step2 = AnotherOperator()

def check_and_trigger(context, dag_run_obj):
    payload = context["dag_run"].conf

    if not payload["should_trigger"]:
        return False

    dag_run_obj.payload = payload["downstream_payload"]
    return dag_run_obj

maybe_trigger_bar_dag = TriggerDagRunOperator(
    task_id="maybe_trigger_bar_dag",
    trigger_dag_id="bar",
    python_callable=check_and_trigger,
)

step1 >> step2 >> maybe_trigger_bar_dag

In our use-case, the Dag itself is static but takes in a few parameters via conf, which comes in via the experimental API or from another scheduled dag. It helps us reuse dags without getting into the gnarly world of sub-dags.

Please let me know if I can explain things further. I was unable to find the motivation for this change apart from the Jira ticket linked, so please do point me to more reading if it exists, so I can gain context.

@Sharadh commented Mar 7, 2020:

@BasPH @Fokko not sure what y'alls settings on notifications for closed PRs are, so pinging you to make sure :)

@BasPH (Contributor, Author) commented Mar 7, 2020:

Hi @Sharadh,

The main motivation for this change was code clarity. Back then, I found the TriggerDagRunOperator very awkward to read: the context and dag_run_obj seem to come out of nowhere. You are correct that it reduces the functionality; however, I still think this change makes the code much more readable and makes the expected result clear.

To achieve the same, I suggest splitting your task into two:

(1) a PythonOperator which succeeds if should_trigger==True, else raises AirflowSkipException
(2) the TriggerDagRunOperator

from airflow.exceptions import AirflowSkipException
from airflow.operators.python_operator import PythonOperator

step1 = SomeOperator()
step2 = AnotherOperator()

def _should_trigger(dag_run, **_):
    if not dag_run.conf["should_trigger"]:
        raise AirflowSkipException("should_trigger set to False")

should_trigger = PythonOperator(
    task_id="should_trigger",
    python_callable=_should_trigger,
    provide_context=True,
)

trigger_bar_dag = TriggerDagRunOperator(
    task_id="trigger_bar_dag",
    trigger_dag_id="bar",
    conf={"downstream_payload": "{{ dag_run.conf['downstream_payload'] }}"},
)

step1 >> step2 >> should_trigger >> trigger_bar_dag

Since you're passing the payload on to the to-be-triggered DAG, the conf argument gets a little weird. It's templated, so I think the code above should do the trick. I didn't test it, so use at your own risk :-)

The should_trigger task now determines whether or not to continue. If not, it raises an AirflowSkipException, which sets its state to skipped and automatically skips the downstream tasks (assuming they have the default TriggerRule), as sketched below.
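A minimal sketch of that rule, using a hypothetical cleanup task (the task and its callable are assumptions for illustration):

from airflow.utils.trigger_rule import TriggerRule

# Tasks default to TriggerRule.ALL_SUCCESS, so when should_trigger raises
# AirflowSkipException, trigger_bar_dag is skipped as well. A task that must
# run even after upstream skips can override its trigger rule:
always_cleanup = PythonOperator(
    task_id="always_cleanup",
    python_callable=_cleanup,  # hypothetical callable
    trigger_rule=TriggerRule.NONE_FAILED,  # runs after successes or skips, not failures
)

trigger_bar_dag >> always_cleanup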

Does this clarify?

@Sharadh commented Mar 9, 2020:

Thanks for the quick reply @BasPH, and especially for taking the time to sketch out the workaround / new style.

That does clarify - I'm assuming that _should_trigger in your example above can modify dag_run.conf to add more inputs, which would replicate what we currently do with TriggerDagRunOperator. For example, a really trivial case would be:

from datetime import datetime

def _should_trigger(dag_run, **_):
    if not dag_run.conf["should_trigger"]:
        raise AirflowSkipException("should_trigger set to False")
    dag_run.conf["downstream_payload"]["trigger_checked_at"] = datetime.now()

I do also appreciate - from a design perspective - that the code now is simpler; I daresay it's cleaner for us to separate out the logic of if / how to trigger (which is purely business logic), from the trigger itself (which is purely an airflow construct). I guess the old code was conflating a bit of PythonOperator concerns with TriggerDagRunOperator concerns.

Is there some sort of release notes for the new TriggerDagRunOperator changes? I'd like to contribute to clarify what functionality has changed, and codify the workaround you're suggesting here.

@shippy commented Jul 18, 2020:

Unless I overlooked something, the changes to TriggerDagRunOperator merged here seem to be missing from 1.10.11: https://github.com/apache/airflow/blob/1.10.11/airflow/operators/dagrun_operator.py still contains python_callable, conf is considered an invalid argument, and the file's last commit is from March 2019. This is odd, since this merge does exist on master.

This might be the culprit in #9870 and #9852.

@feluelle (Member) commented:

Yes @shippy, these changes are not yet released. According to https://issues.apache.org/jira/browse/AIRFLOW-5644, they will be included in 2.0.

@ryanrichholt commented:

@BasPH Could you clarify the reasoning that setting the run_id is abusing the behavior? I'm working with Airflow to build DAGs that are triggered externally once per patient, not on a time interval. It's very helpful to be able to search the run_ids for a patient ID, so each externally triggered DAG run sets the run_id to a patient ID. I was hoping to be able to set the run_id with this operator, but now it sounds like I'm going down the wrong path.

@BasPH (Contributor, Author) commented Aug 11, 2020:

Setting DAG Run ids via the old TriggerDagRunOperator feels like one big hack. Both from a code perspective (the Python callable to provide is odd to read), and a logical perspective (I don't think it's desirable to have users edit the run id).

To understand your use case, I'd like to know how exactly you're searching for patient-specific DAG runs?

@ryanrichholt replied:
In the DagRun model list view there is an option to search by run_id. Since we set the run_id when the dag is triggered externally, this field can be used when we need to check on progress for a specific case.

This seems to work alright, but there are some downsides:

  • run_id doesn't propagate to subdags, so we basically can't use subdags
  • run_id can't be set when triggering the dag from some of the quick buttons in the ui
  • with the create dag_run form, the run_id can be set, but the conf cannot

So, I'm also exploring options for moving away from using run_ids, and just adding this info to the run conf. But, I think this is going to require updating the DagRun model views, or a custom view plugin.

@waleedsamy commented Apr 29, 2021:

@BasPH

> Setting DAG Run ids via the old TriggerDagRunOperator feels like one big hack. Both from a code perspective (the Python callable to provide is odd to read), and a logical perspective (I don't think it's desirable to have users edit the run id).
>
> To understand your use case, I'd like to know how exactly you're searching for patient-specific DAG runs?

I don't think so. TriggerDagRunOperator should allow a user to set the run_id the same way the Airflow CLI allows it, i.e. airflow dags trigger --run-id.

In my case, I use the TriggerDagRunOperator to trigger 30K DagRuns daily, and it would be very annoying to see them all with unidentifiable run_ids.

Could you please look into this again?

@AddEleven commented:
> (quoting @Sharadh's comment from Mar 9, 2020, above)

Hi Sharadh,

Did you manage to find a way to edit dag_run.conf to add further inputs before passing it to the TriggerDagRunOperator conf parameter? I have been trying this and can't seem to find a solution.

Thanks,

@ksumanth commented Jun 8, 2021:

@BasPH

I am in the same boat. We use the run_id to identify runs, and with so many runs it is very useful. It is particularly useful in the Tree View, where the run_id shows up on hover over a run without having to go somewhere else.

Why couldn't the run_id be added as a parameter?

> (quoting the exchange between @BasPH and @waleedsamy above)

@mathee06 commented:
Hi @BasPH,

I realize this is an old thread now, but we just migrated to Airflow 2.0.0.

Similar to @Sharadh, is there a way to dynamically pass a dag_run_obj or conf object to the new DAG that was triggered?

There does not seem to be a way to do this since the removal of the python_callable argument.

Is "downstream_payload" below a unique variable that passes the current DAG's dag_run object to the new triggered DAG?

conf={"downstream_payload": "{{ dag_run.conf['downstream_payload'] }}"}

Here is my current code (with some explanation of the flow and approach):

# Poll the S3 folder for newly received files
fileSensor_tsk = S3KeySensor()

# Use the chooseDAGBasedOnInput function to create the dag_run object
# (previously, python_callable was used directly in TriggerDagRunOperator
# to create the dag_run object for the newly triggered DAG). The dag_run
# object passes the received file name details to the new DAG, which
# references them to complete its own work.
chooseDAGTrigger_tsk = BranchPythonOperator(
    task_id='chooseDAGTrigger_tsk',
    python_callable=chooseDAGBasedOnInput,
    provide_context=True
)

triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id='triggerNewDAG_tsk',
    trigger_dag_id='1000_NEW_LOAD'
)

triggerNewDAG2_tsk = TriggerDagRunOperator(
    task_id='triggerNew2DAG_tsk',
    trigger_dag_id='1000_NEW2_LOAD'
) ...

Any help or advice would be appreciated!

Thank you.

@billcrook (Contributor) commented:
Late to the party here, but I agree this change is causing pain when upgrading to 2.x. @mathee06 If you recall, what was your final approach? I will likely move the logic to a new task before the trigger and possibly to a new task in the downstream triggered dag.
