Skip to content

Commit

Permalink
fix: complete all actions following unsuccessful actions as NEVER_ATT…
Browse files Browse the repository at this point in the history
…EMPTED (#190)

Problem:
When session init failed, all actions were being marked as FAILED.

Solution:
Only the first should be marked as FAILED and the remainder should be
marked as NEVER_ATTEMPTED

Signed-off-by: Matt Authement <maauth@amazon.com>
  • Loading branch information
matta-aws authored Mar 7, 2024
1 parent 92b6d17 commit d266c0f
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/deadline_worker_agent/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ def _fail_all_actions(
{
action["sessionActionId"]: SessionActionStatus(
id=action["sessionActionId"],
completed_status="FAILED",
completed_status="FAILED" if action is actions[0] else "NEVER_ATTEMPTED",
start_time=now,
end_time=now,
status=ActionStatus(
Expand Down
9 changes: 5 additions & 4 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ def cancel_all(
self,
*,
message: str | None = None,
cancel_outcome: CancelOutcome = "NEVER_ATTEMPTED",
ignore_env_exits: bool = True,
) -> None:
"""Cancels all queued actions
Expand All @@ -233,8 +232,6 @@ def cancel_all(
----------
message : str | None
An optional message to include explaining why this action was canceled
cancel_outcome : Literal["NEVER_ATTEMPTED", "FAILED"]
Whether to mark the actions as NEVER_ATTEMPTED or FAILED. Default is NEVER_ATTEMPTED
ignore_env_exits : bool
If True, ENV_EXIT actions will not be canceled. Defaults to canceling ENV_EXIT actions.
"""
Expand All @@ -247,7 +244,11 @@ def cancel_all(
]

for action_id in action_ids:
self.cancel(id=action_id, message=message, cancel_outcome=cancel_outcome)
self.cancel(
id=action_id,
message=message,
cancel_outcome="NEVER_ATTEMPTED",
)

def replace(
self,
Expand Down
7 changes: 0 additions & 7 deletions src/deadline_worker_agent/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ def _cleanup(self) -> None:
)

self._queue.cancel_all(
cancel_outcome="NEVER_ATTEMPTED",
message=self._stop_fail_message,
)

Expand Down Expand Up @@ -533,7 +532,6 @@ def _start_action(self) -> None:
)
self._queue.cancel_all(
message=f"Error starting prior action {e.action_id}",
cancel_outcome="FAILED",
ignore_env_exits=True,
)
self._current_action = None
Expand Down Expand Up @@ -561,7 +559,6 @@ def _start_action(self) -> None:
)
self._queue.cancel_all(
message=f"Error starting prior action {action_definition.id}",
cancel_outcome="FAILED",
ignore_env_exits=True,
)
self._current_action = None
Expand Down Expand Up @@ -605,9 +602,6 @@ def _start_action(self) -> None:
)
self._queue.cancel_all(
message=f"Error starting prior action {action_definition.id}",
# TODO: Change this after session actions failures before a task run count as
# overall failures and do not cause retry sessions to be scheduled indefinitely
cancel_outcome="FAILED",
ignore_env_exits=True,
)
self._current_action = None
Expand Down Expand Up @@ -1103,7 +1097,6 @@ def _handle_action_update(
# If the current action failed, we mark future actions assigned to the session as
# NEVER_ATTEMPTED except for envExit actions.
self._queue.cancel_all(
cancel_outcome="NEVER_ATTEMPTED",
message=fail_message,
ignore_env_exits=True,
)
Expand Down
18 changes: 12 additions & 6 deletions test/unit/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,9 @@ def test_log_provision_error(
self,
scheduler: WorkerScheduler,
) -> None:
"""Tests that when a session is assigned with a log provisioning error, that its assigned
actions are marked as FAILED and the scheduler's wakeup event is set so that it makes an
"""Tests that when a session is assigned with a log provisioning error, that the assigned
action is marked as FAILED, the rest are marked as NEVER_ATTEMPTED,
and the scheduler's wakeup event is set so that it makes an
immediate follow-up UpdateWorkerSchedule request to signal the failure.
"""

Expand Down Expand Up @@ -672,7 +673,9 @@ def test_log_provision_error(
action_update := scheduler._action_updates_map.get(action_id, None)
), f"no action update for {action_id}"
assert action_update.id == action_id
assert action_update.completed_status == "FAILED"
assert action_update.completed_status == (
"FAILED" if action_num == 1 else "NEVER_ATTEMPTED"
)
assert action_update.status is not None
assert action_update.status.state == ActionState.FAILED
assert (
Expand All @@ -695,8 +698,9 @@ def test_job_details_error(
scheduler: WorkerScheduler,
job_details_error: Exception,
) -> None:
"""Tests that when a session encounters a job details error, that its assigned
actions are marked as FAILED and the scheduler's wakeup event is set so that it makes an
"""Tests that when a session encounters a job details error, that the first assigned
action is marked as FAILED, the rest are marked as NEVER_ATTEPTED,
and the scheduler's wakeup event is set so that it makes an
immediate follow-up UpdateWorkerSchedule request to signal the failure.
"""
# GIVEN
Expand Down Expand Up @@ -748,7 +752,9 @@ def test_job_details_error(
action_update := scheduler._action_updates_map.get(action_id, None)
), f"no action update for {action_id}"
assert action_update.id == action_id
assert action_update.completed_status == "FAILED"
assert action_update.completed_status == (
"FAILED" if action_num == 1 else "NEVER_ATTEMPTED"
)
assert action_update.status is not None
assert action_update.status.state == ActionState.FAILED
assert action_update.status.fail_message == str(job_details_error)
Expand Down
9 changes: 1 addition & 8 deletions test/unit/scheduler/test_session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import pytest

from deadline_worker_agent.scheduler.session_queue import (
CancelOutcome,
EnvironmentQueueEntry,
TaskRunQueueEntry,
SessionActionQueue,
Expand Down Expand Up @@ -355,11 +354,6 @@ class TestCancelAll:
argvalues=("msg1", "msg2", None),
ids=("msg1", "msg2", "no-msg"),
)
@pytest.mark.parametrize(
argnames="cancel_outcome",
argvalues=("msg1", "msg2", None),
ids=("msg1", "msg2", "no-msg"),
)
@pytest.mark.parametrize(
argnames="ignore_env_exits",
argvalues=(False, True),
Expand All @@ -368,7 +362,6 @@ class TestCancelAll:
def test_ignore_env_exits(
self,
message: str | None,
cancel_outcome: CancelOutcome,
ignore_env_exits: bool,
session_queue: SessionActionQueue,
) -> None:
Expand Down Expand Up @@ -404,11 +397,11 @@ def test_ignore_env_exits(
# WHEN
session_queue.cancel_all(
message=message,
cancel_outcome=cancel_outcome,
ignore_env_exits=ignore_env_exits,
)

# THEN
cancel_outcome = "NEVER_ATTEMPTED"
if ignore_env_exits:
cancel_mock.assert_called_once()
cancel_mock.assert_any_call(
Expand Down
10 changes: 1 addition & 9 deletions test/unit/sessions/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1197,9 +1197,7 @@ def test_timeout_messaging(
session.update_action(status)

# THEN
mock_cancel_all.assert_called_once_with(
cancel_outcome="NEVER_ATTEMPTED", message=ANY, ignore_env_exits=True
)
mock_cancel_all.assert_called_once_with(message=ANY, ignore_env_exits=True)
assert "TIMEOUT" in mock_cancel_all.call_args.kwargs["message"]
mock_report_action_update.assert_called_once()
session_status = mock_report_action_update.call_args.args[0]
Expand Down Expand Up @@ -1277,7 +1275,6 @@ def test_failed_enter_env(
# THEN
mock_report_action_update.assert_called_once_with(expected_action_update)
queue_cancel_all.assert_called_once_with(
cancel_outcome="NEVER_ATTEMPTED",
message=expected_next_action_message,
ignore_env_exits=True,
)
Expand Down Expand Up @@ -1345,7 +1342,6 @@ def test_failed_task_run(
# THEN
mock_report_action_update.assert_called_once_with(expected_action_update)
queue_cancel_all.assert_called_once_with(
cancel_outcome="NEVER_ATTEMPTED",
message=expected_next_action_message,
ignore_env_exits=True,
)
Expand Down Expand Up @@ -1497,7 +1493,6 @@ def mock_now(*arg, **kwarg) -> datetime:
# THEN
mock_report_action_update.assert_called_once_with(expected_action_update)
queue_cancel_all.assert_called_once_with(
cancel_outcome="NEVER_ATTEMPTED",
message=expected_fail_action_status.fail_message,
ignore_env_exits=True,
)
Expand Down Expand Up @@ -1825,7 +1820,6 @@ def test_calls_queue_cancel_all(

# THEN
mock_queue_cancel_all.assert_called_once_with(
cancel_outcome="NEVER_ATTEMPTED",
message=session._stop_fail_message,
)

Expand Down Expand Up @@ -1929,7 +1923,6 @@ def test_initial_action_exception(
)
mock_queue_cancel_all.assert_called_once_with(
message=f"Error starting prior action {run_step_task_action.id}",
cancel_outcome="FAILED",
ignore_env_exits=True,
)
assert session._current_action is None
Expand Down Expand Up @@ -2001,7 +1994,6 @@ def test_run_exception(
)
mock_queue_cancel_all.assert_called_once_with(
message=f"Error starting prior action {run_step_task_action.id}",
cancel_outcome="FAILED",
ignore_env_exits=True,
)
assert session._current_action is None
Expand Down

0 comments on commit d266c0f

Please sign in to comment.