Skip to content

Commit

Permalink
AIP-84 Migrate the public endpoint Get DAG to FastAPI (apache#42848)
Browse files Browse the repository at this point in the history
* Migrate the public endpoint Get DAG to FastAPI

* Use proper name for test function
  • Loading branch information
omkar-foss authored and PaulKobow7536 committed Oct 24, 2024
1 parent 7384b90 commit 54417f9
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 39 deletions.
62 changes: 56 additions & 6 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,13 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/details:
/public/dags/{dag_id}:
get:
tags:
- DAG
summary: Get Dag Details
description: Get details of DAG.
operationId: get_dag_details
summary: Get Dag
description: Get basic information about a DAG.
operationId: get_dag
parameters:
- name: dag_id
in: path
Expand All @@ -311,7 +311,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/DAGDetailsResponse'
$ref: '#/components/schemas/DAGResponse'
'400':
content:
application/json:
Expand Down Expand Up @@ -342,7 +342,6 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/dags/{dag_id}:
patch:
tags:
- DAG
Expand Down Expand Up @@ -409,6 +408,57 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/dags/{dag_id}/details:
get:
tags:
- DAG
summary: Get Dag Details
description: Get details of DAG.
operationId: get_dag_details
parameters:
- name: dag_id
in: path
required: true
schema:
type: string
title: Dag Id
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/DAGDetailsResponse'
'400':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Bad Request
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unprocessable Entity
/public/connections/{connection_id}:
delete:
tags:
Expand Down
40 changes: 23 additions & 17 deletions airflow/api_fastapi/serializers/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
from itsdangerous import URLSafeSerializer
from pendulum.tz.timezone import FixedTimezone, Timezone
from pydantic import (
AliasChoices,
AliasGenerator,
BaseModel,
Field,
ConfigDict,
computed_field,
field_validator,
)
Expand Down Expand Up @@ -77,6 +77,14 @@ def get_owners(cls, v: Any) -> list[str] | None:
return v.split(",")
return v

@field_validator("timetable_summary", mode="before")
@classmethod
def get_timetable_summary(cls, tts: str | None) -> str | None:
"""Validate the string representation of timetable_summary."""
if tts is None or tts == "None":
return None
return str(tts)

# Mypy issue https://github.com/python/mypy/issues/1362
@computed_field # type: ignore[misc]
@property
Expand All @@ -103,9 +111,7 @@ class DAGDetailsResponse(DAGResponse):
"""Specific serializer for DAG Details responses."""

catchup: bool
dag_run_timeout: timedelta | None = Field(
validation_alias=AliasChoices("dag_run_timeout", "dagrun_timeout")
)
dag_run_timeout: timedelta | None
dataset_expression: dict | None
doc_md: str | None
start_date: datetime | None
Expand All @@ -114,11 +120,19 @@ class DAGDetailsResponse(DAGResponse):
orientation: str
params: abc.MutableMapping | None
render_template_as_native_obj: bool
template_search_path: Iterable[str] | None = Field(
validation_alias=AliasChoices("template_search_path", "template_searchpath")
)
template_search_path: Iterable[str] | None
timezone: str | None
last_parsed: datetime | None = Field(validation_alias=AliasChoices("last_parsed", "last_loaded"))
last_parsed: datetime | None

model_config = ConfigDict(
alias_generator=AliasGenerator(
validation_alias=lambda field_name: {
"dag_run_timeout": "dagrun_timeout",
"last_parsed": "last_loaded",
"template_search_path": "template_searchpath",
}.get(field_name, field_name),
)
)

@field_validator("timezone", mode="before")
@classmethod
Expand All @@ -128,14 +142,6 @@ def get_timezone(cls, tz: Timezone | FixedTimezone) -> str | None:
return None
return str(tz)

@field_validator("timetable_summary", mode="before")
@classmethod
def get_timetable_summary(cls, tts: str | None) -> str | None:
"""Validate the string representation of timetable_summary."""
if tts is None or tts == "None":
return None
return str(tts)

@field_validator("params", mode="before")
@classmethod
def get_params(cls, params: abc.MutableMapping | None) -> dict | None:
Expand Down
20 changes: 20 additions & 0 deletions airflow/api_fastapi/views/public/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,26 @@ async def get_dags(
)


@dags_router.get("/{dag_id}", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
async def get_dag(
dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
) -> DAGResponse:
"""Get basic information about a DAG."""
dag: DAG = request.app.state.dag_bag.get_dag(dag_id)
if not dag:
raise HTTPException(404, f"Dag with id {dag_id} was not found")

dag_model: DagModel = session.get(DagModel, dag_id)
if not dag_model:
raise HTTPException(404, f"Unable to obtain dag with id {dag_id} from session")

for key, value in dag.__dict__.items():
if not key.startswith("_") and not hasattr(dag_model, key):
setattr(dag_model, key, value)

return DAGResponse.model_validate(dag_model, from_attributes=True)


@dags_router.get("/{dag_id}/details", responses=create_openapi_http_exception_doc([400, 401, 403, 404, 422]))
async def get_dag_details(
dag_id: str, session: Annotated[Session, Depends(get_session)], request: Request
Expand Down
16 changes: 16 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ export const UseDagServiceGetDagsKeyFn = (
},
]),
];
export type DagServiceGetDagDefaultResponse = Awaited<
ReturnType<typeof DagService.getDag>
>;
export type DagServiceGetDagQueryResult<
TData = DagServiceGetDagDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useDagServiceGetDagKey = "DagServiceGetDag";
export const UseDagServiceGetDagKeyFn = (
{
dagId,
}: {
dagId: string;
},
queryKey?: Array<unknown>,
) => [useDagServiceGetDagKey, ...(queryKey ?? [{ dagId }])];
export type DagServiceGetDagDetailsDefaultResponse = Awaited<
ReturnType<typeof DagService.getDagDetails>
>;
Expand Down
20 changes: 20 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@ export const prefetchUseDagServiceGetDags = (
tags,
}),
});
/**
* Get Dag
* Get basic information about a DAG.
* @param data The data for the request.
* @param data.dagId
* @returns DAGResponse Successful Response
* @throws ApiError
*/
export const prefetchUseDagServiceGetDag = (
queryClient: QueryClient,
{
dagId,
}: {
dagId: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UseDagServiceGetDagKeyFn({ dagId }),
queryFn: () => DagService.getDag({ dagId }),
});
/**
* Get Dag Details
* Get details of DAG.
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,32 @@ export const useDagServiceGetDags = <
}) as TData,
...options,
});
/**
* Get Dag
* Get basic information about a DAG.
* @param data The data for the request.
* @param data.dagId
* @returns DAGResponse Successful Response
* @throws ApiError
*/
export const useDagServiceGetDag = <
TData = Common.DagServiceGetDagDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
}: {
dagId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseDagServiceGetDagKeyFn({ dagId }, queryKey),
queryFn: () => DagService.getDag({ dagId }) as TData,
...options,
});
/**
* Get Dag Details
* Get details of DAG.
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,32 @@ export const useDagServiceGetDagsSuspense = <
}) as TData,
...options,
});
/**
* Get Dag
* Get basic information about a DAG.
* @param data The data for the request.
* @param data.dagId
* @returns DAGResponse Successful Response
* @throws ApiError
*/
export const useDagServiceGetDagSuspense = <
TData = Common.DagServiceGetDagDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
dagId,
}: {
dagId: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseDagServiceGetDagKeyFn({ dagId }, queryKey),
queryFn: () => DagService.getDag({ dagId }) as TData,
...options,
});
/**
* Get Dag Details
* Get details of DAG.
Expand Down
45 changes: 36 additions & 9 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import type {
GetDagsResponse,
PatchDagsData,
PatchDagsResponse,
GetDagDetailsData,
GetDagDetailsResponse,
GetDagData,
GetDagResponse,
PatchDagData,
PatchDagResponse,
GetDagDetailsData,
GetDagDetailsResponse,
DeleteConnectionData,
DeleteConnectionResponse,
GetConnectionData,
Expand Down Expand Up @@ -166,19 +168,17 @@ export class DagService {
}

/**
* Get Dag Details
* Get details of DAG.
* Get Dag
* Get basic information about a DAG.
* @param data The data for the request.
* @param data.dagId
* @returns DAGDetailsResponse Successful Response
* @returns DAGResponse Successful Response
* @throws ApiError
*/
public static getDagDetails(
data: GetDagDetailsData,
): CancelablePromise<GetDagDetailsResponse> {
public static getDag(data: GetDagData): CancelablePromise<GetDagResponse> {
return __request(OpenAPI, {
method: "GET",
url: "/public/dags/{dag_id}/details",
url: "/public/dags/{dag_id}",
path: {
dag_id: data.dagId,
},
Expand Down Expand Up @@ -225,6 +225,33 @@ export class DagService {
},
});
}

/**
* Get Dag Details
* Get details of DAG.
* @param data The data for the request.
* @param data.dagId
* @returns DAGDetailsResponse Successful Response
* @throws ApiError
*/
public static getDagDetails(
data: GetDagDetailsData,
): CancelablePromise<GetDagDetailsResponse> {
return __request(OpenAPI, {
method: "GET",
url: "/public/dags/{dag_id}/details",
path: {
dag_id: data.dagId,
},
errors: {
400: "Bad Request",
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Unprocessable Entity",
},
});
}
}

export class ConnectionService {
Expand Down
Loading

0 comments on commit 54417f9

Please sign in to comment.