Skip to content

Commit

Permalink
Merge branch 'main' into AIP84-get-dataset-events-dagrun
Browse files Browse the repository at this point in the history
  • Loading branch information
amoghrajesh committed Nov 14, 2024
2 parents 0da44d0 + 9a364ac commit 75a387e
Show file tree
Hide file tree
Showing 32 changed files with 1,385 additions and 80 deletions.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def get_assets(
return asset_collection_schema.dump(AssetCollection(assets=assets, total_entries=total_entries))


@mark_fastapi_migration_done
@security.requires_access_asset("GET")
@provide_session
@format_parameters({"limit": check_limit})
Expand Down
97 changes: 96 additions & 1 deletion airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

from airflow.api_connexion.endpoints.task_instance_endpoint import _convert_ti_states
from airflow.models import Base, Connection
from airflow.models.asset import AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference
from airflow.models.dag import DagModel, DagTag
from airflow.models.dagrun import DagRun
from airflow.models.dagwarning import DagWarning, DagWarningType
Expand Down Expand Up @@ -440,6 +440,86 @@ def to_orm(self, select: Select) -> Select:
)


class _AssetIdFilter(BaseParam[int]):
"""Filter on asset_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, asset_id: int | None = None) -> _AssetIdFilter:
return self.set_value(asset_id)


class _SourceDagIdFilter(BaseParam[str]):
"""Filter on source_dag_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_dag_id: str | None = None) -> _SourceDagIdFilter:
return self.set_value(source_dag_id)


class _SourceTaskIdFilter(BaseParam[str]):
"""Filter on source_task_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_task_id: str | None = None) -> _SourceTaskIdFilter:
return self.set_value(source_task_id)


class _SourceRunIdFilter(BaseParam[str]):
"""filter on source_run_id."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_run_id: str | None = None) -> _SourceRunIdFilter:
return self.set_value(source_run_id)


class _SourceMapIndexFilter(BaseParam[int]):
"""Filter on source_map_index."""

def __init__(self, attribute: ColumnElement, skip_none: bool = True) -> None:
super().__init__(skip_none=skip_none)
self.attribute = attribute

def to_orm(self, select: Select) -> Select:
if self.value is None and self.skip_none:
return select
return select.where(self.attribute == self.value)

def depends(self, source_map_index: int | None = None) -> _SourceMapIndexFilter:
return self.set_value(source_map_index)


class Range(BaseModel, Generic[T]):
"""Range with a lower and upper bound."""

