diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index dc703446af24..c77f9476b0d2 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1064,17 +1064,24 @@ metrics: example: "\"scheduler,executor,dagrun,pool,triggerer,celery\" or \"^scheduler,^executor,heartbeat|timeout\"" default: "" - metrics_consistency_on: - description: | - Enables metrics consistency across all metrics loggers (ex: timer and timing metrics). + # TODO: Remove 'timer_unit_consistency' in Airflow 3.0 + timer_unit_consistency: + description: | + Controls the consistency of timer units across all metrics loggers + (e.g., Statsd, Datadog, OpenTelemetry) + for timing and duration-based metrics. When enabled, all timers will publish + metrics in milliseconds for consistency and alignment with Airflow's default + metrics behavior in version 3.0+. .. warning:: - It is enabled by default from Airflow 3. - version_added: 2.10.0 + It will be the default behavior from Airflow 3.0. If disabled, timers may publish + in seconds for backwards compatibility, though it is recommended to enable this + setting to ensure metric uniformity and forward-compat with Airflow 3. + version_added: 2.11.0 type: string example: ~ - default: "True" + default: "False" statsd_on: description: | Enables sending metrics to StatsD. diff --git a/airflow/metrics/datadog_logger.py b/airflow/metrics/datadog_logger.py index 60b5424a6f5c..81926716eb25 100644 --- a/airflow/metrics/datadog_logger.py +++ b/airflow/metrics/datadog_logger.py @@ -23,7 +23,7 @@ from typing import TYPE_CHECKING from airflow.configuration import conf -from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.exceptions import RemovedInAirflow3Warning from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( PatternAllowListValidator, @@ -42,11 +42,11 @@ log = logging.getLogger(__name__) -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) -if not metrics_consistency_on: +timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency") +if not timer_unit_consistency: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.", + RemovedInAirflow3Warning, stacklevel=2, ) @@ -144,7 +144,7 @@ def timing( tags_list = [] if self.metrics_validator.test(stat): if isinstance(dt, datetime.timedelta): - if metrics_consistency_on: + if timer_unit_consistency: dt = dt.total_seconds() * 1000.0 else: dt = dt.total_seconds() diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index 6d7d6e8fffa1..ed123608626f 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -31,7 +31,7 @@ from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource from airflow.configuration import conf -from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.exceptions import RemovedInAirflow3Warning from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( OTEL_NAME_MAX_LENGTH, @@ -73,11 +73,11 @@ # Delimiter is placed between the universal metric prefix and the unique metric name. DEFAULT_METRIC_NAME_DELIMITER = "." -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) -if not metrics_consistency_on: +timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency") +if not timer_unit_consistency: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.", + RemovedInAirflow3Warning, stacklevel=2, ) @@ -284,7 +284,7 @@ def timing( """OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed.""" if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat): if isinstance(dt, datetime.timedelta): - if metrics_consistency_on: + if timer_unit_consistency: dt = dt.total_seconds() * 1000.0 else: dt = dt.total_seconds() diff --git a/airflow/metrics/protocols.py b/airflow/metrics/protocols.py index 7eef7929e02d..0d12704e87a3 100644 --- a/airflow/metrics/protocols.py +++ b/airflow/metrics/protocols.py @@ -23,16 +23,16 @@ from typing import Union from airflow.configuration import conf -from airflow.exceptions import AirflowProviderDeprecationWarning +from airflow.exceptions import RemovedInAirflow3Warning from airflow.typing_compat import Protocol DeltaType = Union[int, float, datetime.timedelta] -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) -if not metrics_consistency_on: +timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency") +if not timer_unit_consistency: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.", + RemovedInAirflow3Warning, stacklevel=2, ) @@ -127,7 +127,7 @@ def start(self) -> Timer: def stop(self, send: bool = True) -> None: """Stop the timer, and optionally send it to stats backend.""" if self._start_time is not None: - if metrics_consistency_on: + if timer_unit_consistency: self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds. else: self.duration = time.perf_counter() - self._start_time diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index c05b1dd62eca..0b2a71e92317 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -80,12 +80,12 @@ from airflow.exceptions import ( AirflowException, AirflowFailException, - AirflowProviderDeprecationWarning, AirflowRescheduleException, AirflowSensorTimeout, AirflowSkipException, AirflowTaskTerminated, AirflowTaskTimeout, + RemovedInAirflow3Warning, TaskDeferralError, TaskDeferred, UnmappableXComLengthPushed, @@ -176,11 +176,11 @@ PAST_DEPENDS_MET = "past_depends_met" -metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True) -if not metrics_consistency_on: +timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency") +if not timer_unit_consistency: warnings.warn( - "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.", - AirflowProviderDeprecationWarning, + "Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.", + RemovedInAirflow3Warning, stacklevel=2, ) @@ -2827,7 +2827,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: self.task_id, ) return - if metrics_consistency_on: + if timer_unit_consistency: timing = timezone.utcnow() - self.queued_dttm else: timing = (timezone.utcnow() - self.queued_dttm).total_seconds() @@ -2843,7 +2843,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None: self.task_id, ) return - if metrics_consistency_on: + if timer_unit_consistency: timing = timezone.utcnow() - self.start_date else: timing = (timezone.utcnow() - self.start_date).total_seconds() diff --git a/airflow/ui/src/components/DagRunInfo.tsx b/airflow/ui/src/components/DagRunInfo.tsx index ecb062b8076a..7add29b8e754 100644 --- a/airflow/ui/src/components/DagRunInfo.tsx +++ b/airflow/ui/src/components/DagRunInfo.tsx @@ -26,7 +26,6 @@ type Props = { readonly dataIntervalEnd?: string | null; readonly dataIntervalStart?: string | null; readonly endDate?: string | null; - readonly logicalDate?: string | null; readonly nextDagrunCreateAfter?: string | null; readonly startDate?: string | null; }; @@ -35,7 +34,6 @@ const DagRunInfo = ({ dataIntervalEnd, dataIntervalStart, endDate, - logicalDate, nextDagrunCreateAfter, startDate, }: Props) => @@ -54,9 +52,6 @@ const DagRunInfo = ({ Run After: