-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[dagster-aws] [docs] add documentation for ECS Pipes
- Loading branch information
1 parent
87e6397
commit a35b7d5
Showing
15 changed files
with
218 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
--- | ||
title: "Integrating AWS ECS with Dagster Pipes | Dagster Docs" | ||
description: "Learn to integrate Dagster Pipes with AWS ECS to launch external code from Dagster assets." | ||
--- | ||
|
||
# AWS ECS & Dagster Pipes | ||
|
||
This tutorial gives a short overview on how to use [Dagster Pipes](/concepts/dagster-pipes) with [AWS ECS](https://aws.amazon.com/ecs/). | ||
|
||
The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides the <PyObject object="PipesECSClient" module="dagster_aws.pipes" /> resource which can be used to launch AWS ECS tasks from Dagster assets and ops. Dagster can receive regular events like logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes on the task side. | ||
|
||
--- | ||
|
||
## Prerequisites | ||
|
||
- **In the orchestration environment**, you'll need to: | ||
|
||
- Install the following packages: | ||
|
||
```shell | ||
pip install dagster dagster-webserver dagster-aws | ||
``` | ||
|
||
Refer to the [Dagster installation guide](/getting-started/install) for more info. | ||
|
||
- **AWS authentication credentials configured.** If you don't have this set up already, refer to the [boto3 quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html). | ||
- **In AWS**: | ||
- An existing AWS account | ||
- An AWS ECS task. To receive logs and events from a task container, it must have `"logDriver"` set to `"awslogs"` in `"logConfiguration"`. | ||
--- | ||
## Step 1: Install the dagster-pipes module | ||
Install the `dagster-pipes` module in the image used for your ECS task. For example, you can install the dependency with `pip` in your image Dockerfile: | ||
```Dockerfile | ||
FROM python:3.11-slim | ||
RUN python -m pip install dagster-pipes | ||
# copy the task script | ||
COPY . . | ||
``` | ||
--- | ||
## Step 2: Add dagster-pipes to the ECS task script | ||
Call `open_dagster_pipes` in the ECS task script to create a context that can be used to send messages to Dagster: | ||
```python file=/guides/dagster/dagster_pipes/ecs/task.py | ||
from dagster_pipes import ( | ||
PipesEnvVarParamsLoader, | ||
PipesS3ContextLoader, | ||
open_dagster_pipes, | ||
) | ||
def main(): | ||
with open_dagster_pipes() as pipes: | ||
pipes.log.info("Hello from AWS ECS task!") | ||
pipes.report_asset_materialization( | ||
metadata={"some_metric": {"raw_value": 0, "type": "int"}}, | ||
data_version="alpha", | ||
) | ||
if __name__ == "__main__": | ||
main() | ||
``` | ||
--- | ||
## Step 3: Create an asset using the PipesECSClient to launch the task | ||
In the Dagster asset/op code, use the `PipesECSClient` resource to launch the job: | ||
```python file=/guides/dagster/dagster_pipes/ecs/dagster_code.py startafter=start_asset_marker endbefore=end_asset_marker | ||
import os | ||
# dagster_glue_pipes.py | ||
import boto3 | ||
from dagster_aws.pipes import PipesECSClient | ||
from docutils.nodes import entry | ||
from dagster import AssetExecutionContext, asset | ||
@asset | ||
def ecs_pipes_asset(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient): | ||
return pipes_ecs_client.run( | ||
context=context, | ||
run_task_params={ | ||
"taskDefinition": "my-task", | ||
"count": 1, | ||
}, | ||
).get_materialize_result() | ||
``` | ||
This will launch the AWS ECS task and wait until it reaches `"STOPPED"` status. If any of the tasks's containers fail, the Dagster process will raise an exception. If the Dagster process is interrupted while the task is still running, the task will be terminated. | ||
|
||
--- | ||
|
||
## Step 4: Create Dagster definitions | ||
|
||
Next, add the `PipesECSClient` resource to your project's <PyObject object="Definitions" /> object: | ||
```python file=/guides/dagster/dagster_pipes/ecs/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker | ||
from dagster import Definitions # noqa | ||
from dagster_aws.pipes import PipesS3MessageReader | ||
defs = Definitions( | ||
assets=[ecs_pipes_asset], | ||
resources={"pipes_ecs_client": PipesECSClient()}, | ||
) | ||
``` | ||
Dagster will now be able to launch the AWS ECS task from the `ecs_pipes_asset` asset, and receive logs and events from the task. If using the default `message_reader` `PipesCloudwatchLogReader`, logs will be read from the Cloudwatch log group specified in the container `"logConfiguration"` field definition. Logs from all containers in the task will be read. | ||
--- | ||
## Related | ||
<ArticleList> | ||
<ArticleListItem | ||
title="Dagster Pipes" | ||
href="/concepts/dagster-pipes" | ||
></ArticleListItem> | ||
<ArticleListItem | ||
title="AWS ECS Pipes API reference" | ||
href="/_apidocs/libraries/dagster-aws#dagster_aws.pipes.PipesECSClient" | ||
></ArticleListItem> | ||
</ArticleList> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
36 changes: 36 additions & 0 deletions
36
examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dagster_code.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
# start_asset_marker | ||
import os | ||
|
||
# dagster_glue_pipes.py | ||
import boto3 | ||
from dagster_aws.pipes import PipesECSClient | ||
from docutils.nodes import entry | ||
|
||
from dagster import AssetExecutionContext, asset | ||
|
||
|
||
@asset | ||
def ecs_pipes_asset(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient): | ||
return pipes_ecs_client.run( | ||
context=context, | ||
run_task_params={ | ||
"taskDefinition": "my-task", | ||
"count": 1, | ||
}, | ||
).get_materialize_result() | ||
|
||
|
||
# end_asset_marker | ||
|
||
# start_definitions_marker | ||
|
||
from dagster import Definitions # noqa | ||
from dagster_aws.pipes import PipesS3MessageReader | ||
|
||
|
||
defs = Definitions( | ||
assets=[ecs_pipes_asset], | ||
resources={"pipes_ecs_client": PipesECSClient()}, | ||
) | ||
|
||
# end_definitions_marker |
12 changes: 12 additions & 0 deletions
12
examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dev.Dockerfile
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
# this Dockerfile can be used for ECS Pipes development | ||
|
||
FROM python:3.11-slim | ||
|
||
RUN --mount=type=cache,target=/root/.cache/pip python -m pip install boto3 | ||
|
||
COPY python_modules/dagster-pipes /src/dagster-pipes | ||
|
||
RUN pip install -e /src/dagster-pipes | ||
|
||
WORKDIR /app | ||
COPY examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py . |
18 changes: 18 additions & 0 deletions
18
examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
from dagster_pipes import ( | ||
PipesEnvVarParamsLoader, | ||
PipesS3ContextLoader, | ||
open_dagster_pipes, | ||
) | ||
|
||
|
||
def main(): | ||
with open_dagster_pipes() as pipes: | ||
pipes.log.info("Hello from AWS ECS task!") | ||
pipes.report_asset_materialization( | ||
metadata={"some_metric": {"raw_value": 0, "type": "int"}}, | ||
data_version="alpha", | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |
1 change: 0 additions & 1 deletion
1
examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/glue/dagster_code.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters