Skip to content

Commit

Permalink
Merge branch 'AIP84-get-all-queued-events-for-dag' into AIP84-delete-…
Browse files Browse the repository at this point in the history
…all-queued-events-for-dag
  • Loading branch information
amoghrajesh committed Nov 14, 2024
2 parents 1379d46 + 14e187a commit dd65928
Show file tree
Hide file tree
Showing 27 changed files with 1,576 additions and 38 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def get_assets(
return asset_collection_schema.dump(AssetCollection(assets=assets, total_entries=total_entries))


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@provide_session
@format_parameters({"limit": check_limit})
Expand Down
101 changes: 96 additions & 5 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@

from airflow.api_connexion.endpoints.task_instance_endpoint import _convert_ti_states
from airflow.models import Base, Connection
from airflow.models.asset import (
AssetModel,
DagScheduleAssetReference,
TaskOutletAssetReference,
)
from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
Expand Down Expand Up @@ -444,6 +440,86 @@ def to_orm(self, select: Select) -> Select:
)


class _AssetIdFilter(BaseParam[int]):
"""Filter on asset_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, asset_id: int | None = None) -> _AssetIdFilter:
return self.set_value(asset_id)


class _SourceDagIdFilter(BaseParam[str]):
"""Filter on source_dag_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_dag_id: str | None = None) -> _SourceDagIdFilter:
return self.set_value(source_dag_id)


class _SourceTaskIdFilter(BaseParam[str]):
"""Filter on source_task_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_task_id: str | None = None) -> _SourceTaskIdFilter:
return self.set_value(source_task_id)


class _SourceRunIdFilter(BaseParam[str]):
"""filter on source_run_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_run_id: str | None = None) -> _SourceRunIdFilter:
return self.set_value(source_run_id)


class _SourceMapIndexFilter(BaseParam[int]):
"""Filter on source_map_index."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_map_index: int | None = None) -> _SourceMapIndexFilter:
return self.set_value(source_map_index)


class Range(BaseModel, Generic[T]):
"""Range with a lower and upper bound."""

Expand Down Expand Up @@ -541,3 +617,18 @@ def depends_float(
QueryAssetDagIdPatternSearch = Annotated[
_DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
]
QueryAssetIdFilter = Annotated[_AssetIdFilter, Depends(_AssetIdFilter(AssetEvent.asset_id).depends)]


QuerySourceDagIdFilter = Annotated[
_SourceDagIdFilter, Depends(_SourceDagIdFilter(AssetEvent.source_dag_id).depends)
]
QuerySourceTaskIdFilter = Annotated[
_SourceTaskIdFilter, Depends(_SourceTaskIdFilter(AssetEvent.source_task_id).depends)
]
QuerySourceRunIdFilter = Annotated[
_SourceRunIdFilter, Depends(_SourceRunIdFilter(AssetEvent.source_run_id).depends)
]
QuerySourceMapIndexFilter = Annotated[
_SourceMapIndexFilter, Depends(_SourceMapIndexFilter(AssetEvent.source_map_index).depends)
]
37 changes: 36 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from datetime import datetime

from pydantic import BaseModel
from pydantic import BaseModel, Field


class DagScheduleAssetReference(BaseModel):
Expand Down Expand Up @@ -64,3 +64,38 @@ class AssetCollectionResponse(BaseModel):

assets: list[AssetResponse]
total_entries: int


class DagRunAssetReference(BaseModel):
"""DAGRun serializer for asset responses."""

run_id: str
dag_id: str
execution_date: datetime = Field(alias="logical_date")
start_date: datetime
end_date: datetime
state: str
data_interval_start: datetime
data_interval_end: datetime


class AssetEventResponse(BaseModel):
"""Asset event serializer for responses."""

id: int
asset_id: int
uri: str
extra: dict | None = None
source_task_id: str | None = None
source_dag_id: str | None = None
source_run_id: str | None = None
source_map_index: int
created_dagruns: list[DagRunAssetReference]
timestamp: datetime


class AssetEventCollectionResponse(BaseModel):
"""Asset event collection response."""

asset_events: list[AssetEventResponse]
total_entries: int
Loading

0 comments on commit dd65928

Please sign in to comment.