
External triggers #503

Closed · wants to merge 6 commits

Conversation

ctrebing (Contributor)

I am aware that this pull request is not finished (tests, error handling, documentation). I would like to start a more concrete discussion on the externally triggered DAGs as mentioned in issue #417.

Within my company (http://blue-yonder.com) we are evaluating whether we could use Airflow, and I would really love to do so. I especially liked the model you have chosen in the APIs and the possibility to define DAGs in Python.

What we really need is the possibility to trigger DAGs externally. I read the discussion in the roadmap issue #417 and liked the ideas expressed there. I did a first prototype for the DagRun object and using this in the scheduler. Before investing further work in stabilizing this, I would like to get your feedback on whether this approach fits with the existing concepts. Does it make sense from your point of view to further work on that, or do you already have different plans/implementations?

@mistercrunch (Member)

Nice. This is a very good start. Much in line with what I was thinking.

dag_id = Column(String(ID_LEN), primary_key=True)
execution_date = Column(DateTime, primary_key=True)
run_id = Column(String(ID_LEN))
external_trigger = Column(Boolean, default=False)
Member

we may want to add a state here, so that the scheduler can completely disregard DAG runs that are fully processed. I'm not sure whether it should just be a boolean or whether having more states would help.

Contributor Author

I would use a string, so that failed DAG runs can be marked as well.
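A minimal sketch of what a string-valued state might look like. The state names, and the use of a dataclass instead of the SQLAlchemy model quoted above, are assumptions for illustration, not the PR's actual code:

```python
from dataclasses import dataclass
from datetime import datetime

# Hypothetical state values; real names would need to match the rest of Airflow.
RUNNING, SUCCESS, FAILED = "running", "success", "failed"

@dataclass
class DagRun:
    dag_id: str
    execution_date: datetime
    run_id: str
    external_trigger: bool = False
    # A string rather than a boolean, so failed runs can be told apart
    # from successfully finished ones.
    state: str = RUNNING

run = DagRun("example_dag", datetime(2015, 10, 1), "manual__2015-10-01")
run.state = FAILED  # a boolean "done" flag could not express this
```

With a string column the scheduler can skip anything whose state is terminal, whatever the terminal reason was.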

@mistercrunch (Member)

We may need a property DAG.active_runs that returns the list of active runs according to DagRun.state, and maintains the state by checking whether len(tasks) equals the number of successful task instances for that date.
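The suggestion above could be sketched like this; the class shapes, state names, and the `ti_states` mapping are assumptions made to keep the example self-contained:

```python
from types import SimpleNamespace

RUNNING, SUCCESS = "running", "success"

class DAG:
    def __init__(self, task_ids, runs):
        self.task_ids = task_ids  # tasks defined in the DAG
        self.runs = runs          # DagRun-like objects with .run_id and .state

    @property
    def active_runs(self):
        # only runs whose DagRun.state is still active
        return [r for r in self.runs if r.state == RUNNING]

    def refresh_run_states(self, ti_states):
        # ti_states: {(run_id, task_id): state} for this DAG's task instances
        for run in self.active_runs:
            succeeded = sum(
                1 for task_id in self.task_ids
                if ti_states.get((run.run_id, task_id)) == SUCCESS
            )
            # the check proposed above: the run is done once the number of
            # successful task instances matches len(tasks)
            if succeeded == len(self.task_ids):
                run.state = SUCCESS

r1 = SimpleNamespace(run_id="r1", state=RUNNING)
dag = DAG(["a", "b"], [r1])
dag.refresh_run_states({("r1", "a"): SUCCESS, ("r1", "b"): SUCCESS})
```

After the refresh, `r1` has moved to success and drops out of `active_runs`.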

@mistercrunch (Member)

Somewhat unrelated: I've been planning to allow the scheduler to be distributable (many scheduler instances running concurrently). It would be a matter of taking locks in DagModel, adding a DAG.schedule_frequency param, and looking at DagModel.last_run to sort which DAG should be processed first. We may also want a way to identify stale locks and auto-unlock a DAG that has been locked for more than, say, 10 minutes.
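The locking idea could look roughly like this; the `locked_by`/`locked_at` fields and the 10-minute timeout are illustrative assumptions, not an actual schema:

```python
from datetime import datetime, timedelta
from types import SimpleNamespace

LOCK_TIMEOUT = timedelta(minutes=10)

def try_lock(dag_row, scheduler_id, now):
    """Try to acquire a DAG for processing, breaking stale locks.

    A lock older than LOCK_TIMEOUT is assumed to belong to a dead
    scheduler and is taken over.
    """
    stale = (dag_row.locked_at is not None
             and now - dag_row.locked_at > LOCK_TIMEOUT)
    if dag_row.locked_by is None or stale:
        dag_row.locked_by = scheduler_id
        dag_row.locked_at = now
        return True
    return False

row = SimpleNamespace(locked_by=None, locked_at=None)
t0 = datetime(2015, 10, 1, 12, 0)
assert try_lock(row, "scheduler-1", now=t0)                              # free
assert not try_lock(row, "scheduler-2", now=t0 + timedelta(minutes=5))   # held
assert try_lock(row, "scheduler-2", now=t0 + timedelta(minutes=11))      # stale
```

In a real multi-scheduler deployment the check-and-set would have to happen atomically in the database (e.g. a conditional UPDATE), not in Python as sketched here.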

TI.dag_id == run.dag_id,
TI.execution_date == run.execution_date
).all()
if len(task_instances) == len(dag.tasks):
Contributor Author

Can a DAG have branches with tasks that are never executed within one run? If so, this check would not be sufficient.
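To illustrate the concern: with branching, some tasks may never get a task instance, so requiring len(task_instances) == len(dag.tasks) could leave the run marked active forever. A branch-tolerant sketch (state names assumed, and deliberately simplified) asks instead whether any existing instance is still active:

```python
ACTIVE_STATES = {"running", "queued", "up_for_retry"}

def run_is_complete(dag_task_ids, ti_states):
    """ti_states maps task_id -> state for one execution_date.

    Tasks absent from ti_states may sit on a branch that was never
    taken, so they don't block completion; only genuinely active
    instances do. A real check would also need to confirm the run
    actually started. This is a sketch, not the PR's code.
    """
    return all(state not in ACTIVE_STATES for state in ti_states.values())

# Task "b" was never executed: the naive length check says incomplete,
# while this check considers the run finished.
assert run_is_complete(["a", "b", "c"], {"a": "success", "c": "success"})
assert not run_is_complete(["a", "b"], {"a": "running"})
```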

@mistercrunch (Member)

#540
