Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External triggers / cron support / #540

Merged
merged 8 commits into from
Nov 10, 2015
Merged

External triggers / cron support / #540

merged 8 commits into from
Nov 10, 2015

Conversation

mistercrunch
Copy link
Member

@ctrebing, thanks for getting this started, I'm adding on to the work you've done as this will be a large changeset:

  • externally triggered DAGs
  • cron expression support in schedule_interval as in 0 0 0 * *
  • allow for multiple concurrent scheduler (might be another PR, we'll see)

@mistercrunch mistercrunch force-pushed the external_triggers branch 7 times, most recently from 0e1e541 to 9a33fb3 Compare October 23, 2015 06:30
dag_id=dag_id,
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether it would be helpful to create a DagRun with state QUEUED (or a new state SCHEDULED) and only set it to running once the first scheduler created/queued the first TaskInstance. For example, if no scheduler is running, the DagRun will be displayed as running in the UI and one has to see if the corresponding task instances are actually created to find the problem why nothing happens.

@mistercrunch
Copy link
Member Author

closes #570

@mistercrunch
Copy link
Member Author

closes #586

@mistercrunch mistercrunch force-pushed the external_triggers branch 5 times, most recently from dc86268 to 4000421 Compare November 9, 2015 23:47
ctrebing and others added 8 commits November 10, 2015 09:11
add CLI to insert new DagRuns

fix typos in comments

add missing import

add state to DagRun

Adding cron support for schedule_interval

Allow null on charts

Refactoring like a mad man

add dag_run table
Fixing jobs

Lining up db revisions

Adding CRUD in the admin

Success, backend running, next is UI changes

Updating the docs to reflect the new behavior

Got the UI to show externaly triggered runs, root object for DAG Run

UI improvments, mostly functional

DAG concurrency limit

Commit resets dag runs

More unit tests

Adapting the UI

Fixing brutal amount of merge conflicts

Polish around UI and events

Adding DB migration script

Fixed the charts

Adding schedule info in the dag page

Adding cron presets

Fixing up the tests

Adding @once as a schedule_interval option

A layer of polish and bug fixes
mistercrunch added a commit that referenced this pull request Nov 10, 2015
External triggers / cron support /
@mistercrunch mistercrunch merged commit ae39fdc into master Nov 10, 2015
@mistercrunch mistercrunch deleted the external_triggers branch November 10, 2015 17:21
@pedrorjbr
Copy link

I registered a new dag. This dag will be external triggered, so I dont need to schedule it. How can I make the scheduler ignore it (nto schedule it and just run when i trigger it).

@mistercrunch
Copy link
Member Author

@pedrorjbr
Copy link

What is the correct way (if exists one) to trigger a dag when a file is uploaded on AWS S3, for example?
Is it better to create a scheduled job that makes pooling on S3?

I could start a dag with a S3Sensor, and makes the task wait_for_downstream=true.
So the task will be scheduled again just when the downstream terminates. So I will process the file just once.

return start_date
else:
return prev

Copy link

@brycedrennan brycedrennan Feb 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mistercrunch
I think this code doesn't work. Shouldn't it be:

    if isinstance(delta, six.string_types):
        cron = croniter(cron_frequency, dt + timedelta(minutes=1))
        return cron.get_prev(datetime)

I used this doctest to test for this case:

    >>> round_time(datetime(2015, 9, 12, 6, 0), '0 0 * * *', datetime(2015, 9, 2, 0, 0))
    datetime.datetime(2015, 9, 12, 0, 0)

I'm unclear what the intent was behind that if statement at the end.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants