Skip to content

Commit

Permalink
Merge branch 'main' into fix-lazyload-bool
Browse files Browse the repository at this point in the history
  • Loading branch information
johncmerfeld authored Nov 13, 2024
2 parents 12fdcca + 26e8063 commit 75ac853
Show file tree
Hide file tree
Showing 16 changed files with 120 additions and 80 deletions.
19 changes: 13 additions & 6 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1064,17 +1064,24 @@ metrics:
example: "\"scheduler,executor,dagrun,pool,triggerer,celery\"
or \"^scheduler,^executor,heartbeat|timeout\""
default: ""
metrics_consistency_on:
description: |
Enables metrics consistency across all metrics loggers (ex: timer and timing metrics).
# TODO: Remove 'timer_unit_consistency' in Airflow 3.0
timer_unit_consistency:
description: |
Controls the consistency of timer units across all metrics loggers
(e.g., Statsd, Datadog, OpenTelemetry)
for timing and duration-based metrics. When enabled, all timers will publish
metrics in milliseconds for consistency and alignment with Airflow's default
metrics behavior in version 3.0+.
.. warning::
It is enabled by default from Airflow 3.
version_added: 2.10.0
It will be the default behavior from Airflow 3.0. If disabled, timers may publish
in seconds for backwards compatibility, though it is recommended to enable this
setting to ensure metric uniformity and forward-compat with Airflow 3.
version_added: 2.11.0
type: string
example: ~
default: "True"
default: "False"
statsd_on:
description: |
Enables sending metrics to StatsD.
Expand Down
12 changes: 6 additions & 6 deletions airflow/metrics/datadog_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing import TYPE_CHECKING

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
PatternAllowListValidator,
Expand All @@ -42,11 +42,11 @@

log = logging.getLogger(__name__)

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
RemovedInAirflow3Warning,
stacklevel=2,
)

