diff --git a/airflow/api_fastapi/db.py b/airflow/api_fastapi/db.py index 51faee25ed5a0..c3ed01a0aefec 100644 --- a/airflow/api_fastapi/db.py +++ b/airflow/api_fastapi/db.py @@ -19,6 +19,9 @@ from typing import TYPE_CHECKING +from sqlalchemy import func, select + +from airflow.models.dagrun import DagRun from airflow.utils.session import create_session if TYPE_CHECKING: @@ -52,3 +55,11 @@ def apply_filters_to_select(base_select: Select, filters: list[BaseParam]) -> Se select = filter.to_orm(select) return select + + +latest_dag_run_per_dag_id_cte = ( + select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date")) + .where() + .group_by(DagRun.dag_id) + .cte() +) diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 6d77056d0574d..f488825449c3a 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -103,6 +103,14 @@ paths: - type: boolean - type: 'null' title: Paused + - name: last_dag_run_state + in: query + required: false + schema: + anyOf: + - $ref: '#/components/schemas/DagRunState' + - type: 'null' + title: Last Dag Run State - name: order_by in: query required: false @@ -347,6 +355,22 @@ components: - file_token title: DAGResponse description: DAG serializer for responses. + DagRunState: + type: string + enum: + - queued + - running + - success + - failed + title: DagRunState + description: 'All possible states that a DagRun can be in. + + + These are "shared" with TaskInstanceState in some parts of the code, + + so please ensure that their values always match the ones with the + + same name in TaskInstanceState.' DagTagPydantic: properties: name: diff --git a/airflow/api_fastapi/parameters.py b/airflow/api_fastapi/parameters.py index 589403cc4e960..09eea5f6e055b 100644 --- a/airflow/api_fastapi/parameters.py +++ b/airflow/api_fastapi/parameters.py @@ -18,13 +18,15 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Any, Generic, List, TypeVar, Union +from typing import TYPE_CHECKING, Any, Generic, List, TypeVar from fastapi import Depends, HTTPException, Query from sqlalchemy import case, or_ from typing_extensions import Annotated, Self from airflow.models.dag import DagModel, DagTag +from airflow.models.dagrun import DagRun +from airflow.utils.state import DagRunState if TYPE_CHECKING: from sqlalchemy.sql import ColumnElement, Select @@ -43,14 +45,14 @@ def __init__(self) -> None: def to_orm(self, select: Select) -> Select: pass - @abstractmethod - def __call__(self, *args: Any, **kwarg: Any) -> BaseParam: - pass - - def set_value(self, value: T) -> Self: + def set_value(self, value: T | None) -> Self: self.value = value return self + @abstractmethod + def depends(self, *args: Any, **kwargs: Any) -> Self: + pass + class _LimitFilter(BaseParam[int]): """Filter on the limit.""" @@ -61,7 +63,7 @@ def to_orm(self, select: Select) -> Select: return select.limit(self.value) - def __call__(self, limit: int = 100) -> _LimitFilter: + def depends(self, limit: int = 100) -> _LimitFilter: return self.set_value(limit) @@ -73,11 +75,11 @@ def to_orm(self, select: Select) -> Select: return select return select.offset(self.value) - def __call__(self, offset: int = 0) -> _OffsetFilter: + def depends(self, offset: int = 0) -> _OffsetFilter: return self.set_value(offset) -class _PausedFilter(BaseParam[Union[bool, None]]): +class _PausedFilter(BaseParam[bool]): """Filter on is_paused.""" def to_orm(self, select: Select) -> Select: @@ -85,7 +87,7 @@ def to_orm(self, select: Select) -> Select: return select return select.where(DagModel.is_paused == self.value) - def __call__(self, paused: bool | None = Query(default=None)) -> _PausedFilter: + def depends(self, paused: bool | None = None) -> _PausedFilter: return self.set_value(paused) @@ -97,11 +99,11 @@ def to_orm(self, select: Select) -> Select: return select.where(DagModel.is_active == self.value) return select - def __call__(self, only_active: bool = Query(default=True)) -> _OnlyActiveFilter: + def depends(self, only_active: bool = True) -> _OnlyActiveFilter: return self.set_value(only_active) -class _SearchParam(BaseParam[Union[str, None]]): +class _SearchParam(BaseParam[str]): """Search on attribute.""" def __init__(self, attribute: ColumnElement) -> None: @@ -120,7 +122,7 @@ class _DagIdPatternSearch(_SearchParam): def __init__(self) -> None: super().__init__(DagModel.dag_id) - def __call__(self, dag_id_pattern: str | None = Query(default=None)) -> _DagIdPatternSearch: + def depends(self, dag_id_pattern: str | None = None) -> _DagIdPatternSearch: return self.set_value(dag_id_pattern) @@ -130,15 +132,18 @@ class _DagDisplayNamePatternSearch(_SearchParam): def __init__(self) -> None: super().__init__(DagModel.dag_display_name) - def __call__( - self, dag_display_name_pattern: str | None = Query(default=None) - ) -> _DagDisplayNamePatternSearch: + def depends(self, dag_display_name_pattern: str | None = None) -> _DagDisplayNamePatternSearch: return self.set_value(dag_display_name_pattern) -class SortParam(BaseParam[Union[str]]): +class SortParam(BaseParam[str]): """Order result by the attribute.""" + attr_mapping = { + "last_run_state": DagRun.state, + "last_run_start_date": DagRun.start_date, + } + def __init__(self, allowed_attrs: list[str]) -> None: super().__init__() self.allowed_attrs = allowed_attrs @@ -155,17 +160,17 @@ def to_orm(self, select: Select) -> Select: f"the attribute does not exist on the model", ) - column = getattr(DagModel, lstriped_orderby) + column = self.attr_mapping.get(lstriped_orderby, None) or getattr(DagModel, lstriped_orderby) # MySQL does not support `nullslast`, and True/False ordering depends on the - # database implementation + # database implementation. nullscheck = case((column.isnot(None), 0), else_=1) if self.value[0] == "-": - return select.order_by(nullscheck, column.desc(), DagModel.dag_id) + return select.order_by(nullscheck, column.desc(), DagModel.dag_id.desc()) else: - return select.order_by(nullscheck, column.asc(), DagModel.dag_id) + return select.order_by(nullscheck, column.asc(), DagModel.dag_id.asc()) - def __call__(self, order_by: str = Query(default="dag_id")) -> SortParam: + def depends(self, order_by: str = "dag_id") -> SortParam: return self.set_value(order_by) @@ -179,7 +184,7 @@ def to_orm(self, select: Select) -> Select: conditions = [DagModel.tags.any(DagTag.name == tag) for tag in self.value] return select.where(or_(*conditions)) - def __call__(self, tags: list[str] = Query(default_factory=list)) -> _TagsFilter: + def depends(self, tags: list[str] = Query(default_factory=list)) -> _TagsFilter: return self.set_value(tags) @@ -193,17 +198,32 @@ def to_orm(self, select: Select) -> Select: conditions = [DagModel.owners.ilike(f"%{owner}%") for owner in self.value] return select.where(or_(*conditions)) - def __call__(self, owners: list[str] = Query(default_factory=list)) -> _OwnersFilter: + def depends(self, owners: list[str] = Query(default_factory=list)) -> _OwnersFilter: return self.set_value(owners) -QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter())] -QueryOffset = Annotated[_OffsetFilter, Depends(_OffsetFilter())] -QueryPausedFilter = Annotated[_PausedFilter, Depends(_PausedFilter())] -QueryOnlyActiveFilter = Annotated[_OnlyActiveFilter, Depends(_OnlyActiveFilter())] -QueryDagIdPatternSearch = Annotated[_DagIdPatternSearch, Depends(_DagIdPatternSearch())] +class _LastDagRunStateFilter(BaseParam[DagRunState]): + """Filter on the state of the latest DagRun.""" + + def to_orm(self, select: Select) -> Select: + if self.value is None: + return select + return select.where(DagRun.state == self.value) + + def depends(self, last_dag_run_state: DagRunState | None = None) -> _LastDagRunStateFilter: + return self.set_value(last_dag_run_state) + + +# DAG +QueryLimit = Annotated[_LimitFilter, Depends(_LimitFilter().depends)] +QueryOffset = Annotated[_OffsetFilter, Depends(_OffsetFilter().depends)] +QueryPausedFilter = Annotated[_PausedFilter, Depends(_PausedFilter().depends)] +QueryOnlyActiveFilter = Annotated[_OnlyActiveFilter, Depends(_OnlyActiveFilter().depends)] +QueryDagIdPatternSearch = Annotated[_DagIdPatternSearch, Depends(_DagIdPatternSearch().depends)] QueryDagDisplayNamePatternSearch = Annotated[ - _DagDisplayNamePatternSearch, Depends(_DagDisplayNamePatternSearch()) + _DagDisplayNamePatternSearch, Depends(_DagDisplayNamePatternSearch().depends) ] -QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter())] -QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter())] +QueryTagsFilter = Annotated[_TagsFilter, Depends(_TagsFilter().depends)] +QueryOwnersFilter = Annotated[_OwnersFilter, Depends(_OwnersFilter().depends)] +# DagRun +QueryLastDagRunStateFilter = Annotated[_LastDagRunStateFilter, Depends(_LastDagRunStateFilter().depends)] diff --git a/airflow/api_fastapi/views/public/dags.py b/airflow/api_fastapi/views/public/dags.py index 433e5ef862447..07ab968adc975 100644 --- a/airflow/api_fastapi/views/public/dags.py +++ b/airflow/api_fastapi/views/public/dags.py @@ -22,10 +22,11 @@ from sqlalchemy.orm import Session from typing_extensions import Annotated -from airflow.api_fastapi.db import apply_filters_to_select, get_session +from airflow.api_fastapi.db import apply_filters_to_select, get_session, latest_dag_run_per_dag_id_cte from airflow.api_fastapi.parameters import ( QueryDagDisplayNamePatternSearch, QueryDagIdPatternSearch, + QueryLastDagRunStateFilter, QueryLimit, QueryOffset, QueryOnlyActiveFilter, @@ -36,6 +37,7 @@ ) from airflow.api_fastapi.serializers.dags import DAGCollectionResponse, DAGPatchBody, DAGResponse from airflow.models import DagModel +from airflow.models.dagrun import DagRun from airflow.utils.db import get_query_count dags_router = APIRouter(tags=["DAG"]) @@ -51,14 +53,36 @@ async def get_dags( dag_display_name_pattern: QueryDagDisplayNamePatternSearch, only_active: QueryOnlyActiveFilter, paused: QueryPausedFilter, - order_by: Annotated[SortParam, Depends(SortParam(["dag_id", "dag_display_name", "next_dagrun"]))], + last_dag_run_state: QueryLastDagRunStateFilter, + order_by: Annotated[ + SortParam, + Depends( + SortParam( + ["dag_id", "dag_display_name", "next_dagrun", "last_run_state", "last_run_start_date"] + ).depends + ), + ], session: Annotated[Session, Depends(get_session)], ) -> DAGCollectionResponse: """Get all DAGs.""" - dags_query = select(DagModel) + dags_query = ( + select(DagModel) + .join( + latest_dag_run_per_dag_id_cte, + DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id, + isouter=True, + ) + .join( + DagRun, + DagRun.start_date == latest_dag_run_per_dag_id_cte.c.start_date + and DagRun.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id, + isouter=True, + ) + ) dags_query = apply_filters_to_select( - dags_query, [only_active, paused, dag_id_pattern, dag_display_name_pattern, tags, owners] + dags_query, + [only_active, paused, dag_id_pattern, dag_display_name_pattern, tags, owners, last_dag_run_state], ) # TODO: Re-enable when permissions are handled. diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 2818b48a33e1a..b8021fed9be3c 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -2,6 +2,7 @@ import { UseQueryResult } from "@tanstack/react-query"; import { DagService, DatasetService } from "../requests/services.gen"; +import { DagRunState } from "../requests/types.gen"; export type DatasetServiceNextRunDatasetsUiNextRunDatasetsDagIdGetDefaultResponse = Awaited< @@ -37,6 +38,7 @@ export const UseDagServiceGetDagsPublicDagsGetKeyFn = ( { dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, @@ -47,6 +49,7 @@ export const UseDagServiceGetDagsPublicDagsGetKeyFn = ( }: { dagDisplayNamePattern?: string; dagIdPattern?: string; + lastDagRunState?: DagRunState; limit?: number; offset?: number; onlyActive?: boolean; @@ -62,6 +65,7 @@ export const UseDagServiceGetDagsPublicDagsGetKeyFn = ( { dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index f8e1bf616d143..6dd99f96b8425 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -2,6 +2,7 @@ import { type QueryClient } from "@tanstack/react-query"; import { DagService, DatasetService } from "../requests/services.gen"; +import { DagRunState } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -40,6 +41,7 @@ export const prefetchUseDatasetServiceNextRunDatasetsUiNextRunDatasetsDagIdGet = * @param data.dagDisplayNamePattern * @param data.onlyActive * @param data.paused + * @param data.lastDagRunState * @param data.orderBy * @returns DAGCollectionResponse Successful Response * @throws ApiError @@ -49,6 +51,7 @@ export const prefetchUseDagServiceGetDagsPublicDagsGet = ( { dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, @@ -59,6 +62,7 @@ export const prefetchUseDagServiceGetDagsPublicDagsGet = ( }: { dagDisplayNamePattern?: string; dagIdPattern?: string; + lastDagRunState?: DagRunState; limit?: number; offset?: number; onlyActive?: boolean; @@ -72,6 +76,7 @@ export const prefetchUseDagServiceGetDagsPublicDagsGet = ( queryKey: Common.UseDagServiceGetDagsPublicDagsGetKeyFn({ dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, @@ -84,6 +89,7 @@ export const prefetchUseDagServiceGetDagsPublicDagsGet = ( DagService.getDagsPublicDagsGet({ dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 2a0c6b6821978..b771fccfeb947 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -7,7 +7,7 @@ import { } from "@tanstack/react-query"; import { DagService, DatasetService } from "../requests/services.gen"; -import { DAGPatchBody } from "../requests/types.gen"; +import { DAGPatchBody, DagRunState } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -54,6 +54,7 @@ export const useDatasetServiceNextRunDatasetsUiNextRunDatasetsDagIdGet = < * @param data.dagDisplayNamePattern * @param data.onlyActive * @param data.paused + * @param data.lastDagRunState * @param data.orderBy * @returns DAGCollectionResponse Successful Response * @throws ApiError @@ -66,6 +67,7 @@ export const useDagServiceGetDagsPublicDagsGet = < { dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, @@ -76,6 +78,7 @@ export const useDagServiceGetDagsPublicDagsGet = < }: { dagDisplayNamePattern?: string; dagIdPattern?: string; + lastDagRunState?: DagRunState; limit?: number; offset?: number; onlyActive?: boolean; @@ -92,6 +95,7 @@ export const useDagServiceGetDagsPublicDagsGet = < { dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, @@ -106,6 +110,7 @@ export const useDagServiceGetDagsPublicDagsGet = < DagService.getDagsPublicDagsGet({ dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index bcc95a53e18ff..7743ce92d2855 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -2,6 +2,7 @@ import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query"; import { DagService, DatasetService } from "../requests/services.gen"; +import { DagRunState } from "../requests/types.gen"; import * as Common from "./common"; /** @@ -49,6 +50,7 @@ export const useDatasetServiceNextRunDatasetsUiNextRunDatasetsDagIdGetSuspense = * @param data.dagDisplayNamePattern * @param data.onlyActive * @param data.paused + * @param data.lastDagRunState * @param data.orderBy * @returns DAGCollectionResponse Successful Response * @throws ApiError @@ -61,6 +63,7 @@ export const useDagServiceGetDagsPublicDagsGetSuspense = < { dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, @@ -71,6 +74,7 @@ export const useDagServiceGetDagsPublicDagsGetSuspense = < }: { dagDisplayNamePattern?: string; dagIdPattern?: string; + lastDagRunState?: DagRunState; limit?: number; offset?: number; onlyActive?: boolean; @@ -87,6 +91,7 @@ export const useDagServiceGetDagsPublicDagsGetSuspense = < { dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, @@ -101,6 +106,7 @@ export const useDagServiceGetDagsPublicDagsGetSuspense = < DagService.getDagsPublicDagsGet({ dagDisplayNamePattern, dagIdPattern, + lastDagRunState, limit, offset, onlyActive, diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 83d3670507f78..d9ce0528c396c 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -288,6 +288,17 @@ export const $DAGResponse = { description: "DAG serializer for responses.", } as const; +export const $DagRunState = { + type: "string", + enum: ["queued", "running", "success", "failed"], + title: "DagRunState", + description: `All possible states that a DagRun can be in. + +These are "shared" with TaskInstanceState in some parts of the code, +so please ensure that their values always match the ones with the +same name in TaskInstanceState.`, +} as const; + export const $DagTagPydantic = { properties: { name: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index a4c36d5990c78..9c261b3039000 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -48,6 +48,7 @@ export class DagService { * @param data.dagDisplayNamePattern * @param data.onlyActive * @param data.paused + * @param data.lastDagRunState * @param data.orderBy * @returns DAGCollectionResponse Successful Response * @throws ApiError @@ -67,6 +68,7 @@ export class DagService { dag_display_name_pattern: data.dagDisplayNamePattern, only_active: data.onlyActive, paused: data.paused, + last_dag_run_state: data.lastDagRunState, order_by: data.orderBy, }, errors: { diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 2f6bc263d4289..803bcd84270c7 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -50,6 +50,15 @@ export type DAGResponse = { readonly file_token: string; }; +/** + * All possible states that a DagRun can be in. + * + * These are "shared" with TaskInstanceState in some parts of the code, + * so please ensure that their values always match the ones with the + * same name in TaskInstanceState. + */ +export type DagRunState = "queued" | "running" | "success" | "failed"; + /** * Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API. */ @@ -79,6 +88,7 @@ export type NextRunDatasetsUiNextRunDatasetsDagIdGetResponse = { export type GetDagsPublicDagsGetData = { dagDisplayNamePattern?: string | null; dagIdPattern?: string | null; + lastDagRunState?: DagRunState | null; limit?: number; offset?: number; onlyActive?: boolean; diff --git a/tests/api_fastapi/views/public/test_dags.py b/tests/api_fastapi/views/public/test_dags.py index b508a1448352d..6e400f11cc0d2 100644 --- a/tests/api_fastapi/views/public/test_dags.py +++ b/tests/api_fastapi/views/public/test_dags.py @@ -20,9 +20,12 @@ import pytest -from airflow.models.dag import DAG, DagModel +from airflow.models.dag import DagModel +from airflow.models.dagrun import DagRun from airflow.operators.empty import EmptyOperator from airflow.utils.session import provide_session +from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunType from tests.test_utils.db import clear_db_dags, clear_db_runs, clear_db_serialized_dags pytestmark = pytest.mark.db_test @@ -46,41 +49,62 @@ def _create_deactivated_paused_dag(session=None): owners="test_owner,another_test_owner", next_dagrun=datetime(2021, 1, 1, 12, 0, 0, tzinfo=timezone.utc), ) + + dagrun_failed = DagRun( + dag_id=DAG3_ID, + run_id="run1", + execution_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + start_date=datetime(2018, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + run_type=DagRunType.SCHEDULED, + state=DagRunState.FAILED, + ) + + dagrun_success = DagRun( + dag_id=DAG3_ID, + run_id="run2", + execution_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + start_date=datetime(2019, 1, 1, 12, 0, 0, tzinfo=timezone.utc), + run_type=DagRunType.MANUAL, + state=DagRunState.SUCCESS, + ) + session.add(dag_model) + session.add(dagrun_failed) + session.add(dagrun_success) @pytest.fixture(autouse=True) -def setup() -> None: +def setup(dag_maker) -> None: clear_db_runs() clear_db_dags() clear_db_serialized_dags() - with DAG( + with dag_maker( DAG1_ID, dag_display_name=DAG1_DISPLAY_NAME, schedule=None, - start_date=datetime(2020, 6, 15), + start_date=datetime(2018, 6, 15, 0, 0, tzinfo=timezone.utc), doc_md="details", params={"foo": 1}, tags=["example"], - ) as dag1: + ): EmptyOperator(task_id=TASK_ID) - with DAG( + dag_maker.create_dagrun(state=DagRunState.FAILED) + + with dag_maker( DAG2_ID, dag_display_name=DAG2_DISPLAY_NAME, schedule=None, start_date=datetime( - 2020, + 2021, 6, 15, ), - ) as dag2: + ): EmptyOperator(task_id=TASK_ID) - dag1.sync_to_db() - dag2.sync_to_db() - + dag_maker.dagbag.sync_to_db() _create_deactivated_paused_dag() @@ -97,11 +121,25 @@ def setup() -> None: ({"paused": False}, 2, ["test_dag1", "test_dag2"]), ({"owners": ["airflow"]}, 2, ["test_dag1", "test_dag2"]), ({"owners": ["test_owner"], "only_active": False}, 1, ["test_dag3"]), + ({"last_dag_run_state": "success", "only_active": False}, 1, ["test_dag3"]), + ({"last_dag_run_state": "failed", "only_active": False}, 1, ["test_dag1"]), # # Sort ({"order_by": "-dag_id"}, 2, ["test_dag2", "test_dag1"]), ({"order_by": "-dag_display_name"}, 2, ["test_dag2", "test_dag1"]), ({"order_by": "dag_display_name"}, 2, ["test_dag1", "test_dag2"]), ({"order_by": "next_dagrun", "only_active": False}, 3, ["test_dag3", "test_dag1", "test_dag2"]), + ({"order_by": "last_run_state", "only_active": False}, 3, ["test_dag1", "test_dag3", "test_dag2"]), + ({"order_by": "-last_run_state", "only_active": False}, 3, ["test_dag3", "test_dag1", "test_dag2"]), + ( + {"order_by": "last_run_start_date", "only_active": False}, + 3, + ["test_dag1", "test_dag3", "test_dag2"], + ), + ( + {"order_by": "-last_run_start_date", "only_active": False}, + 3, + ["test_dag3", "test_dag1", "test_dag2"], + ), # Search ({"dag_id_pattern": "1"}, 1, ["test_dag1"]), ({"dag_display_name_pattern": "display2"}, 1, ["test_dag2"]),