Dag processor manager queue split (fixes SLAs) #27317
Closed
The "I'm determined to fix SLAs" PR
(Look familiar? This was originally opened as #25489, but at some point during a rebase the PR got closed, and I couldn't persuade GitHub that I really did have some commits in the branch...)
OK, so #25147 made a start in this direction. Summing up the, er, summary from that MR, the problem was that SLA callbacks could keep occurring, and prevent the dag processor manager from ever processing more than 2 or 3 dags in the queue before the SLA callbacks re-upped and went to the front of the queue.
Under the new behaviour, the SLA callbacks went to the back of the queue. This guaranteed that the queue would be processed at least once. However, it turns out that dags on disk would only be re-added to the queue once the queue was empty. But with SLA callbacks arriving All. The. Time. the queue would never drain, and we'd never re-read dags from disk. So if you updated the dag file, you'd have to bounce the scheduler to pick up the change, and then it would process all non-SLA-generating DAGs exactly once. And then you'd need to bounce again.
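To make the starvation concrete, here's a toy model of the old behaviour. This is plain Python, not Airflow's actual code, and the file names are invented; it just shows what happens when callbacks keep jumping to the front of a queue that is only refreshed from disk once empty:

```python
# Toy model of the pre-fix queue (NOT the real DagFileProcessorManager):
# an SLA callback re-adds its dag file to the FRONT of the queue on every
# loop iteration, so files at the back are starved and the queue never
# drains -- which also means we never re-read dag files from disk.
from collections import deque

queue = deque(["sla_dag.py", "dag_a.py", "dag_b.py", "dag_c.py"])
processed = []

for tick in range(10):
    # An SLA callback for sla_dag.py arrives on every tick and jumps the queue.
    if "sla_dag.py" not in queue:
        queue.appendleft("sla_dag.py")
    processed.append(queue.popleft())
    # Dags would only be reloaded from disk when the queue is empty -- it never is.

print(processed.count("sla_dag.py"))   # the SLA dag takes every processing slot
print("dag_c.py" in processed)         # the back of the queue is never reached
```

Running it shows the SLA dag monopolising the manager while `dag_c.py` never gets processed at all.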
Related Issues
Closes #15596 (I hope!)
I've almost certainly missed some steps.
Before I go into a bit more detail about the change, I'd like to acknowledge that as a (very) small-time contributor to the project, I'm not familiar with all the done things when making more radical changes. In particular, I assume there are more doc changes needed than just a newsfragment. I've added a config flag, some metrics, and the behaviour of the queue processing has subtly changed (for the better, I hope!)
I'd very much appreciate someone(s) leaving a comment 👇 telling me what else I need to do in terms of docs etc.
TL;DR
I mean, I see your point. Skip to the bottom for the summary.
Pay attention, here comes the science bit
Ok, so to briefly recap the queue behaviour prior to this change:

- The manager periodically refreshes the list of dag files from disk (`dag_dir_list_interval`). It does not require that the queue be empty. Any changes to the set of files on disk (e.g. dags being deleted) will cause the manager to remove those dags from the queue.
- Each file is processed at most once every `min_file_process_interval` (default: 30 seconds), so if your dag files only take 10 seconds to process, there will be 20 seconds of idle time; but if your dag files take a minute to process, then the manager will be permanently busy, because as soon as the queue drains, it'll be well past time to reload the queue from disk.

Locally, I tested a hacky fix whereby on receipt of an SLA callback I still add the callback, but I simply didn't add the dag to the queue (it's a one-line change - extremely simple!). This works, but means that SLA callbacks are only processed when the queue drains and is refilled (because then all dags are added to the queue, guaranteeing that we will process any outstanding SLA callbacks). However, if someone has specified a large wait time between loading dags from disk, this will affect how timely the SLA alerts are. That's fine for me, because I don't do that, but I wasn't getting a "this will be fine for everyone" vibe from the change (I did say it was hacky!).
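The hacky variant can be sketched like this (the helper names are hypothetical, and this is not the real `DagFileProcessorManager` API; it just isolates the one-line idea of recording the callback without re-queueing the file):

```python
# Sketch of the hacky one-line fix: an SLA callback is recorded, but the
# dag file is deliberately NOT re-added to the queue, so the queue can
# actually drain -- at the cost of SLA alerts waiting for the next refresh.
from collections import deque

file_queue = deque(["dag_a.py", "dag_b.py"])
pending_callbacks = {}   # dag file path -> stored SLA callbacks

def on_sla_callback(path, callback):
    pending_callbacks.setdefault(path, []).append(callback)
    # The "hacky fix" is what's missing here: no file_queue.appendleft(path).

on_sla_callback("dag_a.py", "sla_miss_1")

while file_queue:        # the queue drains normally now...
    file_queue.popleft()

# ...and only once it's empty do we refresh from disk, which re-adds every
# dag file and guarantees stored SLA callbacks are eventually consumed.
file_queue.extend(["dag_a.py", "dag_b.py"])
print(pending_callbacks["dag_a.py"])
```

The timeliness problem is visible in the last step: the stored callback waits until the disk refresh re-queues `dag_a.py`, however long that takes.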
Also, there's another catch. While the problem is much more prevalent with SLAs, these are not the only callbacks. I could envisage a situation where someone configures a dag with a very small interval (e.g. a dag run every 10 seconds). While I think this is a much more theoretical problem that might not ever exist in the wild - this isn't really the use case Airflow is intended for - the upshot would be that a dag generating lots of dag callbacks would be spamming the queue. And those callbacks still go to the front of the queue, i.e. you're back to the situation I tried to solve in the previous MR!
I don't think that word means what you think it means
I decided that, fundamentally, part of the problem was that the queue should be FIFO. And it wasn't. But if I made it FIFO, then the higher-priority DAG callbacks would have to wait their turn behind the dag files loaded from disk, and I'm pretty sure that would eliminate some of the speed-ups Airflow 2 was trumpeted as delivering. Airflow 1 used to have on average a 15 second gap (= 30/2) between one task completing and the downstream tasks being scheduled, because once the task completed, you had to wait for the manager to drain the queue, add the files to the queue from disk, and then process the dag. (And that's assuming you could even process all your dags in <30 seconds...) In Airflow 2, because of the dag callbacks, the gap between a task finishing and its downstream tasks being scheduled is usually sub-second.
I didn't want to be the guy who accidentally breaks that particular speed up. 😱
So I did two things:
Thing 1: Tackling the FIFO issue aka gazumping callbacks.

I split the single queue in two: a standard FIFO queue (for SLA callbacks and dag files loaded from disk) and a priority queue (for DAG callbacks), so callbacks no longer gazump the files already waiting. There's also a new config setting: `max_file_process_interval`, the dual to the existing `min_file_process_interval`. It guarantees that if you do happen to have a permanently busy priority queue, eventually we'll take a breather, and process the files on disk anyway.

Thing 2: Handling the fact that SLAs stop the standard queue from ever being empty.
I added a `set` which tracked which dag files in the queue were still outstanding from the last refresh from disk. Once the queue was refreshed from disk, we'd work through every file eventually (because FIFO), and at that point the set would be empty, even if the queue wasn't (because SLAs). An empty set, rather than an empty queue, is the signal that it's time to refresh from disk again.

Notes
This doesn't materially change how SLAs work; they are generated and consumed the same as before. It just means that we reliably consume the alerts once generated without breaking the rest of the system. In my experience at least, adding SLAs would simply cause the system to stop processing dag updates (as per #15596).
In particular, I don't address issues with SLA timestamps (as raised by #22532), nor do I deal with other problems (e.g. now that SLAs fire reliably, I have noticed that they fire during catch-up, and that the same alert can fire multiple times).
This is not because I think the current approach is perfect (like everyone else on the internet, I Have Thoughts on how it could be improved, given infinite time) but rather it is a sad-but-true fact that I don't have the time to take on a big project. So I am going to continue with my current approach of tinkering round the edges; and if the remaining issues are minor enough to live with, I'm just going to live with them (at least for now).
Summary: I'm from the UK; politely waiting in line is what we do best.
- Split `_file_path_queue` into `_std_file_path_queue` (for SLAs and dags loaded from disk) and `priority_file_path_queue` (for DAG callbacks)
- Added a `max_file_process_interval` config to ensure files are read from disk every so often, even if the priority queue is always busy
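Putting the two changes together, the new behaviour might be sketched roughly like this. To be clear, this is illustrative only: the function and variable names here are invented, and only the idea of the two queues, the tracking set, and the `max_file_process_interval` config come from this PR:

```python
# Rough sketch of the post-change loop (NOT the actual Airflow implementation).
import time
from collections import deque

MAX_FILE_PROCESS_INTERVAL = 300.0   # new config (seconds): breather for the std queue

std_queue = deque()        # FIFO: SLA callbacks + dag files loaded from disk
priority_queue = deque()   # DAG callbacks: normally served first, keeping the
                           # Airflow-2 sub-second scheduling of downstream tasks
outstanding = set()        # Thing 2: files unprocessed since the last disk refresh
last_std_pop = time.monotonic()

def refresh_from_disk(dag_files):
    """Re-queue dag files; runs once `outstanding` (not the queue) is empty."""
    for f in dag_files:
        if f not in std_queue:
            std_queue.append(f)            # back of the FIFO queue
    outstanding.update(dag_files)

def next_file():
    global last_std_pop
    overdue = time.monotonic() - last_std_pop > MAX_FILE_PROCESS_INTERVAL
    if priority_queue and not overdue:
        return priority_queue.popleft()    # callbacks still gazump... usually
    if std_queue:                          # ...but the std queue gets its turn
        last_std_pop = time.monotonic()
        f = std_queue.popleft()
        outstanding.discard(f)             # empty set -> time to refresh again
        return f
    return None

refresh_from_disk(["dag_a.py", "dag_b.py"])
priority_queue.append("dag_a.py")          # a dag callback arrives
print(next_file())   # dag_a.py, via the priority queue
print(next_file())   # dag_a.py, from the std queue (FIFO)
print(next_file())   # dag_b.py; `outstanding` is now empty
```

The key property is that SLA callbacks landing in the standard queue can no longer prevent the `outstanding` set from draining, so disk refreshes keep happening even when the queue itself is never empty.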