Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Any plans to add an SQS hook and SQS Sensor? #52

Closed
r39132 opened this issue Jun 19, 2015 · 3 comments
Closed

Any plans to add an SQS hook and SQS Sensor? #52

r39132 opened this issue Jun 19, 2015 · 3 comments

Comments

@r39132
Copy link
Contributor

r39132 commented Jun 19, 2015

I would add the following actions to the Sensor?

  1. Check for new message
  2. Check for empty queue
  3. Check for non-empty queue
@mistercrunch
Copy link
Member

Wouldn't SQS make most sense as a broker for Celery? Seems like it may doable from this old article here:
https://www.caktusgroup.com/blog/2011/12/19/using-django-and-celery-amazon-sqs/

@r39132
Copy link
Contributor Author

r39132 commented Jun 20, 2015

It does make sense for use with Celery, but that is a different use-case than what I have in mind.

Our data pipeline uses SQS : Agari's Data Pipeline

We use SNS+SQS actually - we publish S3 object-created notifications over SNS+SQS. SNS has push-based topics with 0 day message retention and SQS has pull-based queues with max 14 day message retention. When we publish to SNS, SNS pushes to multiple SQS queues. We have several data importers that load this data into different DBs.

So, as part of my data pipeline, I would like to detect that a queue receives a message (it signals that it is receiving data). If that sensor returns true, I advance to the next stage : checking whether my db is receiving data. If that passes, I advance to checking if the SQS queue is drained (end of data load). If any of these fail, I want email notification. If the last step succeeds, I would like to send a "Data Load Successfully Completed" email notification.

@mistercrunch
Copy link
Member

Interesting! An alternative would be to check on whether the file exists in s3 (you may have to use a trigger file to signify the file is fully loaded, or load into a tmp key and rename it).

But I know nothing about your setup so I'm sure you have a much better understanding of the components you need. BTW it's very easy to create hooks and operators, and we're excited for the community to extend the portfolio of external systems we integrate with.

mobuchowski pushed a commit to mobuchowski/airflow that referenced this issue Jan 4, 2022
* Adding documentation about host, port and namespace

* MARQUEZ_HOST, MARQUEZ_PORT, and MARQUEZ_NAMESPACE are valuable env vars to set

Signed-off-by: Bob Briski <rbriski@gmail.com>

* Additional documentation around extractors

This is pretty tough to do with any kind of precision right now.  It's clear that the underlying API is in flux as new objects to handle metadata are being added.  This is meant as a placeholder right now to let people know about the fact that extractors exist so they don't spend a bunch of time trying to figure out why it doesn't work out of the box

Signed-off-by: Bob Briski <rbriski@gmail.com>

* Removed MARQUEZ_HOST and MARQUEZ_PORT and replaced with MARQUEZ_URL

A note, in the marquez_client code, the DEFAULT_MARQUEZ_URL is http://localhost:8080.  The port at 8080 seems odd since the default port on the marquez server is actually 5000.  Not pertinent to this PR though.

Signed-off-by: Bob Briski <rbriski@gmail.com>

Co-authored-by: Willy Lulciuc <willy@datakin.com>
mobuchowski added a commit to mobuchowski/airflow that referenced this issue Jan 4, 2022
…n if exists, unpin dependencies (apache#52)

* format RC versions compliant to PEP 440, use tag to version if exists, unpin dependencies

Signed-off-by: Maciej Obuchowski <maciej.obuchowski@getindata.com>
rajatsri28 pushed a commit to rajatsri28/airflow that referenced this issue Jan 25, 2022
* EWT-569 : Initial Commit for migrations

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  76fe7ac from 1.10.4

* CP Contains fb64f2e: [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (apache#63)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
CP contains [EWT-16]: Airflow fix for manual trigger during version upgrade (apache#13)

* [EWT-16]: Airflow fix for manual trigger during version upgrade

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (apache#63)

CP of f757a54

* CP(55bb579) [AIRFLOW-5597] Linkify urls in task instance log (apache#16)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 94cdcf6
[CP] Contains [AIRFLOW-5597] Linkify urls in task instance log

CP of f757a54

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  4ce8d4c from 1.10.4
CP contains [TWTTR] Fix for rendering code on UI (apache#34)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  299b4d8 from 1.10.4
CP contains [TWTR] CP from 1.10+twtr (apache#35)

* 99ee040: CP from 1.10+twtr

* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)

* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (apache#21)

* CP 51b1aee: Relax version requiremets (apache#24)

* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (apache#25)

* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (apache#26)

* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (apache#27)

* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (apache#31)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : f7050fb
CP Contains Experiment API path fix (apache#37)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  8a689af from 1.10.4
CP Contains Export scheduler env variable into worker pods. (apache#38)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  5875a15 from 1.10.4
Cp Contains [EWT-115][EWT-118] Initialise dag var to None and fix for DagModel.fileloc (missed in EWT-16) (apache#39)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  a68e2b3 from 1.10.4
[CX-16591] Fix regex to work with impersonated clusters like airflow_scheduler_ddavydov (apache#42)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : e9642c2
[CP][EWT-128] Fetch task logs from worker pods (19ac45a) (apache#43)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d5d0a07
[CP][AIRFLOW-6561][EWT-290]: Adding priority class and default resource for worker pod. (apache#47)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 9b58c88
[CP][EWT-302]Patch Pool.DEFAULT_POOL_NAME in BaseOperator (apache#8587) (apache#49)

Open source commit id: b37ce29

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 7b52a71
[CP][AIRFLOW-3121] Define closed property on StreamLogWriter (apache#3955) (apache#52)

CP of 2d5b8a5

* [EWT-361] Fix broken regex pattern for extracting dataflow job id (apache#51)

Update the dataflow URL regex as per AIRFLOW-9323

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 4b5b977
EWT-370: Use python3 to launch the dataflow job. (apache#53)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 596e24f
* [EWT-450] fixing sla miss triggering duplicate alerts every minute (apache#56)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : b3d7fb4
[CP] Handle IntegrityErrors for trigger dagruns & add Stacktrace when DagFileProcessorManager gets killed (apache#57)

CP of faaf179 - from master
CP of 2102122 - from 1.10.12

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : bac4acd
[TWTR][EWT-472] Add lifecycle support while launching worker pods (apache#59)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 6162402
[TWTTR] Don't enqueue tasks again if already queued for K8sExecutor(apache#60)

Basically reverting commit 87fcc1c  and making changes specifically into the Celery Executor class only.

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 1991419
[CP][TWTR][EWT-377] Fix DagBag bug when a Dag has invalid schedule_interval (apache#61)

CP of 5605d10 & apache#11462

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 48be0f9
[TWTR][EWT-350] Reverting the last commit partially (apache#62)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d8c473e
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (apache#63)

CP of f757a54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants