Skip to content

Commit

Permalink
Merge branch 'master' into fargate_docs_kwargs
Browse files Browse the repository at this point in the history
  • Loading branch information
joshmeek authored Nov 12, 2019
2 parents 8a6be50 + 675a03a commit a058885
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
- Add informative logs in the event that a heartbeat thread dies - [#1721](https://github.com/PrefectHQ/prefect/pull/1721)
- Loosen Job spec requirements for `KubernetesJobEnvironment` - [#1713](https://github.com/PrefectHQ/prefect/pull/1713)
- Loosen `containerDefinitions` requirements for `FargateTaskEnvironment` - [#1713](https://github.com/PrefectHQ/prefect/pull/1713)
- Local Docker agent proactively fails flow runs if image cannot be pulled - [#1395](https://github.com/PrefectHQ/prefect/issues/1395)
- Add graceful keyboard interrupt shutdown for all agents - [#1731](https://github.com/PrefectHQ/prefect/pull/1731)

### Task Library
Expand Down
20 changes: 17 additions & 3 deletions src/prefect/agent/local/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from prefect import config, context
from prefect.agent import Agent
from prefect.engine.state import Failed
from prefect.environments.storage import Docker
from prefect.serialization.storage import StorageSchema
from prefect.utilities.graphql import GraphQLResult
Expand Down Expand Up @@ -78,9 +79,14 @@ def deploy_flows(self, flow_runs: list) -> None:

storage = StorageSchema().load(flow_run.flow.storage)
if not isinstance(StorageSchema().load(flow_run.flow.storage), Docker):
self.logger.error(
"Storage for flow run {} is not of type Docker.".format(flow_run.id)
msg = "Storage for flow run {} is not of type Docker.".format(
flow_run.id
)
state_msg = "Agent {} failed to run flow: ".format(self.name) + msg
self.client.set_flow_run_state(
flow_run.id, version=flow_run.version, state=Failed(state_msg)
)
self.logger.error(msg)
continue

env_vars = self.populate_env_vars(flow_run=flow_run)
Expand All @@ -97,7 +103,15 @@ def deploy_flows(self, flow_runs: list) -> None:
"Successfully pulled image {}...".format(storage.name)
)
except docker.errors.APIError as exc:
self.logger.error("Issue pulling image {}".format(storage.name))
msg = "Issue pulling image {}".format(storage.name)
state_msg = (
"Agent {} failed to pull image for flow: ".format(self.name)
+ msg
)
self.client.set_flow_run_state(
flow_run.id, version=flow_run.version, state=Failed(msg)
)
self.logger.error(msg)

# Create a container
self.logger.debug("Creating Docker container {}".format(storage.name))
Expand Down
7 changes: 6 additions & 1 deletion tests/agent/test_local_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def test_local_agent_deploy_flows(monkeypatch, runner_token):

def test_local_agent_deploy_flows_storage_continues(monkeypatch, runner_token):

monkeypatch.setattr("prefect.agent.agent.Client", MagicMock())
api = MagicMock()
api.ping.return_value = True
api.create_container.return_value = {"Id": "container_id"}
Expand All @@ -200,7 +201,11 @@ def test_local_agent_deploy_flows_storage_continues(monkeypatch, runner_token):
agent.deploy_flows(
flow_runs=[
GraphQLResult(
{"flow": GraphQLResult({"storage": Local().serialize()}), "id": "id"}
{
"flow": GraphQLResult({"storage": Local().serialize()}),
"id": "id",
"version": "version",
}
)
]
)
Expand Down

0 comments on commit a058885

Please sign in to comment.