From a07ac4459ec0bf9443d1cd3ad06086cbcd896c48 Mon Sep 17 00:00:00 2001 From: Daniel Gafni Date: Sat, 10 Aug 2024 16:24:43 +0200 Subject: [PATCH] [dagster-aws] [docs] add documentation for ECS Pipes --- .../dagster/dagster_pipes/ecs/Dockerfile | 10 ++++ .../dagster/dagster_pipes/ecs/dagster_code.py | 48 +++++++++++++++++++ .../dagster/dagster_pipes/ecs/dev.Dockerfile | 14 ++++++ .../guides/dagster/dagster_pipes/ecs/task.py | 21 ++++++++ 4 files changed, 93 insertions(+) create mode 100644 examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/Dockerfile create mode 100644 examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dagster_code.py create mode 100644 examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dev.Dockerfile create mode 100644 examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/Dockerfile b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/Dockerfile new file mode 100644 index 0000000000000..a9f865b0ef038 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.11-slim + +RUN python -m pip install --no-cache dagster-pipes boto3 + +# TODO: replace this with `pip install dagster-pipes` once the docs are finished +RUN pip install --no-cache-dir dagster-pipes boto3 + +WORKDIR /app + +COPY task.py . diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dagster_code.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dagster_code.py new file mode 100644 index 0000000000000..1bc5d68dff633 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dagster_code.py @@ -0,0 +1,48 @@ +# start_asset_marker +import os + +# dagster_glue_pipes.py +import boto3 +from dagster_aws.pipes import PipesECSClient +from docutils.nodes import entry + +from dagster import AssetExecutionContext, asset + + + +@asset +def ecs_pipes_asset(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient): + return pipes_ecs_client.run( + context=context, + task_definition="dagster-pipes", + launch_type="FARGATE", + network_configuration={ + "awsvpcConfiguration": { + "subnets": [ + "subnet-a96acdc0" + ], + "securityGroups": ["sg-028d32553728591b2"], + "assignPublicIp": "ENABLED", + } + }, + ).get_materialize_result() + + +# end_asset_marker + +# start_definitions_marker + +from dagster import Definitions # noqa +from dagster_aws.pipes import PipesS3MessageReader + + +defs = Definitions( + assets=[ecs_pipes_asset], + resources={ + "pipes_ecs_client": PipesECSClient( + client=boto3.client("ecs"), + ) + }, +) + +# end_definitions_marker diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dev.Dockerfile b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dev.Dockerfile new file mode 100644 index 0000000000000..d2c45cff7ebba --- /dev/null +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dev.Dockerfile @@ -0,0 +1,14 @@ +# this Dockerfile can be used for ECS Pipes development + +FROM python:3.11-slim + +RUN python -m pip install --no-cache dagster-pipes boto3 + +RUN --mount=type=cache,target=/root/.cache/pip pip install boto3 + +COPY python_modules/dagster-pipes /src/dagster-pipes + +RUN pip install -e /src/dagster-pipes + +WORKDIR /app +COPY examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py . diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py new file mode 100644 index 0000000000000..9c0c029fbb43e --- /dev/null +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py @@ -0,0 +1,21 @@ +import boto3 +from dagster_pipes import ( + PipesEnvVarParamsLoader, + PipesS3ContextLoader, + open_dagster_pipes, +) + +client = boto3.client("s3") + + +def main(): + with open_dagster_pipes() as pipes: + pipes.log.info("Hello from AWS ECS task!") + pipes.report_asset_materialization( + metadata={"some_metric": {"raw_value": 0, "type": "int"}}, + data_version="alpha", + ) + + +if __name__ == "__main__": + main()