Skip to content

Commit

Permalink
Merge pull request #1737 from PrefectHQ/agent_options
Browse files Browse the repository at this point in the history
`agent start` CLI command now allows for kwargs
  • Loading branch information
joshmeek authored Nov 13, 2019
2 parents 7298003 + 62ea50a commit 9a6ac52
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Loosen `containerDefinitions` requirements for `FargateTaskEnvironment` - [#1713](https://github.com/PrefectHQ/prefect/pull/1713)
- Local Docker agent proactively fails flow runs if image cannot be pulled - [#1395](https://github.com/PrefectHQ/prefect/issues/1395)
- Add graceful keyboard interrupt shutdown for all agents - [#1731](https://github.com/PrefectHQ/prefect/pull/1731)
- `agent start` CLI command now allows for Agent kwargs - [#1737](https://github.com/PrefectHQ/prefect/pull/1737)

### Task Library

Expand Down
25 changes: 25 additions & 0 deletions docs/cloud/agent/fargate.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,28 @@ $ prefect agent start fargate
:::warning Outbound Traffic
If you encounter issues with Fargate raising errors in cases of client timeouts or inability to pull containers then you may need to adjust your `networkConfiguration`. Visit [this discussion thread](https://github.com/aws/amazon-ecs-agent/issues/1128#issuecomment-351545461) for more information on configuring AWS security groups.
:::

#### Prefect CLI Using Kwargs

All configuration options for the Fargate Agent can also be provided to the `prefect agent start fargate` CLI command. They must match the camel casing used by boto3 but both the single kwarg as well as with the standard prefix of `--` are accepted. This means that `taskRoleArn=""` is the same as `--taskRoleArn=""`.

```bash
$ export AWS_ACCESS_KEY_ID=...
$ export AWS_SECRET_ACCESS_KEY=...
$ export REGION_NAME=us-east-1

$ prefect agent start fargate cpu=256 memory=512 networkConfiguration="{'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['my_subnet_id'], 'securityGroups': []}}"
```

Kwarg values can also be provided through environment variables. This is useful in situations where case sensitive environment variables are desired or when using templating tools like Terraform to deploy your Agent.

```bash
$ export AWS_ACCESS_KEY_ID=...
$ export AWS_SECRET_ACCESS_KEY=...
$ export REGION_NAME=us-east-1
$ export CPU=256
$ export MEMORY=512
$ export NETWORK_CONFIGURATION="{'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['my_subnet_id'], 'securityGroups': []}}"

$ prefect agent start fargate cpu=$CPU memory=$MEMORY networkConfiguration=$NETWORK_CONFIGURATION
```
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"dropbox": ["dropbox ~= 9.0"],
"google": [
"google-cloud-bigquery >= 1.6.0, < 2.0",
"google-cloud-storage >= 1.13, < 2.0",
"google-cloud-storage >= 1.13, < 1.23.0",
],
"kubernetes": ["kubernetes >= 9.0.0a1, < 10.0", "dask-kubernetes >= 0.8.0"],
"rss": ["feedparser >= 5.0.1, < 6.0"],
Expand Down
34 changes: 30 additions & 4 deletions src/prefect/cli/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import click

from prefect import config, context
from prefect import config
from prefect.utilities.configuration import set_temporary_config
from prefect.utilities.serialization import from_qualified_name

Expand Down Expand Up @@ -42,7 +42,10 @@ def agent():
pass


@agent.command(hidden=True)
@agent.command(
hidden=True,
context_settings=dict(ignore_unknown_options=True, allow_extra_args=True,),
)
@click.argument("agent-option", default="local")
@click.option(
"--token", "-t", required=False, help="A Prefect Cloud API token.", hidden=True
Expand All @@ -67,7 +70,8 @@ def agent():
)
@click.option("--no-pull", is_flag=True, help="Pull images flag.", hidden=True)
@click.option("--base-url", "-b", help="Docker daemon base URL.", hidden=True)
def start(agent_option, token, name, verbose, label, no_pull, base_url):
@click.pass_context
def start(ctx, agent_option, token, name, verbose, label, no_pull, base_url):
"""
Start an agent.
Expand All @@ -90,7 +94,19 @@ def start(agent_option, token, name, verbose, label, no_pull, base_url):
--base-url, -b TEXT A Docker daemon host URL for a LocalAgent
--no-pull Pull images for a LocalAgent
Defaults to pulling if not provided
\b
Fargate Agent Options:
Any of the configuration options outlined in the docs can be provided here
https://docs.prefect.io/cloud/agent/fargate.html#configuration
"""

# Split context
kwargs = dict()
for item in ctx.args:
item = item.replace("--", "")
kwargs.update([item.split("=")])

tmp_config = {"cloud.agent.auth_token": token or config.cloud.agent.auth_token}
if verbose:
tmp_config["cloud.agent.level"] = "DEBUG"
Expand All @@ -102,7 +118,17 @@ def start(agent_option, token, name, verbose, label, no_pull, base_url):
click.secho("{} is not a valid agent".format(agent_option), fg="red")
return

with context(no_pull=no_pull, base_url=base_url):
_agent = from_qualified_name(retrieved_agent)

if agent_option == "local":
from_qualified_name(retrieved_agent)(
name=name, labels=list(label), base_url=base_url, no_pull=no_pull,
).start()
elif agent_option == "fargate":
from_qualified_name(retrieved_agent)(
name=name, labels=list(label), **kwargs
).start()
else:
from_qualified_name(retrieved_agent)(name=name, labels=list(label)).start()


Expand Down
89 changes: 88 additions & 1 deletion tests/cli/test_agent.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from unittest.mock import MagicMock

from click.testing import CliRunner
import pytest

from prefect.cli.agent import agent

pytest.importorskip("boto3")
pytest.importorskip("botocore")
pytest.importorskip("kubernetes")


def test_agent_init():
runner = CliRunner()
Expand Down Expand Up @@ -55,6 +60,88 @@ def test_agent_start_verbose(monkeypatch, runner_token):
assert result.exit_code == 0


def test_agent_start_local(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start)

docker_client = MagicMock()
monkeypatch.setattr("prefect.agent.local.agent.docker.APIClient", docker_client)

runner = CliRunner()
result = runner.invoke(agent, ["start", "local"])
assert result.exit_code == 0


def test_agent_start_kubernetes(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.kubernetes.KubernetesAgent.start", start)

k8s_config = MagicMock()
monkeypatch.setattr("kubernetes.config", k8s_config)

runner = CliRunner()
result = runner.invoke(agent, ["start", "kubernetes"])
assert result.exit_code == 0


def test_agent_start_kubernetes_kwargs_ignored(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.kubernetes.KubernetesAgent.start", start)

k8s_config = MagicMock()
monkeypatch.setattr("kubernetes.config", k8s_config)

runner = CliRunner()
result = runner.invoke(agent, ["start", "kubernetes", "test_kwarg=ignored"])
assert result.exit_code == 0


def test_agent_start_fargate(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.fargate.FargateAgent.start", start)

boto3_client = MagicMock()
monkeypatch.setattr("boto3.client", boto3_client)

runner = CliRunner()
result = runner.invoke(agent, ["start", "fargate"])
assert result.exit_code == 0


def test_agent_start_fargate_kwargs(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.fargate.FargateAgent.start", start)

boto3_client = MagicMock()
monkeypatch.setattr("boto3.client", boto3_client)

runner = CliRunner()
result = runner.invoke(agent, ["start", "fargate", "taskRoleArn=test"])
assert result.exit_code == 0


def test_agent_start_fargate_kwargs_received(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.fargate.FargateAgent.start", start)

fargate_agent = MagicMock()
monkeypatch.setattr("prefect.agent.fargate.FargateAgent", fargate_agent)

boto3_client = MagicMock()
monkeypatch.setattr("boto3.client", boto3_client)

runner = CliRunner()
result = runner.invoke(
agent, ["start", "fargate", "taskRoleArn=arn", "--volumes=vol"]
)
assert result.exit_code == 0

assert fargate_agent.called
fargate_agent.assert_called_with(
labels=[], name=None, taskRoleArn="arn", volumes="vol"
)


def test_agent_start_name(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start)
Expand All @@ -67,7 +154,7 @@ def test_agent_start_name(monkeypatch, runner_token):
assert result.exit_code == 0


def test_agent_start_local_context_vars(monkeypatch, runner_token):
def test_agent_start_local_vars(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start)

Expand Down

0 comments on commit 9a6ac52

Please sign in to comment.