
MySQL Not Using Correct Index for Scheduler Critical Section Query #25627

Closed
2 tasks done
michaelmicheal opened this issue Aug 9, 2022 · 4 comments · Fixed by #25673
Labels
area:core kind:bug This is a clearly a bug

Comments

@michaelmicheal
Contributor

Apache Airflow version

Other Airflow 2 version

What happened

Airflow Version: 2.2.5
MySQL Version: 8.0.18

In the scheduler, we are seeing cases where MySQL picks an inefficient plan for the critical section query that queues task instances. When a large number of task instances are scheduled, MySQL fails to use the ti_state index to filter the task_instance table, which results in what is effectively a full scan of task_instance (about 7.3 million rows).

Normally, when running the critical section query, MySQL uses the index on task_instance.state (ti_state) to filter scheduled task instances.

| -> Limit: 512 row(s)  (actual time=5.290..5.413 rows=205 loops=1)
    -> Sort row IDs: <temporary>.tmp_field_0, <temporary>.execution_date, limit input to 512 row(s) per chunk  (actual time=5.289..5.391 rows=205 loops=1)
        -> Table scan on <temporary>  (actual time=0.003..0.113 rows=205 loops=1)
            -> Temporary table  (actual time=5.107..5.236 rows=205 loops=1)
                -> Nested loop inner join  (cost=20251.99 rows=1741) (actual time=0.100..4.242 rows=205 loops=1)
                    -> Nested loop inner join  (cost=161.70 rows=12) (actual time=0.071..2.436 rows=205 loops=1)
                        -> Index lookup on task_instance using ti_state (state='scheduled')  (cost=80.85 rows=231) (actual time=0.051..1.992 rows=222 loops=1)
                        -> Filter: ((dag_run.run_type <> 'backfill') and (dag_run.state = 'running'))  (cost=0.25 rows=0) (actual time=0.002..0.002 rows=1 loops=222)
                            -> Single-row index lookup on dag_run using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id)  (cost=0.25 rows=1) (actual time=0.001..0.001 rows=1 loops=222)
                    -> Filter: ((dag.is_paused = 0) and (task_instance.dag_id = dag.dag_id))  (cost=233.52 rows=151) (actual time=0.008..0.008 rows=1 loops=205)
                        -> Index range scan on dag (re-planned for each iteration)  (cost=233.52 rows=15072) (actual time=0.008..0.008 rows=1 loops=205)
1 row in set, 1 warning (0.03 sec)

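When a large number of task instances are in the scheduled state at the same time, MySQL does not use the index on task_instance.state. Instead, it scans the dag table and, for each of the 13,264 unpaused DAGs, reads that DAG's task instances through the PRIMARY key (about 553 rows per lookup), filtering on state afterwards. The query takes 12.14 seconds instead of 0.03 seconds.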

| -> Limit: 512 row(s)  (actual time=12110.251..12110.573 rows=512 loops=1)
    -> Sort row IDs: <temporary>.tmp_field_0, <temporary>.execution_date, limit input to 512 row(s) per chunk  (actual time=12110.250..12110.526 rows=512 loops=1)
        -> Table scan on <temporary>  (actual time=0.005..0.800 rows=1176 loops=1)
            -> Temporary table  (actual time=12109.022..12109.940 rows=1176 loops=1)
                -> Nested loop inner join  (cost=10807.83 rows=3) (actual time=1.328..12097.528 rows=1176 loops=1)
                    -> Nested loop inner join  (cost=10785.34 rows=64) (actual time=1.293..12084.371 rows=1193 loops=1)
                        -> Filter: (dag.is_paused = 0)  (cost=1371.40 rows=1285) (actual time=0.087..22.409 rows=13264 loops=1)
                            -> Table scan on dag  (cost=1371.40 rows=12854) (actual time=0.085..15.796 rows=13508 loops=1)
                        -> Filter: ((task_instance.state = 'scheduled') and (task_instance.dag_id = dag.dag_id))  (cost=0.32 rows=0) (actual time=0.907..0.909 rows=0 loops=13264)
                            -> Index lookup on task_instance using PRIMARY (dag_id=dag.dag_id)  (cost=0.32 rows=70) (actual time=0.009..0.845 rows=553 loops=13264)
                    -> Filter: ((dag_run.run_type <> 'backfill') and (dag_run.state = 'running'))  (cost=0.25 rows=0) (actual time=0.010..0.011 rows=1 loops=1193)
                        -> Single-row index lookup on dag_run using dag_run_dag_id_run_id_key (dag_id=task_instance.dag_id, run_id=task_instance.run_id)  (cost=0.25 rows=1) (actual time=0.009..0.010 rows=1 loops=1193)

1 row in set, 1 warning (12.14 sec)
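The 7.3 million figure follows from the slow plan: the PRIMARY-key lookup on task_instance runs once per unpaused DAG. A back-of-the-envelope check, using only the loops, rows, and actual-time values copied from the plan above (MySQL reports per-loop averages, so the total time is an estimate, not a measurement):

```python
# Figures copied from the slow EXPLAIN ANALYZE plan above.
unpaused_dags = 13_264    # loops of the task_instance PRIMARY lookup (one per unpaused DAG)
rows_per_lookup = 553     # average task_instance rows returned per dag_id lookup
ms_per_lookup = 0.909     # average per-loop time (ms) of the state filter on those rows

# Every task instance of every unpaused DAG is read, then filtered on state.
rows_examined = unpaused_dags * rows_per_lookup
estimated_seconds = unpaused_dags * ms_per_lookup / 1000

print(f"task_instance rows examined: {rows_examined:,}")   # → task_instance rows examined: 7,334,992
print(f"estimated time in this join: {estimated_seconds:.1f} s")  # → estimated time in this join: 12.1 s
```

By contrast, the fast plan reads only the 222 rows that the ti_state lookup returns for state='scheduled'.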

What you think should happen instead

To resolve this, I patched scheduler_job.py to add a MySQL index hint that makes the query use the ti_state index.

--- /usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py
+++ /usr/local/lib/python3.9/site-packages/airflow/jobs/scheduler_job.py
@@ -293,6 +293,7 @@ class SchedulerJob(BaseJob):
             # and the dag is not paused
             query = (
                 session.query(TI)
+                .with_hint(TI, 'USE INDEX (ti_state)', dialect_name='mysql')
                 .join(TI.dag_run)
                 .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
                 .join(TI.dag_model)

I think it makes sense to add this index hint upstream.
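A minimal sketch of what the hint compiles to, assuming SQLAlchemy 1.4 or later and a simplified, hypothetical task_instance table with only four columns (not Airflow's real model). It builds a Core select() rather than the ORM query in the patch, but with_hint() takes the same arguments there. The MySQL dialect is used only to compile SQL text, so no database or driver is needed:

```python
from sqlalchemy import Column, Index, MetaData, String, Table, select
from sqlalchemy.dialects import mysql

metadata = MetaData()

# Hypothetical, simplified stand-in for Airflow's task_instance table:
# only the columns needed to show where the hint is rendered.
task_instance = Table(
    "task_instance",
    metadata,
    Column("dag_id", String(250), primary_key=True),
    Column("task_id", String(250), primary_key=True),
    Column("run_id", String(250), primary_key=True),
    Column("state", String(20)),
    Index("ti_state", "state"),
)

# Same hint text as the patch; dialect_name restricts it to MySQL compilation.
stmt = (
    select(task_instance)
    .where(task_instance.c.state == "scheduled")
    .with_hint(task_instance, "USE INDEX (ti_state)", dialect_name="mysql")
)

# Compile to MySQL SQL text without connecting to a server.
mysql_sql = str(stmt.compile(dialect=mysql.dialect()))
print(mysql_sql)
```

Printed, the FROM clause should read FROM task_instance USE INDEX (ti_state), which is where MySQL expects a table-level index hint.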

How to reproduce

Schedule a large number of dag runs and tasks in a short period of time.

Operating System

Debian GNU/Linux 10 (buster)

Versions of Apache Airflow Providers

No response

Deployment

Other 3rd-party Helm chart

Deployment details

Airflow 2.2.5 on Kubernetes
MySQL Version: 8.0.18

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@michaelmicheal michaelmicheal added area:core kind:bug This is a clearly a bug labels Aug 9, 2022
@uranusjr
Member

Is this specific to MySQL?

@j-martin
Contributor

j-martin commented Aug 10, 2022

Yes, it is. @michaelmicheal, can you update the title to make it clearer that we are talking about MySQL?

A question for the Airflow maintainers: would you welcome a PR that adds the index hint when using MySQL?

# Pseudo code: apply the hint only when the session is bound to MySQL
if session.get_bind().dialect.name == "mysql":
    query = query.with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")

@michaelmicheal michaelmicheal changed the title Scheduler Critical Section Query Not Using Correct Index MySQL Not Using Correct Index for Scheduler Critical Section Query Aug 10, 2022
@uranusjr
Member

I don’t think you need the if mysql part (just adding the hint globally and the dialect_name part would make it only apply for MySQL), but yes, a PR would be welcome.

@j-martin
Contributor

Adding the hint globally and the dialect_name part would make it only apply for MySQL

Yes, you are right!
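The point agreed on here is that with_hint() is filtered by dialect_name when the statement is compiled, so no runtime dialect check is needed. A small sketch to check that, assuming SQLAlchemy 1.4 or later and a hypothetical two-column task_instance table (not Airflow's real model): the hint is attached once, unconditionally, and the same statement is compiled for three dialects without connecting to any database.

```python
from sqlalchemy import Column, MetaData, String, Table, select
from sqlalchemy.dialects import mysql, postgresql, sqlite

metadata = MetaData()

# Hypothetical, minimal stand-in for the task_instance table.
task_instance = Table(
    "task_instance",
    metadata,
    Column("dag_id", String(250), primary_key=True),
    Column("state", String(20)),
)

# Hint attached unconditionally: no "if mysql" branch.
stmt = select(task_instance).with_hint(
    task_instance, "USE INDEX (ti_state)", dialect_name="mysql"
)

# Compile the same statement for each dialect and keep the SQL text.
rendered = {
    name: str(stmt.compile(dialect=dialect_cls()))
    for name, dialect_cls in [
        ("mysql", mysql.dialect),
        ("postgresql", postgresql.dialect),
        ("sqlite", sqlite.dialect),
    ]
}

for name, sql in rendered.items():
    print(f"{name}: hint rendered = {'USE INDEX' in sql}")
# → mysql: hint rendered = True
# → postgresql: hint rendered = False
# → sqlite: hint rendered = False
```

Because the hint is dropped at compile time for non-MySQL dialects, the same query object can run unchanged on PostgreSQL or SQLite deployments.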

3 participants