Expand Down Expand Up @@ -537,3 +617,18 @@ def depends_float(
QueryAssetDagIdPatternSearch = Annotated[
_DagIdAssetReferenceFilter, Depends(_DagIdAssetReferenceFilter().depends)
]
QueryAssetIdFilter = Annotated[_AssetIdFilter, Depends(_AssetIdFilter(AssetEvent.asset_id).depends)]


QuerySourceDagIdFilter = Annotated[
_SourceDagIdFilter, Depends(_SourceDagIdFilter(AssetEvent.source_dag_id).depends)
]
QuerySourceTaskIdFilter = Annotated[
_SourceTaskIdFilter, Depends(_SourceTaskIdFilter(AssetEvent.source_task_id).depends)
]
QuerySourceRunIdFilter = Annotated[
_SourceRunIdFilter, Depends(_SourceRunIdFilter(AssetEvent.source_run_id).depends)
]
QuerySourceMapIndexFilter = Annotated[
_SourceMapIndexFilter, Depends(_SourceMapIndexFilter(AssetEvent.source_map_index).depends)
]
6 changes: 3 additions & 3 deletions airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ class AssetCollectionResponse(BaseModel):


class DagRunAssetReference(BaseModel):
"""Dag Run Asset Reference serializer for responses."""
"""DAGRun serializer for asset responses."""

run_id: str
dag_id: str
execution_date: datetime = Field(alias="logical_date")
start_date: datetime
end_date: datetime | None = None
end_date: datetime
state: str
data_interval_start: datetime
data_interval_end: datetime
Expand All @@ -95,7 +95,7 @@ class AssetEventResponse(BaseModel):


class AssetEventCollectionResponse(BaseModel):
"""Asset Event collection response."""
"""Asset event collection response."""

asset_events: list[AssetEventResponse]
total_entries: int
111 changes: 105 additions & 6 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,106 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/events:
get:
tags:
- Asset
summary: Get Asset Events
description: Get asset events.
operationId: get_asset_events
parameters:
- name: limit
in: query
required: false
schema:
type: integer
default: 100
title: Limit
- name: offset
in: query
required: false
schema:
type: integer
default: 0
title: Offset
- name: order_by
in: query
required: false
schema:
type: string
default: timestamp
title: Order By
- name: asset_id
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Asset Id
- name: source_dag_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Source Dag Id
- name: source_task_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Source Task Id
- name: source_run_id
in: query
required: false
schema:
anyOf:
- type: string
- type: 'null'
title: Source Run Id
- name: source_map_index
in: query
required: false
schema:
anyOf:
- type: integer
- type: 'null'
title: Source Map Index
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/AssetEventCollectionResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/{uri}:
get:
tags:
Expand Down Expand Up @@ -3634,7 +3734,7 @@ components:
- asset_events
- total_entries
title: AssetEventCollectionResponse
description: Asset Event collection response.
description: Asset event collection response.
AssetEventResponse:
properties:
id:
Expand Down Expand Up @@ -4769,10 +4869,8 @@ components:
format: date-time
title: Start Date
end_date:
anyOf:
- type: string
format: date-time
- type: 'null'
type: string
format: date-time
title: End Date
state:
type: string
Expand All @@ -4791,11 +4889,12 @@ components:
- dag_id
- logical_date
- start_date
- end_date
- state
- data_interval_start
- data_interval_end
title: DagRunAssetReference
description: Dag Run Asset Reference serializer for responses.
description: DAGRun serializer for asset responses.
DagRunState:
type: string
enum:
Expand Down
64 changes: 62 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,25 @@
from airflow.api_fastapi.common.db.common import get_session, paginated_select
from airflow.api_fastapi.common.parameters import (
QueryAssetDagIdPatternSearch,
QueryAssetIdFilter,
QueryLimit,
QueryOffset,
QuerySourceDagIdFilter,
QuerySourceMapIndexFilter,
QuerySourceRunIdFilter,
QuerySourceTaskIdFilter,
QueryUriPatternSearch,
SortParam,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.assets import AssetCollectionResponse, AssetResponse
from airflow.api_fastapi.core_api.datamodels.assets import (
AssetCollectionResponse,
AssetEventCollectionResponse,
AssetEventResponse,
AssetResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.models.asset import AssetModel
from airflow.models.asset import AssetEvent, AssetModel

assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")

Expand Down Expand Up @@ -74,6 +84,56 @@ def get_assets(
)


@assets_router.get(
"/events",
responses=create_openapi_http_exception_doc([404]),
)
def get_asset_events(
limit: QueryLimit,
offset: QueryOffset,
order_by: Annotated[
SortParam,
Depends(
SortParam(
[
"source_task_id",
"source_dag_id",
"source_run_id",
"source_map_index",
"timestamp",
],
AssetEvent,
).dynamic_depends("timestamp")
),
],
asset_id: QueryAssetIdFilter,
source_dag_id: QuerySourceDagIdFilter,
source_task_id: QuerySourceTaskIdFilter,
source_run_id: QuerySourceRunIdFilter,
source_map_index: QuerySourceMapIndexFilter,
session: Annotated[Session, Depends(get_session)],
) -> AssetEventCollectionResponse:
"""Get asset events."""
assets_event_select, total_entries = paginated_select(
select(AssetEvent),
filters=[asset_id, source_dag_id, source_task_id, source_run_id, source_map_index],
order_by=order_by,
offset=offset,
limit=limit,
session=session,
)

assets_event_select = assets_event_select.options(subqueryload(AssetEvent.created_dagruns))
assets_events = session.scalars(assets_event_select).all()

return AssetEventCollectionResponse(
asset_events=[
AssetEventResponse.model_validate(asset, from_attributes=True) for asset in assets_events
],
total_entries=total_entries,
)


@assets_router.get(
"/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
Expand Down
Loading

0 comments on commit 75a387e

Please sign in to comment.