Expand Down Expand Up @@ -144,7 +144,7 @@ def timing(
tags_list = []
if self.metrics_validator.test(stat):
if isinstance(dt, datetime.timedelta):
if metrics_consistency_on:
if timer_unit_consistency:
dt = dt.total_seconds() * 1000.0
else:
dt = dt.total_seconds()
Expand Down
12 changes: 6 additions & 6 deletions airflow/metrics/otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.metrics.protocols import Timer
from airflow.metrics.validators import (
OTEL_NAME_MAX_LENGTH,
Expand Down Expand Up @@ -73,11 +73,11 @@
# Delimiter is placed between the universal metric prefix and the unique metric name.
DEFAULT_METRIC_NAME_DELIMITER = "."

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
RemovedInAirflow3Warning,
stacklevel=2,
)

Expand Down Expand Up @@ -284,7 +284,7 @@ def timing(
"""OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed."""
if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
if isinstance(dt, datetime.timedelta):
if metrics_consistency_on:
if timer_unit_consistency:
dt = dt.total_seconds() * 1000.0
else:
dt = dt.total_seconds()
Expand Down
12 changes: 6 additions & 6 deletions airflow/metrics/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
from typing import Union

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.typing_compat import Protocol

DeltaType = Union[int, float, datetime.timedelta]

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
RemovedInAirflow3Warning,
stacklevel=2,
)

Expand Down Expand Up @@ -127,7 +127,7 @@ def start(self) -> Timer:
def stop(self, send: bool = True) -> None:
"""Stop the timer, and optionally send it to stats backend."""
if self._start_time is not None:
if metrics_consistency_on:
if timer_unit_consistency:
self.duration = 1000.0 * (time.perf_counter() - self._start_time) # Convert to milliseconds.
else:
self.duration = time.perf_counter() - self._start_time
Expand Down
14 changes: 7 additions & 7 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@
from airflow.exceptions import (
AirflowException,
AirflowFailException,
AirflowProviderDeprecationWarning,
AirflowRescheduleException,
AirflowSensorTimeout,
AirflowSkipException,
AirflowTaskTerminated,
AirflowTaskTimeout,
RemovedInAirflow3Warning,
TaskDeferralError,
TaskDeferred,
UnmappableXComLengthPushed,
Expand Down Expand Up @@ -176,11 +176,11 @@

PAST_DEPENDS_MET = "past_depends_met"

metrics_consistency_on = conf.getboolean("metrics", "metrics_consistency_on", fallback=True)
if not metrics_consistency_on:
timer_unit_consistency = conf.getboolean("metrics", "timer_unit_consistency")
if not timer_unit_consistency:
warnings.warn(
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable metrics consistency to publish all the timer and timing metrics in milliseconds.",
AirflowProviderDeprecationWarning,
"Timer and timing metrics publish in seconds were deprecated. It is enabled by default from Airflow 3 onwards. Enable timer_unit_consistency to publish all the timer and timing metrics in milliseconds.",
RemovedInAirflow3Warning,
stacklevel=2,
)

Expand Down Expand Up @@ -2827,7 +2827,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
if metrics_consistency_on:
if timer_unit_consistency:
timing = timezone.utcnow() - self.queued_dttm
else:
timing = (timezone.utcnow() - self.queued_dttm).total_seconds()
Expand All @@ -2843,7 +2843,7 @@ def emit_state_change_metric(self, new_state: TaskInstanceState) -> None:
self.task_id,
)
return
if metrics_consistency_on:
if timer_unit_consistency:
timing = timezone.utcnow() - self.start_date
else:
timing = (timezone.utcnow() - self.start_date).total_seconds()
Expand Down
5 changes: 0 additions & 5 deletions airflow/ui/src/components/DagRunInfo.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type Props = {
readonly dataIntervalEnd?: string | null;
readonly dataIntervalStart?: string | null;
readonly endDate?: string | null;
readonly logicalDate?: string | null;
readonly nextDagrunCreateAfter?: string | null;
readonly startDate?: string | null;
};
Expand All @@ -35,7 +34,6 @@ const DagRunInfo = ({
dataIntervalEnd,
dataIntervalStart,
endDate,
logicalDate,
nextDagrunCreateAfter,
startDate,
}: Props) =>
Expand All @@ -54,9 +52,6 @@ const DagRunInfo = ({
Run After: <Time datetime={nextDagrunCreateAfter} />
</Text>
) : undefined}
{Boolean(logicalDate) ? (
<Text>Logical Date: {logicalDate}</Text>
) : undefined}
{Boolean(startDate) ? (
<Text>Start Date: {startDate}</Text>
) : undefined}
Expand Down
38 changes: 38 additions & 0 deletions airflow/ui/src/constants/sortParams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { createListCollection } from "@chakra-ui/react/collection";

export const DagSortOptions = createListCollection({
items: [
{ label: "Sort by Display Name (A-Z)", value: "dag_display_name" },
{ label: "Sort by Display Name (Z-A)", value: "-dag_display_name" },
{ label: "Sort by Next DAG Run (Earliest-Latest)", value: "next_dagrun" },
{ label: "Sort by Next DAG Run (Latest-Earliest)", value: "-next_dagrun" },
{ label: "Sort by Last Run State (A-Z)", value: "last_run_state" },
{ label: "Sort by Last Run State (Z-A)", value: "-last_run_state" },
{
label: "Sort by Last Run Start Date (Earliest-Latest)",
value: "last_run_start_date",
},
{
label: "Sort by Last Run Start Date (Latest-Earliest)",
value: "-last_run_start_date",
},
],
});
1 change: 0 additions & 1 deletion airflow/ui/src/pages/DagsList/Dag/Header.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ export const Header = ({
dataIntervalEnd={latestRun.data_interval_end}
dataIntervalStart={latestRun.data_interval_start}
endDate={latestRun.end_date}
logicalDate={latestRun.logical_date}
startDate={latestRun.start_date}
/>
) : undefined}
Expand Down
1 change: 0 additions & 1 deletion airflow/ui/src/pages/DagsList/DagCard.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ export const DagCard = ({ dag }: Props) => {
dataIntervalEnd={latestRun.data_interval_end}
dataIntervalStart={latestRun.data_interval_start}
endDate={latestRun.end_date}
logicalDate={latestRun.logical_date}
startDate={latestRun.start_date}
/>
) : undefined}
Expand Down
12 changes: 2 additions & 10 deletions airflow/ui/src/pages/DagsList/DagsList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import {
Skeleton,
VStack,
Link,
createListCollection,
type SelectValueChangeDetails,
Box,
} from "@chakra-ui/react";
Expand All @@ -49,6 +48,7 @@ import {
SearchParamsKeys,
type SearchParamsKeysType,
} from "src/constants/searchParams";
import { DagSortOptions as sortOptions } from "src/constants/sortParams";
import { useDags } from "src/queries/useDags";
import { pluralize } from "src/utils";

