Tasks intermittently gets terminated with SIGTERM on kubernetes executor #18041

Closed
2 tasks done
Nimesh-K-Makwana opened this issue Sep 6, 2021 · 71 comments
Labels
area:core kind:bug This is a clearly a bug

Comments

@Nimesh-K-Makwana

Nimesh-K-Makwana commented Sep 6, 2021

Apache Airflow version

2.1.3 (latest released)

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

I have tried the env variables given in GitHub issue #14672:
AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME: "604800"
AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: "False"

What happened

[2021-09-04 10:28:50,536] {local_task_job.py:80} ERROR - Received SIGTERM. Terminating subprocesses
[2021-09-04 10:28:50,536] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 33
[2021-09-04 10:28:50,537] {taskinstance.py:1235} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-09-04 10:28:52,568] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1307, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/repo/dags/elastit_schedular/waiting_task_processor.py", line 59, in trigger_task
    time.sleep(1)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1237, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal

What you expected to happen

The DAG should execute successfully, without any task receiving a SIGTERM signal.

How to reproduce

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@Nimesh-K-Makwana added the area:core and kind:bug labels Sep 6, 2021
@boring-cyborg

boring-cyborg bot commented Sep 6, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@ephraimbuddy
Contributor

Please provide the full task logs and also the scheduler logs. What you added above is only the part where the task failed, but I believe something must have caused it to receive the SIGTERM.

@Nimesh-K-Makwana
Author

Nimesh-K-Makwana commented Sep 6, 2021

Tasks seem to be failing while waiting or when they are long-running.
Providing the latest logs for a task that failed due to SIGTERM:

Task logs :

[2021-09-06 07:20:33,448] {taskinstance.py:1107} INFO - Executing <Task(PythonOperator): Wait_for_creation> on 2021-09-06T07:10:00+00:00
[2021-09-06 07:20:33,451] {standard_task_runner.py:52} INFO - Started process 42 to run task
[2021-09-06 07:20:33,453] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'backup_lms_rolling', 'Wait_for_creation', '2021-09-06T07:10:00+00:00', '--job-id', '92518', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/backup/backup_lms_rolling.py', '--cfg-path', '/tmp/tmpmtkv677x', '--error-file', '/tmp/tmp3ha9ijqx']
[2021-09-06 07:20:33,454] {standard_task_runner.py:77} INFO - Job 92518: Subtask Wait_for_creation
[2021-09-06 07:20:33,598] {logging_mixin.py:104} INFO - Running <TaskInstance: backup_lms_rolling.Wait_for_creation 2021-09-06T07:10:00+00:00 [running]> on host backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08
[2021-09-06 07:20:33,858] {taskinstance.py:1300} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=Incred
AIRFLOW_CTX_DAG_ID=backup_lms_rolling
AIRFLOW_CTX_TASK_ID=Wait_for_creation
AIRFLOW_CTX_EXECUTION_DATE=2021-09-06T07:10:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-09-06T07:10:00+00:00
[2021-09-06 07:20:33,872] {credentials.py:1100} INFO - Found credentials in environment variables.
[2021-09-06 07:24:00,469] {local_task_job.py:76} ERROR - Received SIGTERM. Terminating subprocesses
[2021-09-06 07:24:00,470] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 42
[2021-09-06 07:24:00,470] {taskinstance.py:1284} ERROR - Received SIGTERM. Terminating subprocesses.
[2021-09-06 07:24:00,555] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/repo/dags/backup/backup_lms_rolling.py", line 103, in wait_for_creation
    time.sleep(30)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1286, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2021-09-06 07:24:00,556] {taskinstance.py:1544} INFO - Marking task as UP_FOR_RETRY. dag_id=backup_lms_rolling, task_id=Wait_for_creation, execution_date=20210906T071000, start_date=20210906T072033, end_date=20210906T072400
[2021-09-06 07:24:00,682] {process_utils.py:66} INFO - Process psutil.Process(pid=42, status='terminated', exitcode=1, started='07:20:32') (42) terminated with exit code 1

Scheduler logs:

Sep 6, 2021 @ 12:50:20.831 | [2021-09-06 07:20:20,831] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type ADDED
Sep 6, 2021 @ 12:50:20.831 | [2021-09-06 07:20:20,831] {kubernetes_executor.py:200} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Pending
Sep 6, 2021 @ 12:50:20.844 | [2021-09-06 07:20:20,844] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
Sep 6, 2021 @ 12:50:20.844 | [2021-09-06 07:20:20,844] {kubernetes_executor.py:200} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Pending
Sep 6, 2021 @ 12:50:20.983 | [2021-09-06 07:20:20,983] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
Sep 6, 2021 @ 12:50:20.983 | [2021-09-06 07:20:20,983] {kubernetes_executor.py:200} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Pending
Sep 6, 2021 @ 12:50:24.181 | [2021-09-06 07:20:24,181] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
Sep 6, 2021 @ 12:50:24.181 | [2021-09-06 07:20:24,181] {kubernetes_executor.py:200} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Pending
Sep 6, 2021 @ 12:50:28.163 | [2021-09-06 07:20:28,163] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
Sep 6, 2021 @ 12:50:28.163 | [2021-09-06 07:20:28,163] {kubernetes_executor.py:200} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Pending
Sep 6, 2021 @ 12:50:29.242 | [2021-09-06 07:20:29,242] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
Sep 6, 2021 @ 12:50:29.242 | [2021-09-06 07:20:29,242] {kubernetes_executor.py:208} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e is Running
Sep 6, 2021 @ 12:54:00.461 | [2021-09-06 07:24:00,461] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
Sep 6, 2021 @ 12:54:00.461 | [2021-09-06 07:24:00,461] {kubernetes_executor.py:208} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e is Running
Sep 6, 2021 @ 12:54:03.089 | [2021-09-06 07:24:03,089] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
Sep 6, 2021 @ 12:54:03.089 | [2021-09-06 07:24:03,089] {kubernetes_executor.py:202} ERROR - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Failed
Sep 6, 2021 @ 12:54:04.217 | [2021-09-06 07:24:04,217] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e; state: failed; annotations: {'dag_id': 'backup_lms_rolling', 'task_id': 'Wait_for_creation', 'execution_date': '2021-09-06T07:10:00+00:00', 'try_number': '1'}
Sep 6, 2021 @ 12:54:04.218 | [2021-09-06 07:24:04,218] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='backup_lms_rolling', task_id='Wait_for_creation', execution_date=datetime.datetime(2021, 9, 6, 7, 10, tzinfo=tzlocal()), try_number=1), 'failed', 'backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e', 'airflow-2x', '107854273') to failed
Sep 6, 2021 @ 12:54:10.689 | [2021-09-06 07:24:10,688] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type MODIFIED
Sep 6, 2021 @ 12:54:10.689 | [2021-09-06 07:24:10,689] {kubernetes_executor.py:202} ERROR - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Failed
Sep 6, 2021 @ 12:54:10.744 | [2021-09-06 07:24:10,743] {kubernetes_executor.py:147} INFO - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e had an event of type DELETED
Sep 6, 2021 @ 12:54:10.744 | [2021-09-06 07:24:10,744] {kubernetes_executor.py:202} ERROR - Event: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e Failed
Sep 6, 2021 @ 12:54:11.091 | [2021-09-06 07:24:11,091] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e; state: failed; annotations: {'dag_id': 'backup_lms_rolling', 'task_id': 'Wait_for_creation', 'execution_date': '2021-09-06T07:10:00+00:00', 'try_number': '1'}
Sep 6, 2021 @ 12:54:11.091 | [2021-09-06 07:24:11,091] {kubernetes_executor.py:368} INFO - Attempting to finish pod; pod_id: backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e; state: failed; annotations: {'dag_id': 'backup_lms_rolling', 'task_id': 'Wait_for_creation', 'execution_date': '2021-09-06T07:10:00+00:00', 'try_number': '1'}
Sep 6, 2021 @ 12:54:11.092 | [2021-09-06 07:24:11,092] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='backup_lms_rolling', task_id='Wait_for_creation', execution_date=datetime.datetime(2021, 9, 6, 7, 10, tzinfo=tzlocal()), try_number=1), 'failed', 'backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e', 'airflow-2x', '107854343') to failed
Sep 6, 2021 @ 12:54:11.093 | [2021-09-06 07:24:11,092] {kubernetes_executor.py:546} INFO - Changing state of (TaskInstanceKey(dag_id='backup_lms_rolling', task_id='Wait_for_creation', execution_date=datetime.datetime(2021, 9, 6, 7, 10, tzinfo=tzlocal()), try_number=1), 'failed', 'backuplmsrollingwaitforcreation.da04397cb65044febf65e727c16fd08e', 'airflow-2x', '107854344') to failed

@ephraimbuddy
Contributor

For the task to receive SIGTERM, something must be killing your pods. The task runner receives SIGTERM when its pod is deleted. Can you check whether something else is deleting your pods?
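To check, a quick sketch with the Kubernetes Python client can list recent events for the worker pods and show what is deleting them (assuming the kubernetes package is installed, the pods run in an airflow namespace, and that pod name prefix - adjust all three to your deployment):

from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() when running inside the cluster
v1 = client.CoreV1Api()

# print recent events touching the Airflow worker pods to see who is deleting them
for event in v1.list_namespaced_event(namespace="airflow").items:
    name = event.involved_object.name or ""
    if name.startswith("backuplmsrollingwaitforcreation"):
        print(event.last_timestamp, event.reason, event.message)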

@Nimesh-K-Makwana
Author

True, when a pod gets deleted the tasks running in it receive SIGTERM. I have tried to find the cause of the pods getting deleted, but could not find any reason for it yet. It happens just randomly.

@easontm
Contributor

easontm commented Sep 7, 2021

Is your DAG paused?

@Nimesh-K-Makwana
Author

DAG is in ON state.

@ephraimbuddy
Contributor

@Nimesh-K-Makwana can you set delete_worker_pods to false and observe this issue? Also, describe the pod to see if everything is correct.

@laserpedro

laserpedro commented Sep 7, 2021

Hello,

I am facing the same issue:
airflow 2.1.3 (tested also with 2.1.2, 2.1.1)
backend: postgresql
executor: LocalExecutor
unixname: airflow
task default user (run_as_user) = airflow

I have modified the variables killed_task_cleanup_time and schedule_after_task_execution to 100000 and False, respectively.
I have also installed airflow as a non-root user and set the default run_as_user to airflow.
I have tried removing the task instances and also the dag runs to start from scratch (no rows in the db).

My tasks are getting constantly killed in backfill mode with the traceback:

[2021-09-07 10:21:20,185] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 73011

Honestly, I am a bit discouraged at this point. Could you help me please? Thanks.

tasks logs:

Reading local file: /home/airflow/airflow/logs/import_forex_unwind_trades/detect_book_referential/2021-08-05T00:00:00+00:00/28.log
[2021-09-07 11:18:19,826] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: import_forex_unwind_trades.detect_book_referential 2021-08-05T00:00:00+00:00 [queued]>
[2021-09-07 11:18:21,644] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: import_forex_unwind_trades.detect_book_referential 2021-08-05T00:00:00+00:00 [queued]>
[2021-09-07 11:18:21,644] {taskinstance.py:1087} INFO - 
--------------------------------------------------------------------------------
[2021-09-07 11:18:21,644] {taskinstance.py:1088} INFO - Starting attempt 28 of 28
[2021-09-07 11:18:21,644] {taskinstance.py:1089} INFO - 
--------------------------------------------------------------------------------
[2021-09-07 11:18:21,661] {taskinstance.py:1107} INFO - Executing <Task(HttpSensor): detect_book_referential> on 2021-08-05T00:00:00+00:00
[2021-09-07 11:18:21,662] {base_task_runner.py:133} INFO - Running on host: gvasrv-airflow
[2021-09-07 11:18:21,662] {base_task_runner.py:134} INFO - Running: ['airflow', 'tasks', 'run', 'import_forex_unwind_trades', 'detect_book_referential', '2021-08-05T00:00:00+00:00', '--job-id', '723490', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/import_fx_unwind_trades.py', '--cfg-path', '/tmp/tmpe43x01zl', '--error-file', '/tmp/tmpohmu9qb7']
[2021-09-07 11:18:24,493] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential [2021-09-07 11:18:24,492] {dagbag.py:496} INFO - Filling up the DagBag from /home/airflow/airflow/dags/import_fx_unwind_trades.py
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential /home/airflow/anaconda3/envs/airflow_dev_py37/lib/python3.7/site-packages/airflow/providers/http/sensors/http.py:26 DeprecationWarning: This decorator is deprecated.
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential 
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential In previous versions, all subclasses of BaseOperator must use apply_default decorator for the`default_args` feature to work properly.
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential 
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential In current version, it is optional. The decorator is applied automatically using the metaclass.
[2021-09-07 11:18:26,045] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential 
[2021-09-07 11:18:32,281] {base_task_runner.py:118} INFO - Job 723490: Subtask detect_book_referential [2021-09-07 11:18:32,280] {base.py:78} INFO - Using connection to: id: alpflow_symph_conn. Host: https://as.symphony.com/integration/v1/whi/simpleWebHookIntegration/, Port: None, Schema: None, Login: None, Password: None, extra: {'webhook_token': }
[2021-09-07 11:18:33,857] {local_task_job.py:194} WARNING - Recorded pid 60749 does not match the current pid 98677
[2021-09-07 11:18:33,863] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 98677
[2021-09-07 11:18:33,870] {process_utils.py:66} INFO - Process psutil.Process(pid=98677, status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='11:18:21') (98677) terminated with exit code Negsignal.SIGTERM

scheduler log:

[2021-09-07 11:33:13,489] {dagrun.py:429} ERROR - Marking run <DagRun import_forex_unwind_trades @ 2021-08-11 00:00:00+00:00: backfill__2021-08-11T00:00:00+00:00, externally triggered: False> failed
[2021-09-07 11:33:13,527] {backfill_job.py:388} INFO - [backfill progress] | finished run 7 of 16 | tasks waiting: 0 | succeeded: 0 | running: 9 | failed: 39 | skipped: 0 | deadlocked: 0 | not ready: 0
[2021-09-07 11:33:14,819] {local_executor.py:124} ERROR - Failed to execute task PID of job runner does not match.

I also face the issue described in #17507.

Tks,

Pierre

@laserpedro

laserpedro commented Sep 7, 2021

Could be related to: ed99eaa

@laserpedro

laserpedro commented Sep 7, 2021

So I tested the code that generated the issue described above with the change from that commit, and it did not solve the issue: the first backfill worked, but all the following ones did not. I am still getting the same error.

@ephraimbuddy
Contributor

@laserpedro, I'm not able to reproduce your case with backfill. How long does your task run? I would also appreciate it if you could make a dag to reproduce this.
Again, it looks to me that you didn't set up airflow properly to use run_as_user. Can you take a look at the documentation: http://airflow.apache.org/docs/apache-airflow/stable/security/workload.html#impersonation

@laserpedro

laserpedro commented Sep 8, 2021

Hello @ephraimbuddy,

Thank you for your answer!

I thought the same as you, so I did a full reinstall today (new machine, new database, new environment, new user).
I have an airflow user with sudo privileges that runs the scheduler and the webserver.

run_as_user is left empty in the airflow.cfg file now.

First question: in terms of users, does that seem correct to you?

I ported one of the dags that generated those errors in backfill mode and I am still getting them erratically: it is not always the same task that fails, whatever the number of dag runs launched in backfill (30, 16, 10).

I would say that for the classic backfill (16 dag runs in parallel for me) the time to finish them all is 7-10 minutes: nothing big is done apart from the insertion of 40k rows into a postgresql db at the end (so kind of small actually). However, it does seem that time is a factor here, since backfills with very few tasks work fine.

For the code sample, I will provide something ASAP to reproduce the problem (sent by mail :)).

Tks,

Pierre

@ephraimbuddy
Contributor

@laserpedro Yes, that makes sense for a user setup.

@ephraimbuddy
Contributor

@laserpedro It will be very helpful if you can provide a simple dag to reproduce this behaviour.

@laserpedro

Hello @ephraimbuddy,

Since I had to focus on getting my airflow installation working again, I made the modifications below and it seems to be working properly now:

  • Set up a dedicated postgresql server on the same machine as the one hosting airflow
  • Downgrade airflow to 2.0.2 (since I did not find any official guide for downgrading the metadata database, I am using a new one for the moment)
  • Incorporate the fix located here: https://github.com/apache/airflow/pull/16289/files

With this new setup, my airflow installation has been working correctly for 2 days now.

@ephraimbuddy
Contributor

Thanks @laserpedro.
We want to fix this and would appreciate it if you could make a simple dag to reproduce it.

@felipeangelimvieira

felipeangelimvieira commented Sep 15, 2021

Hello @ephraimbuddy,

We are facing the same issue...
I'm using the official airflow helm chart on Azure Kubernetes Service, with LocalExecutor. Different values of AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION give different error messages...

Here is an example dag to reproduce it:

from airflow import DAG
from airflow.operators.python import PythonOperator

import time
from datetime import datetime

dag = DAG('dag_test',
          description='test',
          schedule_interval=None,
          start_date=datetime(2021, 4, 1),
          max_active_runs=1,
          concurrency=40,
          catchup=False)


def my_sleeping_function(t):
    time.sleep(t)


tasks = []
for i in range(400):
    task = PythonOperator(task_id='sleep_for_' + str(i),
                          python_callable=my_sleeping_function,
                          op_kwargs={'t': 60},
                          dag=dag)

With AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=true

*** Reading local file: /opt/airflow/logs/airflow2/dag_test/sleep_for_0/2021-09-15T11:52:24.627715+00:00/1.log
[2021-09-15 11:54:59,531] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [queued]>
[2021-09-15 11:54:59,583] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [queued]>
[2021-09-15 11:54:59,583] {taskinstance.py:1087} INFO - 
--------------------------------------------------------------------------------
[2021-09-15 11:54:59,583] {taskinstance.py:1088} INFO - Starting attempt 1 of 1
[2021-09-15 11:54:59,583] {taskinstance.py:1089} INFO - 
--------------------------------------------------------------------------------
[2021-09-15 11:55:04,450] {taskinstance.py:1107} INFO - Executing <Task(PythonOperator): sleep_for_0> on 2021-09-15T11:52:24.627715+00:00
[2021-09-15 11:55:04,454] {standard_task_runner.py:52} INFO - Started process 1657 to run task
[2021-09-15 11:55:04,458] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'dag_test', 'sleep_for_0', '2021-09-15T11:52:24.627715+00:00', '--job-id', '33893', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/other/dag_test_bug.py', '--cfg-path', '/tmp/tmpcwh5nv0m', '--error-file', '/tmp/tmp8t0t_nvu']
[2021-09-15 11:55:04,459] {standard_task_runner.py:77} INFO - Job 33893: Subtask sleep_for_0
[2021-09-15 11:55:29,043] {logging_mixin.py:104} INFO - Running <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [running]> on host airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local
[2021-09-15 11:56:02,766] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=dag_test
AIRFLOW_CTX_TASK_ID=sleep_for_0
AIRFLOW_CTX_EXECUTION_DATE=2021-09-15T11:52:24.627715+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-09-15T11:52:24.627715+00:00
[2021-09-15 11:57:00,870] {local_task_job.py:77} ERROR - Received SIGTERM. Terminating subprocesses
[2021-09-15 11:57:00,916] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 1657
[2021-09-15 11:57:00,916] {taskinstance.py:1284} ERROR - Received SIGTERM. Terminating subprocesses.

With AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION=false

EDIT: it seems that the error message below appears when I relaunch the task after its failure. It may not be related to the SCHEDULE_AFTER_TASK_EXECUTION config.

*** Reading local file: /opt/airflow/logs/airflow2/dag_test/sleep_for_0/2021-09-15T11:52:24.627715+00:00/2.log
[2021-09-15 11:59:52,836] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [queued]>
[2021-09-15 11:59:57,228] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [queued]>
[2021-09-15 11:59:57,228] {taskinstance.py:1087} INFO - 
--------------------------------------------------------------------------------
[2021-09-15 11:59:57,228] {taskinstance.py:1088} INFO - Starting attempt 2 of 2
[2021-09-15 11:59:57,228] {taskinstance.py:1089} INFO - 
--------------------------------------------------------------------------------
[2021-09-15 11:59:57,253] {taskinstance.py:1107} INFO - Executing <Task(PythonOperator): sleep_for_0> on 2021-09-15T11:52:24.627715+00:00
[2021-09-15 11:59:57,256] {standard_task_runner.py:52} INFO - Started process 814 to run task
[2021-09-15 11:59:57,258] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'dag_test', 'sleep_for_0', '2021-09-15T11:52:24.627715+00:00', '--job-id', '33947', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/other/dag_test_bug.py', '--cfg-path', '/tmp/tmpe7l0bnkn', '--error-file', '/tmp/tmppppc4ghm']
[2021-09-15 11:59:57,260] {standard_task_runner.py:77} INFO - Job 33947: Subtask sleep_for_0
[2021-09-15 12:00:18,910] {logging_mixin.py:104} INFO - Running <TaskInstance: dag_test.sleep_for_0 2021-09-15T11:52:24.627715+00:00 [running]> on host airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local
[2021-09-15 12:00:27,276] {local_task_job.py:194} WARNING - Recorded pid 1657 does not match the current pid 814
[2021-09-15 12:00:27,283] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 814
[2021-09-15 12:00:27,303] {process_utils.py:66} INFO - Process psutil.Process(pid=814, status='terminated', exitcode=<Negsignal.SIGTERM: -15>, started='11:59:57') (814) terminated with exit code Negsignal.SIGTERM

Some tasks succeed and others are randomly killed.

@ephraimbuddy
Contributor

Thanks, @felipeangelimvieira for coming through with the dag!

@felipeangelimvieira

felipeangelimvieira commented Sep 17, 2021

So I found out that my metadata database was using 100% CPU while running dags with multiple tasks in parallel, such as the example above. I'm using Azure PostgreSQL and the official airflow helm chart with PgBouncer enabled.

Scaling up the database may solve the issue, but instead I increased AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC and the problem was solved (the default is 5 sec).

While I'm not sure, it's possible that the heartbeat method of BaseJob is the one overloading the database. When the database is running out of CPU, the heartbeat takes longer than heartrate * 2.1 (2.1 is the default grace_multiplier in the is_alive method of BaseJob) and the scheduler kills the tasks.
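Roughly, the liveness check described above boils down to something like this simplified sketch (an illustration of the heartrate * grace_multiplier window, not the actual Airflow source):

from datetime import datetime, timedelta

def job_is_alive(latest_heartbeat: datetime, heartrate: float, grace_multiplier: float = 2.1) -> bool:
    # a job is treated as dead once its last recorded heartbeat is older than
    # heartrate * grace_multiplier seconds; a starved metadata DB can delay the
    # heartbeat past this window, after which the task gets SIGTERMed
    return datetime.utcnow() - latest_heartbeat < timedelta(seconds=heartrate * grace_multiplier)

That would explain why raising AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC (the heartrate) widens the window a slow heartbeat is allowed to miss.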

@laserpedro

@felipeangelimvieira have you noticed any pattern? Like the error happening when your task execution time > heartbeat rate?
What value in seconds have you put in your config? Something much larger than the average task execution time?
Thanks!

@felipeangelimvieira

@laserpedro unfortunately no patterns... it worked with the example dag once. I have no idea how to solve it, although the database running out of CPU seems to play a role. Could you verify whether it is also the case for your database?

I've tried different configurations (LocalExecutor, CeleryExecutor), and the problem keeps appearing randomly with those dags with many tasks in parallel.

@morhook

morhook commented Sep 20, 2021 via email

@denysivanov

I see this issue and it totally correlates with CPU spikes on my PostgreSQL DB.

@felipeangelimvieira

Just updated to 2.1.4 and the error message has changed. I'm seeing fewer dags receiving the SIGTERM signal, although the error still appears. Now some tasks are being randomly killed and marked as failed before execution.
The logs of the tasks that failed before execution follow this pattern:

*** Log file does not exist: /opt/airflow/logs/airflow2/dag_test/sleep_for_15/2021-09-20T17:02:17.954197+00:00/1.log
*** Fetching from: http://:8793/log/dag_test/sleep_for_15/2021-09-20T17:02:17.954197+00:00/1.log
*** Failed to fetch log file from worker. Request URL missing either an 'http://' or 'https://' protocol.

I've also seen some tasks that succeeded get marked as failed.

In addition, I was able to reproduce the SIGTERM issue locally with docker-compose by limiting the CPU usage of the PostgreSQL container. Indeed, the metadata database may cause the SIGTERM error. However, I wasn't expecting high CPU usage on the PostgreSQL database while running with the official helm chart, since it has PgBouncer.

I can share the code of the docker-compose if you find it helpful.

@laserpedro

laserpedro commented Sep 24, 2021

The SIGTERM issue came back for me in 2.0.2 so yes I really think that it is backend related.
I will have to log the cpu usage to see what's going on when all my dags are triggered at 00:00:00.

From there, I don't know what the solution could be... some fine tuning of the postgresql server maybe... I am currently using the standard postgres config.

@felipeangelimvieira : interesting stuff found on stackoverflow: https://stackoverflow.com/questions/42419834/airbnb-airflow-using-all-system-resources

@eduardchai

eduardchai commented Jan 17, 2022

We started having this issue after we upgraded to v2.2.3. We did not experience this issue when we were at v2.0.2.

Here is the sample dag that we used:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

with DAG(
    dag_id="stress-test-kubepodoperator",
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2021, 1, 1),
) as dag:
    for i in range(1000):
        KubernetesPodOperator(
            name="airflow-test-pod",
            namespace="airflow-official",
            image="ubuntu:latest",
            cmds=["bash", "-cx"],
            arguments=["r=$(( ( RANDOM % 120 )  + 1 )); sleep ${r}s"],
            labels={"foo": "bar"},
            task_id="task_" + str(i),
            is_delete_operator_pod=True,
            startup_timeout_seconds=300,
            service_account_name="airflow-worker",
            get_logs=True,
            resources={"request_memory": "128Mi", "request_cpu": "100m"},
            queue="kubernetes",
        )

Error message:

[2022-01-17, 18:41:48 +08] {local_task_job.py:212} WARNING - State of this instance has been externally set to scheduled. Terminating instance.
[2022-01-17, 18:41:48 +08] {process_utils.py:124} INFO - Sending Signals.SIGTERM to group 16. PIDs of all processes in the group: [16]
[2022-01-17, 18:41:48 +08] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 16
[2022-01-17, 18:41:48 +08] {taskinstance.py:1408} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-01-17, 18:41:48 +08] {taskinstance.py:1700} ERROR - Task failed with exception

Successful tasks were also intermittently flagged as failed:

Screenshot 2022-01-17 at 7 16 51 PM

[2022-01-17, 18:41:20 +08] {kubernetes_pod.py:372} INFO - creating pod with labels {'dag_id': 'stress-test-kubepodoperator', 'task_id': 'task_44', 'execution_date': '2022-01-17T103621.2231870000-7740efddd', 'try_number': '1'} and launcher <airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher object at 0x7f569d311f90>
[2022-01-17, 18:41:20 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Pending
[2022-01-17, 18:41:20 +08] {pod_launcher.py:133} WARNING - Pod not yet started: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529
[2022-01-17, 18:41:21 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Pending
[2022-01-17, 18:41:21 +08] {pod_launcher.py:133} WARNING - Pod not yet started: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529
[2022-01-17, 18:41:22 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Pending
[2022-01-17, 18:41:22 +08] {pod_launcher.py:133} WARNING - Pod not yet started: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529
[2022-01-17, 18:41:23 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Running
[2022-01-17, 18:41:23 +08] {pod_launcher.py:159} INFO - + r=62
[2022-01-17, 18:41:23 +08] {pod_launcher.py:159} INFO - + sleep 62s
[2022-01-17, 18:42:25 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Succeeded
[2022-01-17, 18:42:25 +08] {pod_launcher.py:333} INFO - Event with job id airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 Succeeded
[2022-01-17, 18:42:25 +08] {pod_launcher.py:216} INFO - Event: airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 had an event of type Succeeded
[2022-01-17, 18:42:25 +08] {pod_launcher.py:333} INFO - Event with job id airflow-test-pod.c309a1eaf221470a882edf2cb57f9529 Succeeded
[2022-01-17, 18:42:25 +08] {taskinstance.py:1277} INFO - Marking task as SUCCESS. dag_id=stress-test-kubepodoperator, task_id=task_44, execution_date=20220117T103621, start_date=20220117T104119, end_date=20220117T104225
[2022-01-17, 18:42:25 +08] {local_task_job.py:154} INFO - Task exited with return code 0
[2022-01-17, 18:42:25 +08] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check

Environment information:

  • deployed using official helm charts
  • Airflow version: v2.2.3
  • No. of scheduler replicas: 3 (each with 3 CPU and 6Gi memory request)

@ephraimbuddy
Contributor

@eduardchai, your case seems different. It seems that the tasks are taking a long time to start. Try setting AIRFLOW__KUBERNETES__WORKER_PODS_QUEUED_CHECK_INTERVAL=86400 and see whether that resolves it.

@eduardchai

@ephraimbuddy it does reduce the number of errors by a lot! Thank you! There are some errors where the jobs are stuck in queued state, but I think it should be solvable by finding the right interval configuration for our use cases. Just out of curiosity, how did you know that the problem was with the interval from the logs? And btw, even though it is under AIRFLOW__KUBERNETES, this configuration somehow works with Celery Executor with KEDA too.

@ephraimbuddy
Contributor

@ephraimbuddy it does reduce the number of errors by a lot! Thank you! There are some errors where the jobs are stuck in queued state, but I think it should be solvable by finding the right interval configuration for our use cases. Just out of curiosity, how did you know that the problem was with the interval from the logs? And btw, even though it is under AIRFLOW__KUBERNETES, this configuration somehow works with Celery Executor with KEDA too.

If you are not on kubernetes then I'm not sure how it worked for you (I think it shouldn't work). What formed my opinion was that your pods were taking time to start and queued tasks were being moved back to scheduled. So I don't know how it worked in your case. Maybe @jedcunningham can explain better. Maybe you should also increase AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE in kubernetes 🙃

@bparhy

bparhy commented Jan 26, 2022

I am seeing a similar issue in our Airflow with Kubernetes environment.

Airflow Version : 2.1.3
Kubernetes Version : v1.20.5

What Happens
This happens intermittently.
I am seeing that tasks are not being deleted on the kubernetes side. Even though they completed successfully in the UI, the pod ends up with an Error status.

We have 1000s of tasks and this happens only for a couple of tasks so far.

Tasks Log:
[2022-01-26 00:51:50,823] {local_task_job.py:209} WARNING - State of this instance has been externally set to success. Terminating instance.
[2022-01-26 00:51:50,825] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 55
[2022-01-26 00:51:50,827] {taskinstance.py:1235} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-01-26 00:51:52,468] {process_utils.py:66} INFO - Process psutil.Process(pid=55, status='terminated', exitcode=1, started='00:51:45') (55) terminated with exit code 1

Task Pod Log
[2022-01-26 00:51:37,973] {dagbag.py:496} INFO - Filling up the DagBag from /usr/local/airflow/dags/name/name1/name2/name3_dag.py
/usr/local/lib/python3.6/site-packages/sqlalchemy/ext/declarative/clsregistry.py:129 SAWarning: This declarative base already contains a class with the same class name and module name as bi_plugin.DagRun, and will be replaced in the string-lookup table.
Running <TaskInstance: dagname.task_name 2022-01-25T00:50:00+00:00 [queued]> on host dagnametaskname.457ebbfa477643b289bceef4b0fbc1ab
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 238, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 100, in _execute
    self.task_runner.start()
  File "/usr/local/lib/python3.6/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
    self.process = self._start_by_fork()
  File "/usr/local/lib/python3.6/site-packages/airflow/task/task_runner/standard_task_runner.py", line 92, in _start_by_fork
    logging.shutdown()
  File "/usr/local/lib/python3.6/logging/__init__.py", line 1946, in shutdown
    h.close()
  File "/usr/local/lib/python3.6/logging/__init__.py", line 1048, in close
    stream.close()
  File "/usr/local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1237, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2022-01-26 00:51:51,249] {connection.py:499} INFO - closed
[2022-01-26 00:51:51,249] {connection.py:502} INFO - No async queries seem to be running, deleting session
Running <TaskInstance: dag_name.task_name 2022-01-25T00:50:00+00:00 [queued]> on host dagnametaskname.457ebbfa477643b289bceef4b0fbc1ab
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 91, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 238, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 64, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/usr/local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 121, in _run_task_by_local_task_job
    run_job.run()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/base_job.py", line 245, in run
    self._execute()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 145, in _execute
    self.on_kill()
  File "/usr/local/lib/python3.6/site-packages/airflow/jobs/local_task_job.py", line 171, in on_kill
    self.task_runner.on_finish()
  File "/usr/local/lib/python3.6/site-packages/airflow/task/task_runner/base_task_runner.py", line 178, in on_finish
    self._error_file.close()
  File "/usr/local/lib/python3.6/tempfile.py", line 511, in close
    self._closer.close()
  File "/usr/local/lib/python3.6/tempfile.py", line 448, in close
    unlink(self.name)
FileNotFoundError: [Errno 2] No such file or directory: '/tmp/tmpfabi4n96'

I have not set any configuration params; all have default values. I see a CPU spike but am unable to relate it to this.

Thanks in advance.

@morhook

morhook commented Jan 26, 2022

@bparhy have you checked kubelet logs to detect node pressure evictions or other related evictions?

@bparhy

bparhy commented Apr 5, 2022

@morhook I checked with my K8s team and they don't find anything unusual. I tried increasing the metadata DB size (Aurora) and that also did not help. Any solution in this direction, please? We are currently running Airflow 2.1.3 using k8s.

Please let me know.

@sraviteja07

We are also facing the same problem, with the below stack trace:

ERROR - Received SIGTERM. Terminating subprocesses.
ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1312, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/airflow/dags/flow_airflow/operator/dts_operator.py", line 100, in execute
    self.to_email_address)
  File "/usr/local/airflow/dags/flow_airflow/operator/dts_operator.py", line 111, in run_process
    for line in iter(process.stdout.readline, ""):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1237, in signal_handler
    raise AirflowException("Task received SIGTERM signal")

WARNING - process psutil.Process(pid=25806, name='python3', status='zombie', started='20:09:10') did not respond to SIGTERM. Trying SIGKILL

{process_utils.py:124} ERROR - Process psutil.Process(pid=54556, name='python3', status='zombie', started='20:09:10') (54556) could not be killed. Giving up.

We also have thousands of tasks, and this happens to some of them intermittently.

@GHGHGHKO

We got SIGTERM errors on about 250 dags.

We solved it by following this link.

@bparhy

bparhy commented May 25, 2022

@GHGHGHKO thanks for the reply. We are seeing the issue on our task pods, which even after success end up in a failed state in k8s, so the result is Error pods in k8s.
We control the pod resources using request_memory, limit_memory and limit_cpu. Are you suggesting increasing each task's limit to 4GB? That would be huge, and our cluster cannot provide that amount of resources since we have a lot of tasks running in parallel. Please let me know.

@gmyrianthous

gmyrianthous commented May 31, 2022

Possible fix

I was having the same problem after upgrading from Airflow v1.10.15 to v2.2.5, and was seeing the error in long-running DAGs with a fairly high number of tasks.

Apparently, the dagrun_timeout in airflow.models.DAG was not respected in earlier Airflow versions, and I noticed that the DAGs I was trying to migrate to the new Airflow instance were running for much longer than the specified dagrun_timeout.

The solution for me was to increase the dagrun_timeout (e.g. dagrun_timeout=datetime.timedelta(minutes=120)).

Note that this variable is effective only for scheduled tasks (in other words with DAGs with a specified schedule_interval).
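For example, a minimal sketch of where dagrun_timeout goes (the dag id, schedule and timeout value here are just placeholders):

from datetime import datetime, timedelta

from airflow import DAG

# hypothetical DAG showing the dagrun_timeout fix described above; pick a value
# comfortably above the longest expected run time
dag = DAG(
    dag_id="my_long_running_dag",
    schedule_interval="0 2 * * *",  # dagrun_timeout only applies to scheduled runs
    start_date=datetime(2021, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=120),
)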

@allenhaozi

I have the same problem.
I'm using airflow 2.2.5, SparkKubernetesOperator and SparkKubernetesSensor.

The driver is running, but the sensor displays the following logs until the number of retries exceeds the threshold:

2022-06-17, 18:05:52 CST] {spark_kubernetes.py:104} INFO - Poking: load-customer-data-init-1655486757.7793136
[2022-06-17, 18:05:52 CST] {spark_kubernetes.py:124} INFO - Spark application is still in state: RUNNING
[2022-06-17, 18:06:49 CST] {local_task_job.py:211} WARNING - State of this instance has been externally set to up_for_retry. Terminating instance.
[2022-06-17, 18:06:49 CST] {process_utils.py:120} INFO - Sending Signals.SIGTERM to group 84. PIDs of all processes in the group: [84]
[2022-06-17, 18:06:49 CST] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 84
[2022-06-17, 18:06:49 CST] {taskinstance.py:1430} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-06-17, 18:06:49 CST] {taskinstance.py:1774} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 249, in execute
    time.sleep(self._get_next_poke_interval(started_at, run_duration, try_number))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1432, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2022-06-17, 18:06:49 CST] {taskinstance.py:1278} INFO - Marking task as FAILED. dag_id=salesforecast-load-init, task_id=load-customer-data-init-sensor, execution_date=20220617T172033, start_date=20220617T175649, end_date=20220617T180649
[2022-06-17, 18:06:49 CST] {standard_task_runner.py:93} ERROR - Failed to execute job 24 for task load-customer-data-init-sensor (Task received SIGTERM signal; 84)
[2022-06-17, 18:06:49 CST] {process_utils.py:70} INFO - Process psutil.Process(pid=84, status='terminated', exitcode=1, started='17:56:48') (84) terminated with exit code 1

@potiuk
Member

potiuk commented Jun 19, 2022

I have the same problem I'm using airflow 2.2.5, SparkKubernetesOperator and SparkKubernetesSensor

Driver is running But the sensor displays the following logs until the number of retries exceeds the threshold

2022-06-17, 18:05:52 CST] {spark_kubernetes.py:104} INFO - Poking: load-customer-data-init-1655486757.7793136
[2022-06-17, 18:05:52 CST] {spark_kubernetes.py:124} INFO - Spark application is still in state: RUNNING
[2022-06-17, 18:06:49 CST] {local_task_job.py:211} WARNING - State of this instance has been externally set to up_for_retry. Terminating instance.
[2022-06-17, 18:06:49 CST] {process_utils.py:120} INFO - Sending Signals.SIGTERM to group 84. PIDs of all processes in the group: [84]
[2022-06-17, 18:06:49 CST] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 84
[2022-06-17, 18:06:49 CST] {taskinstance.py:1430} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-06-17, 18:06:49 CST] {taskinstance.py:1774} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 249, in execute
    time.sleep(self._get_next_poke_interval(started_at, run_duration, try_number))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1432, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2022-06-17, 18:06:49 CST] {taskinstance.py:1278} INFO - Marking task as FAILED. dag_id=salesforecast-load-init, task_id=load-customer-data-init-sensor, execution_date=20220617T172033, start_date=20220617T175649, end_date=20220617T180649
[2022-06-17, 18:06:49 CST] {standard_task_runner.py:93} ERROR - Failed to execute job 24 for task load-customer-data-init-sensor (Task received SIGTERM signal; 84)
[2022-06-17, 18:06:49 CST] {process_utils.py:70} INFO - Process psutil.Process(pid=84, status='terminated', exitcode=1, started='17:56:48') (84) terminated with exit code 1

Did you try the earlier suggestions with dagrun_timeout? Do you know what is sending SIGTERM to this task?

@kcphila

kcphila commented Jun 22, 2022

Hi all,

From the discussion over at issue #17507, I may have identified the issue where the SIGTERM is sent together with the Recorded pid <> does not match the current pid <> error, but I'm running LocalExecutor and not kubernetes.

For me, I think this is happening when run_as_user is set for a task and the heartbeat is checked while the task instance pid is not set (None). In these cases, the recorded_pid gets set to the parent of the running task supervisor process, which is the executor itself, instead of the task runner.

I don't know if this will address the issue with kubernetes or celery executor, but it seems very likely to be the same issue. It will take me a little while to set up the dev environment and do the testing before submitting a PR, but if you want to try doing a local install, feel free to give it a whirl. I have a tentative branch set up here: https://github.com/krcrouse/airflow/tree/fix-pid-check

@allenhaozi

allenhaozi commented Jun 23, 2022

I have the same problem I'm using airflow 2.2.5, SparkKubernetesOperator and SparkKubernetesSensor
Driver is running But the sensor displays the following logs until the number of retries exceeds the threshold

2022-06-17, 18:05:52 CST] {spark_kubernetes.py:104} INFO - Poking: load-customer-data-init-1655486757.7793136
[2022-06-17, 18:05:52 CST] {spark_kubernetes.py:124} INFO - Spark application is still in state: RUNNING
[2022-06-17, 18:06:49 CST] {local_task_job.py:211} WARNING - State of this instance has been externally set to up_for_retry. Terminating instance.
[2022-06-17, 18:06:49 CST] {process_utils.py:120} INFO - Sending Signals.SIGTERM to group 84. PIDs of all processes in the group: [84]
[2022-06-17, 18:06:49 CST] {process_utils.py:75} INFO - Sending the signal Signals.SIGTERM to group 84
[2022-06-17, 18:06:49 CST] {taskinstance.py:1430} ERROR - Received SIGTERM. Terminating subprocesses.
[2022-06-17, 18:06:49 CST] {taskinstance.py:1774} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 249, in execute
    time.sleep(self._get_next_poke_interval(started_at, run_duration, try_number))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1432, in signal_handler
    raise AirflowException("Task received SIGTERM signal")
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2022-06-17, 18:06:49 CST] {taskinstance.py:1278} INFO - Marking task as FAILED. dag_id=salesforecast-load-init, task_id=load-customer-data-init-sensor, execution_date=20220617T172033, start_date=20220617T175649, end_date=20220617T180649
[2022-06-17, 18:06:49 CST] {standard_task_runner.py:93} ERROR - Failed to execute job 24 for task load-customer-data-init-sensor (Task received SIGTERM signal; 84)
[2022-06-17, 18:06:49 CST] {process_utils.py:70} INFO - Process psutil.Process(pid=84, status='terminated', exitcode=1, started='17:56:48') (84) terminated with exit code 1

Did you try the earlier suggestions with dagrun_timeout? Do you know what is sending SIGTERM to this task?

Thank you @potiuk.
I tried the dagrun_timeout parameter and it didn't work,
but in my environment I commented out these three parameters and it works fine for now:

  1. AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC: 600
  2. AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: 200
  3. AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD: 600
airflow:
  config:
    # if using another namespace, you should configure a new service account
    AIRFLOW__KUBERNETES__NAMESPACE: "airflow"
    AIRFLOW__KUBERNETES__DELETE_WORKER_PODS: "false"
    AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC: "15"
    AIRFLOW__LOGGING__LOGGING_LEVEL: "DEBUG"
    AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
    AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: "s3://airflow-logs/"
    AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: "openaios_airflow_log"
    AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
    #AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC: 600
    #AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: 200
    #AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD: 600
    AIRFLOW__KUBERNETES__WORKER_PODS_QUEUED_CHECK_INTERVAL: "86400"
    AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME: "604800"
    AIRFLOW__CORE__HOSTNAME_CALLABLE: socket.gethostname
    AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: "30"
    AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: "False"

  ## a list of users to create

@potiuk
Member

potiuk commented Jul 3, 2022

I tried the dagrun_timeout parameter and it didn't work,
but in my environment I commented out these three parameters and it works fine for now.

The change indicates a problem with the scheduler healthcheck, which I believe was already addressed in 2.3.* (we are currently voting on 2.3.3). I will close this provisionally. And I have a big request: can any of the people who had the problem migrate to 2.3.3 (or even try the 2.3.3rc1 which we are testing here: #24806) and reset the configuration to the defaults? (@allenhaozi - maybe you can try it.)

@potiuk closed this as completed Jul 3, 2022
@bdsoha
Contributor

bdsoha commented Jul 24, 2022

@potiuk I am on version 2.3.3 and am having the same issue described here.

@potiuk
Member

potiuk commented Jul 24, 2022

@potiuk I am on version 2.3.3 and am having the same issue described here.

Then please provide information: logs, analysis and a description of your circumstances in a separate issue. It does not bring anyone closer by stating "I have the same issue" without providing any more details that can help with diagnosis of the problem you have. This might be a different issue manifesting similarly - but if you do not create a new issue with your symptoms and description, you pretty much remove the chance of anyone fixing your problem - because it might be a different one. So if you want to help with diagnosis of the problem - please do your part and report details that might help with the diagnosis.

@yannibenoit

@potiuk
I'm on version 2.3.4, and I get the issue on an existing DAG that was working fine before with an older version (2.1.X) 🤷‍♂️

I tried updating the following variables and I still have the issue:

AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION: 'False'
 AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: "30"
AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME: "604800"

and also tried with dagrun_timeout=timedelta(minutes=120)

I don't understand what I'm doing wrong because other dags work fine 😢

Any clue 🙏 ?

dag_id=singer_wootric_stitch_run_id=manual__2022-11-18T09_14_57.025351+00_00_task_id=bash_create_and_init_wootric_venv_attempt=16.log

@potiuk
Member

potiuk commented Nov 25, 2022

I suggest migrating to the latest version - 2.4 (or, in a few days, 2.5). There are 100s of related fixes since then, and it is the easiest way to see if things got better. This is the most efficient way for everyone.

@yannibenoit

@potiuk after migrating to 2.5.0 I still get the issue.

@potiuk
Member

potiuk commented Dec 6, 2022

Can you please open a new issue with a description of the circumstances and logs describing when and how it happens?

That ask from above does not change:

Then please provide information: logs, analysis and a description of your circumstances in a separate issue. It does not bring anyone closer by stating "I have the same issue" without providing any more details that can help with diagnosis of the problem you have. This might be a different issue manifesting similarly - but if you do not create a new issue with your symptoms and description, you pretty much remove the chance of anyone fixing your problem - because it might be a different one. So if you want to help with diagnosis of the problem - please do your part and report details that might help with the diagnosis.

@potiuk
Member

potiuk commented Dec 6, 2022

cc: @yannibenoit ^^

@yannibenoit

yannibenoit commented Dec 6, 2022

@potiuk Thank you for your help

I created an issue but I will resolve it haha 😂 -> Tasks intermittently gets terminated with SIGTERM on Celery Executor · Issue #27885 · apache/airflow

I found a fix after looking at a Stack Overflow post -> Celery Executor - Airflow Impersonation "run_as_user" Recorded pid xxx does not match the current pid - Stack Overflow

I was running my bash operator with run_as_user=airflow; I think I don't need it anymore (screenshots of the change attached).
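The change boils down to something like this sketch (the dag and task ids come from the log file name above; the bash command is a placeholder):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="singer_wootric_stitch",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:
    create_venv = BashOperator(
        task_id="bash_create_and_init_wootric_venv",
        bash_command="python -m venv /tmp/wootric_venv",  # placeholder command
        # run_as_user="airflow",  # removed: impersonation needs a proper sudo setup,
        # otherwise the "Recorded pid ... does not match the current pid" kill shows up
    )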

@potiuk
Member

potiuk commented Dec 6, 2022

Ah. I would say that should have been fixed already. Is it possible @yannibenoit - to make an issue and submit some logs from BEFORE the run_as_user was commented out? I guess this might be a problem others might also have and run_as_user is kinda useful.

@shaurya-sood

shaurya-sood commented Dec 7, 2022

Hello, we were experiencing a similar issue on v2.2.5 so we migrated to v2.4.3 but the problem still exists.

[2022-12-07, 15:37:49 UTC] {local_task_job.py:223} WARNING - State of this instance has been externally set to up_for_retry. Terminating instance.
[2022-12-07, 15:37:49 UTC] {process_utils.py:133} INFO - Sending Signals.SIGTERM to group 89412. PIDs of all processes in the group: [89412]
[2022-12-07, 15:37:49 UTC] {process_utils.py:84} INFO - Sending the signal Signals.SIGTERM to group 89412
[2022-12-07, 15:37:49 UTC] {taskinstance.py:1562} ERROR - Received SIGTERM. Terminating subprocesses.

The scheduler_heartbeat metric drops to almost 0 at the same time.

We're using a Postgres DB, and during DAG execution its CPU utilization spikes up to 100% (we're using a db.r6g.large RDS instance, btw).

@potiuk
Member

potiuk commented Dec 7, 2022

@shaurya-sood - can you please (asking again) open a new issue with more details: what your deployment is, what you are doing, what you experience, more logs, what happens in the UI, whether you use run_as_user, whether it happens always or only sometimes, when it happens, etc. It really does not help to add a comment on a closed issue that might just have a similar message but might not necessarily be the same issue.

Thanks in advance.

@shaurya-sood

@shaurya-sood - can you please (asking again) open a new issue with more details: what your deployment is, what you are doing, what you experience, more logs, what happens in the UI, whether you use run_as_user, whether it happens always or only sometimes, when it happens, etc. It really does not help to add a comment on a closed issue that might just have a similar message but might not necessarily be the same issue.

Thanks in advance.

Opened a new issue #28201
Thanks.
