
I need a flow to start with a sensor #57

Closed
r39132 opened this issue Jun 22, 2015 · 11 comments

Comments

@r39132
Contributor

r39132 commented Jun 22, 2015

I have a flow that is linear and sequential, as shown below:

  1. new data arrives in SQS
  2. wait for that data to start being written to postgres
  3. wait for the SQS queue to drain
  4. send an email that the flow completed

Eventually, we will have a precursor step (e.g. a step 0) that kicks off a Spark job before waiting for new data in the SQS queue. For now, I do not have that, so I would like my flow to simply reset to the first step. In some ways, I am asking for a circular flow that never ends, which breaks the entire workflow abstraction. On another note, where is the "cron" functionality? Do I schedule airflow commands in cron outside of Airflow?
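For illustration, the four numbered steps could be sketched as a linear chain of sensor-style polls followed by a notification. This is plain-Python pseudocode of the flow shape, not the Airflow API; `check_new_data`, `check_write_started`, `check_drained`, and `notify` are hypothetical stand-ins for the SQS/Postgres checks and the email step.

```python
import time

def run_flow(check_new_data, check_write_started, check_drained, notify,
             poll_interval=0.0):
    """Run the four stages in order, polling each condition until it holds."""
    for check in (check_new_data, check_write_started, check_drained):
        while not check():            # sensor-style wait (steps 1-3)
            time.sleep(poll_interval)
    notify()                          # step 4: email that the flow completed

# Toy run with conditions that are immediately true:
events = []
run_flow(lambda: True, lambda: True, lambda: True,
         lambda: events.append("email sent"))
```

Making this "circular" would just mean wrapping `run_flow` in an outer loop, which is exactly the acyclicity that a DAG forbids.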

@mistercrunch
Member

Airflow runs its own scheduler, it's hard to miss it in the docs:
http://pythonhosted.org/airflow/scheduler.html

Is your process a batch job that runs on a fixed scheduled interval? If not, Airflow is probably not the right platform. DAG stands for Directed Acyclic Graph; acyclic means there are no loops. Though it's assumed we run the same set of jobs (the DAG) at a scheduled interval, which in a sense makes for a large loop around the DAG execution.
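The fixed-interval model described here can be reduced to a one-liner: each scheduled run is anchored to the previous one. A minimal sketch of that idea (plain Python, not the scheduler's actual implementation):

```python
from datetime import datetime, timedelta

def next_run(last_run, schedule_interval):
    """Fixed-interval scheduling: each run is anchored to the previous one."""
    return last_run + schedule_interval

nr = next_run(datetime(2015, 6, 22), timedelta(days=1))  # -> 2015-06-23
```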

@r39132
Contributor Author

r39132 commented Jun 22, 2015

Hi!
Yes, it is batch oriented. I'm waiting for some changes in how we submit Spark jobs so that I can trigger my flow starting at the Spark job submission. I've covered 4 of the 5 stages of my flow; unfortunately, they are stages 2-5, inclusive.

Yes, the large loop is triggered by a (daily?) schedule. What would be nice would be to have the large loop triggered by different signals: instead of by a clock (an interrupt), by a message on an SQS queue.

This is very whimsical and nothing urgent.

@r39132 r39132 closed this as completed Jun 22, 2015
@r39132
Contributor Author

r39132 commented Jun 22, 2015

BTW, having worked with Azkaban for 3 years at LinkedIn, I really like the Python DSL approach. I think it lends itself to more possibilities than just declaratively building a DAG of execution that is triggered by a cron.

@r39132 r39132 reopened this Jun 22, 2015
@r39132
Contributor Author

r39132 commented Jun 22, 2015

About the scheduler and cron, I'll clarify. I was expecting to find a calendar widget - call it UI sugar.

In reading #7, it seems that the schedule is instead defined in the Python DAG definition code. Essentially, in order to alter the schedule of an existing DAG, we need to alter and re-deploy the DAG code. So, in a production environment, people are updating their DAG code on some production machine just to change the frequency of their running jobs, which can be problematic if that engineer introduced more changes than just an interval change. Hence, a calendar widget provides the option for a change of limited scope and ease of use.

@mistercrunch
Member

I haven't thought of frequency of execution as a variable; I've always thought of it as a constant. I thought it could evolve into a more complex constant by eventually adding support for the cron syntax (5 * * * *), to allow for more complex rules of execution.

Though it seems you are interested in a more on-demand or on-event approach.

In Airflow, a task instance is essentially a specific task run (the unit at which we track state), and this is done by associating a timestamp with the task definition. The scheduler is aware of the DAG's schedule and triggers based on that.

I guess the notion of a task or DAG can be decoupled from its schedule and triggered on demand using the API or the CLI, externally.

The on-demand approach seems somewhat easy to integrate into the platform: you'd need a bit of metadata saying "I want this DAG to run for this date or timestamp", and the scheduler would pick that up and make sure that tasks get triggered.

The on-event approach would require some sort of timeless event listener that could leverage the on-demand infrastructure to add an entry to the on-demand list.

It's flipping the current approach around: a fixed schedule with sensors in it vs. a sensor that defines the schedule.
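The on-event idea above can be sketched as a listener that translates external events into on-demand run requests for a scheduler loop to pick up. This is a hypothetical plain-Python illustration (the `dag_id` "my_flow" and the event/request shapes are invented for the example), not Airflow code:

```python
import queue

def event_listener(events, demand_queue):
    """Translate external events (e.g. SQS messages) into on-demand
    run requests that a scheduler loop could pick up."""
    for event in events:
        demand_queue.put({"dag_id": "my_flow", "run_for": event["timestamp"]})

demand = queue.Queue()
event_listener([{"timestamp": "2015-06-22T00:00:00"}], demand)
```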

@r39132
Contributor Author

r39132 commented Jun 24, 2015

We integrated Spark with my main flow, so I now have an end-to-end flow that works, with a few rough-edges remaining.

@pedrorjbr

I need this feature too! I would like to kick off a DAG when a file is uploaded to an S3 bucket. Does anyone have a solution for the on-event approach?

@bolkedebruin
Contributor

No solution yet, just a thought: a Notify daemon (to be implemented) that does the polling and puts actions into the executor queues when an event happens. libevent (python-libevent) comes to mind.

@pedrorjbr

I used an S3 Lambda function to trigger an event that starts a Docker container on the AWS ECS infrastructure. So I did not use Airflow :(

@mistercrunch
Member

Since PR #540 we support external triggers for DAGs, meaning you can order a DAG run externally, not on a schedule.

The current way to trigger is by using the CLI: airflow trigger_dag dag_id. I'm planning on adding a DagRunTrigger operator that receives a function; if that function returns a non-None object, the operator creates a DagRun for a specified dag_id, and the returned object is pickled to the database and made available to the context of all tasks in this DagRun.
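A rough sketch of the operator behaviour described above (the name DagRunTrigger and the pickling of the returned object come from the comment; everything else here, including the in-memory `dag_runs` list standing in for the DagRun table, is illustrative plain Python, not the actual Airflow implementation):

```python
import pickle

dag_runs = []  # stand-in for the DagRun table

def maybe_trigger(dag_id, check_fn):
    """If check_fn returns a non-None object, create a run for dag_id and
    store the pickled object so tasks in that run can read it later."""
    payload = check_fn()
    if payload is None:
        return None            # nothing to do: no DagRun is created
    run = {"dag_id": dag_id, "conf": pickle.dumps(payload)}
    dag_runs.append(run)
    return run

run = maybe_trigger("my_dag", lambda: {"s3_key": "incoming/file.csv"})
```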

@r39132
Contributor Author

r39132 commented Dec 11, 2015

Very cool.

mobuchowski pushed a commit to mobuchowski/airflow that referenced this issue Jan 4, 2022
rajatsri28 pushed a commit to rajatsri28/airflow that referenced this issue Jan 25, 2022