Expand Down Expand Up @@ -107,7 +107,6 @@ const columns: Array<ColumnDef<DAGWithLatestDagRunsResponse>> = [
dataIntervalEnd={original.latest_dag_runs[0].data_interval_end}
dataIntervalStart={original.latest_dag_runs[0].data_interval_start}
endDate={original.latest_dag_runs[0].end_date}
logicalDate={original.latest_dag_runs[0].logical_date}
startDate={original.latest_dag_runs[0].start_date}
/>
) : undefined,
Expand Down Expand Up @@ -148,13 +147,6 @@ const cardDef: CardDef<DAGWithLatestDagRunsResponse> = {

const DAGS_LIST_DISPLAY = "dags_list_display";

const sortOptions = createListCollection({
items: [
{ label: "Sort by Dag ID (A-Z)", value: "dag_id" },
{ label: "Sort by Dag ID (Z-A)", value: "-dag_id" },
],
});

export const DagsList = () => {
const [searchParams, setSearchParams] = useSearchParams();
const [display, setDisplay] = useLocalStorage<"card" | "table">(
Expand Down Expand Up @@ -237,7 +229,7 @@ export const DagsList = () => {
data-testid="sort-by-select"
onValueChange={handleSortChange}
value={orderBy === undefined ? undefined : [orderBy]}
width="200px"
width="310px"
>
<Select.Trigger>
<Select.ValueText placeholder="Sort by" />
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/src/pages/DagsList/LatestRun.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Props = {
export const LatestRun = ({ latestRun }: Props) =>
latestRun ? (
<HStack fontSize="sm">
<Time datetime={latestRun.logical_date} />
<Time datetime={latestRun.start_date} />
<Box
bg={stateColor[latestRun.state]}
borderRadius="50%"
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/src/pages/DagsList/RecentRuns.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export const RecentRuns = ({
<Box>
<Text>State: {run.state}</Text>
<Text>
Logical Date: <Time datetime={run.logical_date} />
Start Date: <Time datetime={run.start_date} />
</Text>
<Text>Duration: {run.duration.toFixed(2)}s</Text>
</Box>
Expand Down
12 changes: 11 additions & 1 deletion newsfragments/39908.significant.rst
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
Publishing timer and timing metrics in seconds has been deprecated. In Airflow 3, ``metrics_consistency_on`` from ``metrics`` is enabled by default. You can disable this for backward compatibility. To publish all timer and timing metrics in milliseconds, ensure metrics consistency is enabled
Publishing timer and timing metrics in seconds is now deprecated.

In Airflow 3.0, the ``timer_unit_consistency`` setting in the ``metrics`` section will be
enabled by default and setting itself will be removed. This will standardize all timer and timing metrics to
milliseconds across all metric loggers.

**Users Integrating with Datadog, OpenTelemetry, or other metric backends** should enable this setting. For users, using
``statsd``, this change will not affect you.

If you need backward compatibility, you can leave this setting disabled temporarily, but enabling
``timer_unit_consistency`` is encouraged to future-proof your metrics setup.
Loading

0 comments on commit 75ac853

Please sign in to comment.