Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent start CLI command now allows for kwargs #1737

Merged
merged 8 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Loosen `containerDefinitions` requirements for `FargateTaskEnvironment` - [#1713](https://github.com/PrefectHQ/prefect/pull/1713)
- Local Docker agent proactively fails flow runs if image cannot be pulled - [#1395](https://github.com/PrefectHQ/prefect/issues/1395)
- Add graceful keyboard interrupt shutdown for all agents - [#1731](https://github.com/PrefectHQ/prefect/pull/1731)
- `agent start` CLI command now allows for Agent kwargs - [#1737](https://github.com/PrefectHQ/prefect/pull/1737)

### Task Library

Expand Down
25 changes: 25 additions & 0 deletions docs/cloud/agent/fargate.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,28 @@ $ prefect agent start fargate
:::warning Outbound Traffic
If you encounter issues with Fargate raising errors in cases of client timeouts or inability to pull containers then you may need to adjust your `networkConfiguration`. Visit [this discussion thread](https://github.com/aws/amazon-ecs-agent/issues/1128#issuecomment-351545461) for more information on configuring AWS security groups.
:::

#### Prefect CLI Using Kwargs

All configuration options for the Fargate Agent can also be provided to the `prefect agent start fargate` CLI command. They must match the camel casing used by boto3.

```bash
$ export AWS_ACCESS_KEY_ID=...
$ export AWS_SECRET_ACCESS_KEY=...
$ export REGION_NAME=us-east-1

$ prefect agent start fargate cpu=256 memory=512 networkConfiguration="{'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['my_subnet_id'], 'securityGroups': []}}"
joshmeek marked this conversation as resolved.
Show resolved Hide resolved
```

Kwarg values can also be provided through environment variables. This is useful in situations where case sensitive environment variables are desired or when using templating tools like Terraform to deploy your Agent.

```bash
$ export AWS_ACCESS_KEY_ID=...
$ export AWS_SECRET_ACCESS_KEY=...
$ export REGION_NAME=us-east-1
$ export CPU=256
$ export MEMORY=512
$ export NETWORK_CONFIGURATION="{'awsvpcConfiguration': {'assignPublicIp': 'ENABLED', 'subnets': ['my_subnet_id'], 'securityGroups': []}}"

$ prefect agent start fargate cpu=$CPU memory=$MEMORY networkConfiguration=$NETWORK_CONFIGURATION
```
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"dropbox": ["dropbox ~= 9.0"],
"google": [
"google-cloud-bigquery >= 1.6.0, < 2.0",
"google-cloud-storage >= 1.13, < 2.0",
"google-cloud-storage >= 1.13, < 1.23.0",
],
"kubernetes": ["kubernetes >= 9.0.0a1, < 10.0", "dask-kubernetes >= 0.8.0"],
"rss": ["feedparser >= 5.0.1, < 6.0"],
Expand Down
34 changes: 30 additions & 4 deletions src/prefect/cli/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import click

from prefect import config, context
from prefect import config
from prefect.utilities.configuration import set_temporary_config
from prefect.utilities.serialization import from_qualified_name

Expand Down Expand Up @@ -42,7 +42,10 @@ def agent():
pass


@agent.command(hidden=True)
@agent.command(
hidden=True,
context_settings=dict(ignore_unknown_options=True, allow_extra_args=True,),
)
@click.argument("agent-option", default="local")
@click.option(
"--token", "-t", required=False, help="A Prefect Cloud API token.", hidden=True
Expand All @@ -67,7 +70,8 @@ def agent():
)
@click.option("--no-pull", is_flag=True, help="Pull images flag.", hidden=True)
@click.option("--base-url", "-b", help="Docker daemon base URL.", hidden=True)
def start(agent_option, token, name, verbose, label, no_pull, base_url):
@click.pass_context
def start(ctx, agent_option, token, name, verbose, label, no_pull, base_url):
"""
Start an agent.

Expand All @@ -90,7 +94,19 @@ def start(agent_option, token, name, verbose, label, no_pull, base_url):
--base-url, -b TEXT A Docker daemon host URL for a LocalAgent
--no-pull Pull images for a LocalAgent
Defaults to pulling if not provided

\b
Fargate Agent Options:
Any of the configuration options outlined in the docs can be provided here
https://docs.prefect.io/cloud/agent/fargate.html#configuration
"""

# Split context
kwargs = dict()
for item in ctx.args:
item = item.replace("--", "")
kwargs.update([item.split("=")])
cicdw marked this conversation as resolved.
Show resolved Hide resolved

tmp_config = {"cloud.agent.auth_token": token or config.cloud.agent.auth_token}
if verbose:
tmp_config["cloud.agent.level"] = "DEBUG"
Expand All @@ -102,7 +118,17 @@ def start(agent_option, token, name, verbose, label, no_pull, base_url):
click.secho("{} is not a valid agent".format(agent_option), fg="red")
return

with context(no_pull=no_pull, base_url=base_url):
_agent = from_qualified_name(retrieved_agent)

if agent_option == "local":
from_qualified_name(retrieved_agent)(
name=name, labels=list(label), base_url=base_url, no_pull=no_pull,
).start()
elif agent_option == "fargate":
from_qualified_name(retrieved_agent)(
name=name, labels=list(label), **kwargs
).start()
else:
from_qualified_name(retrieved_agent)(name=name, labels=list(label)).start()


Expand Down
89 changes: 88 additions & 1 deletion tests/cli/test_agent.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
from unittest.mock import MagicMock

from click.testing import CliRunner
import pytest

from prefect.cli.agent import agent

pytest.importorskip("boto3")
pytest.importorskip("botocore")
pytest.importorskip("kubernetes")


def test_agent_init():
runner = CliRunner()
Expand Down Expand Up @@ -55,6 +60,88 @@ def test_agent_start_verbose(monkeypatch, runner_token):
assert result.exit_code == 0


def test_agent_start_local(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start)

docker_client = MagicMock()
monkeypatch.setattr("prefect.agent.local.agent.docker.APIClient", docker_client)

runner = CliRunner()
result = runner.invoke(agent, ["start", "local"])
assert result.exit_code == 0


def test_agent_start_kubernetes(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.kubernetes.KubernetesAgent.start", start)

k8s_config = MagicMock()
monkeypatch.setattr("kubernetes.config", k8s_config)

runner = CliRunner()
result = runner.invoke(agent, ["start", "kubernetes"])
assert result.exit_code == 0


def test_agent_start_kubernetes_kwargs_ignored(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.kubernetes.KubernetesAgent.start", start)

k8s_config = MagicMock()
monkeypatch.setattr("kubernetes.config", k8s_config)

runner = CliRunner()
result = runner.invoke(agent, ["start", "kubernetes", "test_kwarg=ignored"])
assert result.exit_code == 0


def test_agent_start_fargate(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.fargate.FargateAgent.start", start)

boto3_client = MagicMock()
monkeypatch.setattr("boto3.client", boto3_client)

runner = CliRunner()
result = runner.invoke(agent, ["start", "fargate"])
assert result.exit_code == 0


def test_agent_start_fargate_kwargs(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.fargate.FargateAgent.start", start)

boto3_client = MagicMock()
monkeypatch.setattr("boto3.client", boto3_client)

runner = CliRunner()
result = runner.invoke(agent, ["start", "fargate", "taskRoleArn=test"])
assert result.exit_code == 0


def test_agent_start_fargate_kwargs_received(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.fargate.FargateAgent.start", start)

fargate_agent = MagicMock()
monkeypatch.setattr("prefect.agent.fargate.FargateAgent", fargate_agent)

boto3_client = MagicMock()
monkeypatch.setattr("boto3.client", boto3_client)

runner = CliRunner()
result = runner.invoke(
agent, ["start", "fargate", "taskRoleArn=arn", "--volumes=vol"]
)
assert result.exit_code == 0

assert fargate_agent.called
fargate_agent.assert_called_with(
labels=[], name=None, taskRoleArn="arn", volumes="vol"
)


def test_agent_start_name(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start)
Expand All @@ -67,7 +154,7 @@ def test_agent_start_name(monkeypatch, runner_token):
assert result.exit_code == 0


def test_agent_start_local_context_vars(monkeypatch, runner_token):
def test_agent_start_local_vars(monkeypatch, runner_token):
start = MagicMock()
monkeypatch.setattr("prefect.agent.local.LocalAgent.start", start)

Expand Down