From 1dd694ecf218c254619bfa2c754d7802b4e2c8b1 Mon Sep 17 00:00:00 2001 From: Sandeep Bhardwaj Date: Wed, 22 Mar 2023 11:36:56 +0530 Subject: [PATCH] fix: fix depends_on_past issue for sensors (#763) --- ext/scheduler/airflow2/resources/base_dag.py | 6 ++++++ .../airflow2/resources/expected_compiled_template.py | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/ext/scheduler/airflow2/resources/base_dag.py b/ext/scheduler/airflow2/resources/base_dag.py index 1839a08acc..1939cf1aa6 100644 --- a/ext/scheduler/airflow2/resources/base_dag.py +++ b/ext/scheduler/airflow2/resources/base_dag.py @@ -73,6 +73,7 @@ task_id = JOB_START_EVENT_NAME, python_callable = log_job_start, provide_context=True, + depends_on_past=False, dag=dag ) @@ -81,6 +82,7 @@ python_callable = log_job_end, provide_context=True, trigger_rule= 'all_success', + depends_on_past=False, dag=dag ) @@ -219,6 +221,7 @@ get_logs=True, dag=dag, in_cluster=True, + depends_on_past=False, is_delete_operator_pod=True, do_xcom_push=False, env_vars=executor_env_vars, @@ -252,6 +255,7 @@ poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS, timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS, task_id="wait_{{$dependency.Job.Name | trunc 200}}-{{$dependencySchema.Name}}", + depends_on_past=False, dag=dag ) {{- end}} @@ -268,6 +272,7 @@ window_version=int("{{ $baseWindow.GetVersion }}"), poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS, timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS, + depends_on_past=False, task_id="wait_{{$dependency.JobName | trunc 200}}-{{$dependency.TaskName}}", dag=dag ) @@ -284,6 +289,7 @@ poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS, timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS, task_id='wait_{{$httpDependency.Name| trunc 200}}', + depends_on_past=False, dag=dag ) {{- end}} diff --git a/ext/scheduler/airflow2/resources/expected_compiled_template.py b/ext/scheduler/airflow2/resources/expected_compiled_template.py index b8846fff3c..adfb0f58c0 100644 --- a/ext/scheduler/airflow2/resources/expected_compiled_template.py +++ b/ext/scheduler/airflow2/resources/expected_compiled_template.py @@ -65,6 +65,7 @@ task_id = JOB_START_EVENT_NAME, python_callable = log_job_start, provide_context=True, + depends_on_past=False, dag=dag ) @@ -73,6 +74,7 @@ python_callable = log_job_end, provide_context=True, trigger_rule= 'all_success', + depends_on_past=False, dag=dag ) @@ -174,6 +176,7 @@ get_logs=True, dag=dag, in_cluster=True, + depends_on_past=False, is_delete_operator_pod=True, do_xcom_push=False, env_vars=executor_env_vars, @@ -211,6 +214,7 @@ get_logs=True, dag=dag, in_cluster=True, + depends_on_past=False, is_delete_operator_pod=True, do_xcom_push=False, env_vars=executor_env_vars, @@ -248,6 +252,7 @@ get_logs=True, dag=dag, in_cluster=True, + depends_on_past=False, is_delete_operator_pod=True, do_xcom_push=False, env_vars=executor_env_vars, @@ -273,6 +278,7 @@ poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS, timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS, task_id="wait_foo-intra-dep-job-bq", + depends_on_past=False, dag=dag ) wait_foo__dash__inter__dash__dep__dash__job = SuperExternalTaskSensor( @@ -286,6 +292,7 @@ poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS, timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS, task_id="wait_foo-inter-dep-job-bq", + depends_on_past=False, dag=dag ) @@ -299,6 +306,7 @@ window_version=int("1"), poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS, timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS, + depends_on_past=False, task_id="wait_foo-external-optimus-dep-job-bq", dag=dag )