Purge existing SLA implementation #42285

Merged
11 commits merged on Sep 25, 2024
1 change: 0 additions & 1 deletion airflow/api_internal/endpoints/rpc_api_endpoint.py
@@ -101,7 +101,6 @@ def initialize_method_map() -> dict[str, Callable]:
DagFileProcessor._execute_task_callbacks,
DagFileProcessor.execute_callbacks,
DagFileProcessor.execute_callbacks_without_dag,
DagFileProcessor.manage_slas,
DagFileProcessor.save_dag_to_db,
DagFileProcessor.update_import_errors,
DagFileProcessor._validate_task_pools_and_update_dag_warnings,
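Removing the entry above means `DagFileProcessor.manage_slas` can no longer be invoked over the internal RPC API, which resolves methods by name from this map. A hypothetical, simplified sketch of that dispatch pattern (not Airflow's actual endpoint code; the function and parameter names are illustrative):

```python
from typing import Callable


def make_dispatcher(method_map: dict[str, Callable]) -> Callable[[str, dict], object]:
    """Hypothetical, simplified dispatcher: only registered callables may be invoked."""

    def dispatch(method_name: str, params: dict) -> object:
        handler = method_map.get(method_name)
        if handler is None:
            # Anything dropped from the map, like manage_slas here, is rejected.
            raise ValueError(f"Method {method_name} is not registered for RPC execution")
        return handler(**params)

    return dispatch
```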
20 changes: 0 additions & 20 deletions airflow/callbacks/callback_requests.py
@@ -137,23 +137,3 @@ def __init__(
self.dag_id = dag_id
self.run_id = run_id
self.is_failure_callback = is_failure_callback


class SlaCallbackRequest(CallbackRequest):
"""
A class with information about the SLA callback to be executed.

:param full_filepath: File Path to use to run the callback
:param dag_id: DAG ID
:param processor_subdir: Directory used by the Dag Processor when it parsed the DAG.
"""

def __init__(
self,
full_filepath: str,
dag_id: str,
processor_subdir: str | None,
msg: str | None = None,
):
super().__init__(full_filepath, processor_subdir=processor_subdir, msg=msg)
self.dag_id = dag_id
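The deleted request type was a thin subclass that only added a DAG id on top of the base callback fields. A minimal sketch of how it could be constructed before this PR, with illustrative file path and DAG id (the class no longer exists once this change lands):

```python
from airflow.callbacks.callback_requests import SlaCallbackRequest  # removed by this PR

# Hypothetical example values; in practice the scheduler created these requests.
request = SlaCallbackRequest(
    full_filepath="/opt/airflow/dags/example_dag.py",
    dag_id="example_dag",
    processor_subdir="/opt/airflow/dags",
    msg="SLA callback for example_dag",
)
```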
7 changes: 0 additions & 7 deletions airflow/config_templates/config.yml
@@ -395,13 +395,6 @@ core:
type: integer
example: ~
default: "30"
check_slas:
description: |
On each DAG run, check against defined SLAs
version_added: 1.10.8
type: string
example: ~
default: "True"
xcom_backend:
description: |
Path to custom XCom class that will be used to store and resolve operators results
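The option removed above was a plain boolean toggle in the `[core]` section. A hedged sketch of how such a flag is typically read through Airflow's configuration API; the guard shown is illustrative, not the scheduler's exact code:

```python
from airflow.configuration import conf

# Illustrative guard: skip SLA bookkeeping when the (now removed) flag is disabled.
if conf.getboolean("core", "check_slas", fallback=True):
    ...  # run SLA checks for the current DAG run
```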
47 changes: 12 additions & 35 deletions airflow/dag_processing/manager.py
@@ -42,7 +42,7 @@

import airflow.models
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.models.dag import DagModel
@@ -752,40 +752,17 @@ def _fetch_callbacks_with_retries(
return callback_queue

def _add_callback_to_queue(self, request: CallbackRequest):
# requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
# task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
# goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
# the front of the queue, and we never get round to picking stuff off the back of the queue
if isinstance(request, SlaCallbackRequest):
if request in self._callback_to_execute[request.full_filepath]:
self.log.debug("Skipping already queued SlaCallbackRequest")
return

# not already queued, queue the callback
# do NOT add the file of this SLA to self._file_path_queue. SLAs can arrive so rapidly that
# they keep adding to the file queue and never letting it drain. This in turn prevents us from
# ever rescanning the dags folder for changes to existing dags. We simply store the callback, and
# periodically, when self._file_path_queue is drained, we rescan and re-queue all DAG files.
# The SLAs will be picked up then. It means a delay in reacting to the SLAs (as controlled by the
# min_file_process_interval config) but stops SLAs from DoS'ing the queue.
self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
self._callback_to_execute[request.full_filepath].append(request)
Stats.incr("dag_processing.sla_callback_count")

# Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if
# already in the file path queue
else:
self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
self._callback_to_execute[request.full_filepath].append(request)
if request.full_filepath in self._file_path_queue:
# Remove file paths matching request.full_filepath from self._file_path_queue
# Since we are already going to use that filepath to run callback,
# there is no need to have same file path again in the queue
self._file_path_queue = deque(
file_path for file_path in self._file_path_queue if file_path != request.full_filepath
)
self._add_paths_to_queue([request.full_filepath], True)
Stats.incr("dag_processing.other_callback_count")
self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
self._callback_to_execute[request.full_filepath].append(request)
if request.full_filepath in self._file_path_queue:
# Remove file paths matching request.full_filepath from self._file_path_queue
# Since we are already going to use that filepath to run callback,
# there is no need to have same file path again in the queue
self._file_path_queue = deque(
file_path for file_path in self._file_path_queue if file_path != request.full_filepath
)
self._add_paths_to_queue([request.full_filepath], True)
Stats.incr("dag_processing.other_callback_count")

def _refresh_requested_filelocs(self) -> None:
"""Refresh filepaths from dag dir as requested by users via APIs."""
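With the SLA special case gone, every callback takes the same path: append it for its file, drop any duplicate entry already waiting in the file path queue, and re-queue the file at the front. A simplified, self-contained sketch of that pattern (the class and attribute names mirror the manager but are not the real implementation):

```python
from collections import defaultdict, deque


class CallbackQueueSketch:
    """Hypothetical stand-in for the manager's queueing state."""

    def __init__(self) -> None:
        self.callback_to_execute: dict[str, list[object]] = defaultdict(list)
        self.file_path_queue: deque[str] = deque()

    def add_callback(self, full_filepath: str, request: object) -> None:
        # Store the callback, remove any stale queue entry for the same file,
        # then push the file to the front so the callback is processed promptly.
        self.callback_to_execute[full_filepath].append(request)
        if full_filepath in self.file_path_queue:
            self.file_path_queue = deque(p for p in self.file_path_queue if p != full_filepath)
        self.file_path_queue.appendleft(full_filepath)
```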
196 changes: 5 additions & 191 deletions airflow/dag_processing/processor.py
@@ -25,33 +25,28 @@
import zipfile
from contextlib import contextmanager, redirect_stderr, redirect_stdout, suppress
from dataclasses import dataclass
from datetime import timedelta
from typing import TYPE_CHECKING, Generator, Iterable, Iterator
from typing import TYPE_CHECKING, Generator, Iterable

from setproctitle import setproctitle
from sqlalchemy import delete, event, func, or_, select
from sqlalchemy import delete, event

from airflow import settings
from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.callbacks.callback_requests import (
DagCallbackRequest,
SlaCallbackRequest,
TaskCallbackRequest,
)
from airflow.configuration import conf
from airflow.exceptions import AirflowException, TaskNotFound
from airflow.exceptions import AirflowException
from airflow.listeners.listener import get_listener_manager
from airflow.models import SlaMiss
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstance as TI, _run_finished_callback
from airflow.models.taskinstance import TaskInstance, _run_finished_callback
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.email import get_email_address_list, send_email
from airflow.utils.file import iter_airflow_imports, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
from airflow.utils.mixins import MultiprocessingStartMethodMixin
@@ -440,180 +435,6 @@ def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.L
self.dag_warnings: set[tuple[str, str]] = set()
self._last_num_of_db_queries = 0

@classmethod
@internal_api_call
@provide_session
def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> None:
"""
Find all tasks that have SLAs defined, and send alert emails when needed.

New SLA misses are also recorded in the database.

We are assuming that the scheduler runs often, so we only check for
tasks that should have succeeded in the past hour.
"""
dagbag = DagFileProcessor._get_dagbag(dag_folder)
dag = dagbag.get_dag(dag_id)
cls.logger().info("Running SLA Checks for %s", dag.dag_id)
if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks):
cls.logger().info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
return
qry = (
select(TI.task_id, func.max(DR.execution_date).label("max_ti"))
.join(TI.dag_run)
.where(TI.dag_id == dag.dag_id)
.where(or_(TI.state == TaskInstanceState.SUCCESS, TI.state == TaskInstanceState.SKIPPED))
.where(TI.task_id.in_(dag.task_ids))
.group_by(TI.task_id)
.subquery("sq")
)
# get recorded SlaMiss
recorded_slas_query = set(
session.execute(
select(SlaMiss.dag_id, SlaMiss.task_id, SlaMiss.execution_date).where(
SlaMiss.dag_id == dag.dag_id, SlaMiss.task_id.in_(dag.task_ids)
)
)
)
max_tis: Iterator[TI] = session.scalars(
select(TI)
.join(TI.dag_run)
.where(TI.dag_id == dag.dag_id, TI.task_id == qry.c.task_id, DR.execution_date == qry.c.max_ti)
)

ts = timezone.utcnow()

for ti in max_tis:
task = dag.get_task(ti.task_id)
if not task.sla:
continue

if not isinstance(task.sla, timedelta):
raise TypeError(
f"SLA is expected to be timedelta object, got "
f"{type(task.sla)} in {task.dag_id}:{task.task_id}"
)

sla_misses = []
next_info = dag.next_dagrun_info(dag.get_run_data_interval(ti.dag_run), restricted=False)
while next_info and next_info.logical_date < ts:
next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)

if next_info is None:
break
if (ti.dag_id, ti.task_id, next_info.logical_date) in recorded_slas_query:
continue
if next_info.logical_date + task.sla < ts:
sla_miss = SlaMiss(
task_id=ti.task_id,
dag_id=ti.dag_id,
execution_date=next_info.logical_date,
timestamp=ts,
)
sla_misses.append(sla_miss)
Stats.incr("sla_missed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
if sla_misses:
session.add_all(sla_misses)
session.commit()
slas: list[SlaMiss] = session.scalars(
select(SlaMiss).where(~SlaMiss.notification_sent, SlaMiss.dag_id == dag.dag_id)
).all()
if slas:
sla_dates: list[datetime] = [sla.execution_date for sla in slas]
fetched_tis: list[TI] = session.scalars(
select(TI).where(
TI.dag_id == dag.dag_id,
TI.execution_date.in_(sla_dates),
TI.state != TaskInstanceState.SUCCESS,
)
).all()
blocking_tis: list[TI] = []
for ti in fetched_tis:
if ti.task_id in dag.task_ids:
ti.task = dag.get_task(ti.task_id)
blocking_tis.append(ti)
else:
session.delete(ti)
session.commit()

task_list = "\n".join(sla.task_id + " on " + sla.execution_date.isoformat() for sla in slas)
blocking_task_list = "\n".join(
ti.task_id + " on " + ti.execution_date.isoformat() for ti in blocking_tis
)
# Track whether email or any alert notification sent
# We consider email or the alert callback as notifications
email_sent = False
notification_sent = False
if dag.sla_miss_callback:
# Execute the alert callback
callbacks = (
dag.sla_miss_callback
if isinstance(dag.sla_miss_callback, list)
else [dag.sla_miss_callback]
)
for callback in callbacks:
cls.logger().info("Calling SLA miss callback %s", callback)
try:
callback(dag, task_list, blocking_task_list, slas, blocking_tis)
notification_sent = True
except Exception:
Stats.incr(
"sla_callback_notification_failure",
tags={
"dag_id": dag.dag_id,
"func_name": callback.__name__,
},
)
cls.logger().exception(
"Could not call sla_miss_callback(%s) for DAG %s",
callback.__name__,
dag.dag_id,
)
email_content = f"""\
Here's a list of tasks that missed their SLAs:
<pre><code>{task_list}\n<code></pre>
Blocking tasks:
<pre><code>{blocking_task_list}<code></pre>
Airflow Webserver URL: {conf.get(section='webserver', key='base_url')}
"""

tasks_missed_sla = []
for sla in slas:
try:
task = dag.get_task(sla.task_id)
except TaskNotFound:
# task already deleted from DAG, skip it
cls.logger().warning(
"Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id
)
else:
tasks_missed_sla.append(task)

emails: set[str] = set()
for task in tasks_missed_sla:
if task.email:
if isinstance(task.email, str):
emails.update(get_email_address_list(task.email))
elif isinstance(task.email, (list, tuple)):
emails.update(task.email)
if emails:
try:
send_email(emails, f"[airflow] SLA miss on DAG={dag.dag_id}", email_content)
email_sent = True
notification_sent = True
except Exception:
Stats.incr("sla_email_notification_failure", tags={"dag_id": dag.dag_id})
cls.logger().exception(
"Could not send SLA Miss email notification for DAG %s", dag.dag_id
)
# If we sent any notification, update the sla_miss table
if notification_sent:
for sla in slas:
sla.email_sent = email_sent
sla.notification_sent = True
session.merge(sla)
session.commit()

@staticmethod
@internal_api_call
@provide_session
@@ -748,13 +569,6 @@ def execute_callbacks(
try:
if isinstance(request, TaskCallbackRequest):
cls._execute_task_callbacks(dagbag, request, unit_test_mode, session=session)
elif isinstance(request, SlaCallbackRequest):
if InternalApiConfig.get_use_internal_api():
cls.logger().warning(
"SlaCallbacks are not supported when the Internal API is enabled"
)
else:
DagFileProcessor.manage_slas(dagbag.dag_folder, request.dag_id, session=session)
elif isinstance(request, DagCallbackRequest):
cls._execute_dag_callbacks(dagbag, request, session=session)
except Exception:
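For reference, the user-facing API that `manage_slas` served looked roughly like the sketch below: a per-task `sla` timedelta plus an optional DAG-level `sla_miss_callback` whose parameters match the callback invocation in the removed code. The DAG and task names are illustrative, and this pattern stops working once this PR lands:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def notify_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    # Same parameters as the callback call in the removed manage_slas implementation.
    print(f"SLA missed in {dag.dag_id}:\n{task_list}")


with DAG(
    dag_id="sla_example",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    sla_miss_callback=notify_sla_miss,
):
    EmptyOperator(task_id="hourly_report", sla=timedelta(hours=1))
```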