Airflow Hybrid Executor has an issue where tasks are rescheduled while actually running #42151

Closed
1 of 2 tasks
iw-pavan opened this issue Sep 11, 2024 · 1 comment · Fixed by #43003
Labels
area:hybrid-executors AIP-61 area:providers good first issue kind:bug This is a clearly a bug provider:cncf-kubernetes Kubernetes provider related issues

Comments

@iw-pavan
Contributor

Apache Airflow version

2.10.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

There is an intermittent issue with the hybrid executor where a task is queued multiple times, which kills the original execution and puts the workflow run into a failed state.

In the logs below, the scheduler queues the task to the Celery executor, which is the correct behaviour, but a few seconds later this log line appears:

default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:18.791+0000] {kubernetes_executor.py:273} INFO - TaskInstance: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [queued]> found in queued state but was not launched, rescheduling

This seems wrong: the executor for the above task is CeleryExecutor, but the "clear_not_launched_queued_tasks" function was executed by the Kubernetes executor.

default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:07.760+0000] {scheduler_job_runner.py:736} INFO - Trying to enqueue tasks: [<TaskInstance: 66cdf9e9b3453e031689fa09.PB_VK39 66e13e1a9b94be0771f7ff20 [scheduled]>, <TaskInstance: 66cee622d8d3b11ea97d23ea.PB_VK39 66e13eb79b94be0771f7ff92 [scheduled]>, <TaskInstance: 66cef62d21a3cc6d397f816d.SI_Y9VG 66e1407de57be85bc875c6ac [scheduled]>, <TaskInstance: 66cdfab5b3453e031689fbae.SI_MDXZ 66e1407de57be85bc875c6b0 [scheduled]>] for executor: CeleryExecutor(parallelism=200)
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:15.300+0000] {scheduler_job_runner.py:736} INFO - Trying to enqueue tasks: [<TaskInstance: 66cdfa3c407cee140c3ce50a.SI_XAFG 66e13f1de57be85bc875c52f [scheduled]>, <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>, <TaskInstance: 66cdfab84750a813ec928598.SI_KXFL 66e14089e57be85bc875c6bd [scheduled]>] for executor: CeleryExecutor(parallelism=200)
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:15.301+0000] {scheduler_job_runner.py:680} INFO - Sending TaskInstanceKey(dag_id='66cdfab5b3453e031689fbae', task_id='PB_VK39', run_id='66e1407de57be85bc875c6b0', try_number=1, map_index=-1) to CeleryExecutor with priority 1 and queue default
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:15.301+0000] {base_executor.py:168} INFO - Adding to queue: ['airflow', 'tasks', 'run', '66cdfab5b3453e031689fbae', 'PB_VK39', '66e1407de57be85bc875c6b0', '--local', '--subdir', 'DAGS_FOLDER/66cdfab5b3453e031689fbae.py']
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:16.921+0000] {scheduler_job_runner.py:764} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='66cdfab5b3453e031689fbae', task_id='PB_VK39', run_id='66e1407de57be85bc875c6b0', try_number=1, map_index=-1)
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:16.950+0000] {scheduler_job_runner.py:791} INFO - Setting external_id for <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [queued]> to 2f92c5d8-923d-43b7-811c-17b5986411d8
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:18.791+0000] {kubernetes_executor.py:273} INFO - TaskInstance: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [queued]> found in queued state but was not launched, rescheduling
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>
default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:[2024-09-11T07:14:19.827+0000] {scheduler_job_runner.py:736} INFO - Trying to enqueue tasks: [<TaskInstance: 66cef56ed8d3b11ea97d269b.PB_VK39 66e13dede57be85bc875c3e1 [scheduled]>, <TaskInstance: 66cdfa3c407cee140c3ce50a.SI_XAFG 66e13f1de57be85bc875c52f [scheduled]>, <TaskInstance: 66cef631fe5e403e994538d8.PB_VK39 66e140649b94be0771f801f5 [scheduled]>, <TaskInstance: 66cefe18fe5e403e99453aee.SI_CUY5 66e1406fbbe1e2215bc5ef41 [scheduled]>, <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 66e1407de57be85bc875c6b0 [scheduled]>] for executor: CeleryExecutor(parallelism=200)
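For anyone trying to confirm the same pattern in their own scheduler logs, here is a rough sketch that pulls out the entries acting on a single task and shows which source file emitted each one. The helper name `timeline_for_task` and the regex are illustrative assumptions, not part of Airflow; the sample lines are copied from the logs above.

```python
import re

# Matches the "[timestamp] {source_file.py:line} LEVEL - message" part of a log entry.
LOG_RE = re.compile(
    r"\[(?P<ts>[^\]]+)\] \{(?P<source>[\w.]+):(?P<line>\d+)\} (?P<level>\w+) - (?P<msg>.*)"
)


def timeline_for_task(log_lines, dag_id, task_id):
    """Return (timestamp, source file, message) for entries about one task.

    Hypothetical helper. "Trying to enqueue tasks" batches are skipped
    because they list many tasks at once.
    """
    needles = (
        f"<TaskInstance: {dag_id}.{task_id} ",
        f"dag_id='{dag_id}', task_id='{task_id}'",
    )
    events = []
    for raw in log_lines:
        match = LOG_RE.search(raw)
        if match is None:
            continue  # continuation lines without a timestamp
        msg = match["msg"]
        if msg.startswith("Trying to enqueue tasks"):
            continue
        if any(needle in msg for needle in needles):
            events.append((match["ts"], match["source"], msg))
    return events


PREFIX = "default-orchestrator-scheduler-56cb74fc7f-ks76d-scheduler.log:"
SAMPLE = [
    PREFIX + "[2024-09-11T07:14:15.301+0000] {scheduler_job_runner.py:680} INFO - "
    "Sending TaskInstanceKey(dag_id='66cdfab5b3453e031689fbae', task_id='PB_VK39', "
    "run_id='66e1407de57be85bc875c6b0', try_number=1, map_index=-1) "
    "to CeleryExecutor with priority 1 and queue default",
    PREFIX + "[2024-09-11T07:14:16.950+0000] {scheduler_job_runner.py:791} INFO - "
    "Setting external_id for <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 "
    "66e1407de57be85bc875c6b0 [queued]> to 2f92c5d8-923d-43b7-811c-17b5986411d8",
    PREFIX + "[2024-09-11T07:14:18.791+0000] {kubernetes_executor.py:273} INFO - "
    "TaskInstance: <TaskInstance: 66cdfab5b3453e031689fbae.PB_VK39 "
    "66e1407de57be85bc875c6b0 [queued]> found in queued state but was not launched, rescheduling",
]

for ts, source, msg in timeline_for_task(SAMPLE, "66cdfab5b3453e031689fbae", "PB_VK39"):
    print(ts, source, msg[:60])
```

A task that was sent to CeleryExecutor and then shows an entry from kubernetes_executor.py a few seconds later is the symptom described in this issue.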

What you think should happen instead?

No response

How to reproduce

I am using the hybrid executor configuration with the Celery and Kubernetes executors.

Operating System

Linux

Versions of Apache Airflow Providers

Celery, Kubernetes

Deployment

Other Docker-based deployment

Deployment details

Kubernetes setup.

Anything else?

No executor is specified for this task, so None is passed as the executor value and the first configured executor, CeleryExecutor, is used.
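The fallback described above can be sketched roughly as follows. The function `resolve_executor` is a hypothetical illustration, not Airflow's internal API; it only assumes that the first entry in the configured executor list acts as the default.

```python
from typing import Optional, Sequence


def resolve_executor(task_executor: Optional[str], configured: Sequence[str]) -> str:
    """Return the executor name a task ends up on (hypothetical helper).

    Assumption: a task with no explicit executor falls back to the first
    executor in the configured list.
    """
    if not configured:
        raise ValueError("at least one executor must be configured")
    if task_executor is None:
        return configured[0]
    if task_executor not in configured:
        raise ValueError(f"executor {task_executor!r} is not configured")
    return task_executor


# Same order as in this report: Celery first, Kubernetes second.
configured = ["CeleryExecutor", "KubernetesExecutor"]
print(resolve_executor(None, configured))
print(resolve_executor("KubernetesExecutor", configured))
```

With this ordering, a task whose executor is None belongs to CeleryExecutor, so the Kubernetes executor should not treat it as one of its own queued tasks.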

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@iw-pavan iw-pavan added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Sep 11, 2024
@iw-pavan
Contributor Author

It seems the query in the Kubernetes executor that checks for tasks that were not launched needs to be updated to also filter by executor:

query = select(TaskInstance).where(
    TaskInstance.state == TaskInstanceState.QUEUED, TaskInstance.queued_by_job_id == self.job_id
)
if self.kubernetes_queue:
    query = query.where(TaskInstance.queue == self.kubernetes_queue)
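One possible shape for the extra condition, sketched in plain Python over in-memory records rather than as the real SQLAlchemy query. The `QueuedTask` class, `not_launched_candidates`, and the `is_default_executor` flag are assumptions made for illustration; this is not necessarily how the eventual fix is implemented.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class QueuedTask:
    """Minimal stand-in for a task instance row (hypothetical)."""
    task_id: str
    state: str
    queued_by_job_id: int
    executor: Optional[str]  # None means "use the default executor"


def not_launched_candidates(
    tasks: Iterable[QueuedTask],
    job_id: int,
    executor_name: str,
    is_default_executor: bool,
) -> List[QueuedTask]:
    """Keep only queued tasks that this executor actually owns."""

    def owned(task: QueuedTask) -> bool:
        if task.executor is None:
            # Tasks without an explicit executor belong to the default one.
            return is_default_executor
        return task.executor == executor_name

    return [
        t for t in tasks
        if t.state == "queued" and t.queued_by_job_id == job_id and owned(t)
    ]


tasks = [
    QueuedTask("PB_VK39", "queued", 1, None),  # runs on the default (Celery) executor
    QueuedTask("k8s_task", "queued", 1, "KubernetesExecutor"),
]
# Kubernetes is not the default executor in this report's setup.
picked = not_launched_candidates(tasks, 1, "KubernetesExecutor", is_default_executor=False)
print([t.task_id for t in picked])
```

With a filter like this, the Kubernetes executor would no longer pick up PB_VK39, which was queued on CeleryExecutor, while still rescheduling its own tasks that were never launched.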
