Skip to content

Commit

Permalink
Fix mypy errors
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed Jun 6, 2024
1 parent ed4c117 commit 0d45a54
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
2 changes: 1 addition & 1 deletion airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ def _run_parsing_loop(self):
self.waitables.pop(sentinel)
self._processors.pop(processor.file_path)

if True:
if self.standalone_dag_processor:
self._fetch_callbacks(max_callbacks_per_loop)
self._scan_stale_dags()
DagWarning.purge_inactive_dag_warnings()
Expand Down
18 changes: 15 additions & 3 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstance as TI
from airflow.models.taskinstance import TaskInstance, TaskInstance as TI, _run_finished_callback
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.email import get_email_address_list, send_email
Expand Down Expand Up @@ -762,7 +762,16 @@ def _execute_dag_callbacks(self, dagbag: DagBag, request: DagCallbackRequest, se
if callbacks and context:
DAG.execute_callback(callbacks, context, dag.dag_id)

def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRequest, session: Session):
def _execute_task_callbacks(
self, dagbag: DagBag | None, request: TaskCallbackRequest, session: Session
) -> None:
"""
Execute the task callbacks.
:param dagbag: the DagBag to use to get the task instance
:param request: the task callback request
:param session: the session to use
"""
try:
callback_type = TaskInstanceState(request.task_callback_type)
except Exception:
Expand All @@ -786,6 +795,7 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRe
map_index=simple_ti.map_index,
session=session,
)

if not ti:
return

Expand All @@ -810,8 +820,10 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRe
if callback_type is TaskInstanceState.SUCCESS:
context = ti.get_template_context(session=session)
if callback_type is TaskInstanceState.SUCCESS:
if not ti.task:
return
callbacks = ti.task.on_success_callback
ti._run_finished_callback(callbacks=callbacks, context=context, callback_type="on_success")
_run_finished_callback(callbacks=callbacks, context=context)
self.log.info("Executed callback for %s in state %s", ti, ti.state)
elif not is_remote or callback_type is TaskInstanceState.FAILED:
ti.handle_failure(error=request.msg, test_mode=self.UNIT_TEST_MODE, session=session)
Expand Down

0 comments on commit 0d45a54

Please sign in to comment.