Skip to content

Commit

Permalink
feat!: differentiate step actions from non-step actions in logs (#292)
Browse files Browse the repository at this point in the history
Problem:

A request was made to be able to differentiate SessionActions in the
logs that are synchronizing Job inputs to the host vs ones that are
synchronizing the outputs from dependendent Steps to the host.

Solution:

* Add the kind "JobAttachSyncDeps" to the "Action" log event type;
* Add a step_id to the "Action" log event when it is available; and
* Adds the task_id to "Action" log events if when the kind is "TaskRun".

**BREAKING CHANGE**
- The unstructured log format for log event type "Action" has been
  modified.
- The "JobAttachSyncDeps" kind of the "Action" log event type has
  been added to differentiate synchronizing Job inputs from
  synchronizing the outputs from dependencies of the running Step.

Signed-off-by: Daniel Neilson <53624638+ddneilson@users.noreply.github.com>
  • Loading branch information
ddneilson authored Apr 9, 2024
1 parent 99ec6a2 commit a6d55e3
Show file tree
Hide file tree
Showing 19 changed files with 149 additions and 51 deletions.
6 changes: 3 additions & 3 deletions docs/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ Log events may also contain a `type`, `subtype`, icon (`ti`), and additional fie
| type | subtype | ti | fields | purpose |
| --- | --- | --- | --- | --- |
| None | None | None | message | A simple status message or update and its log level. These messages may change at any time and must not be relied upon for automation. |
| Action | Start | 🟢 | session_id; queue_id; job_id; action_id; kind; message | A SessionAction has started running. |
| Action | Cancel/Interrupt | 🟨 | session_id; queue_id; job_id; action_id; kind; message | A cancel/interrupt of a SessionAction has been initiated. |
| Action | End | 🟣 | session_id; queue_id; job_id; action_id; kind; status; message | A SessionAction has completed running. |
| Action | Start | 🟢 | session_id; queue_id; job_id; action_id; kind; message; step_id (optional); task_id (optional) | A SessionAction has started running. |
| Action | Cancel/Interrupt | 🟨 | session_id; queue_id; job_id; action_id; kind; message; step_id (optional); task_id (optional) | A cancel/interrupt of a SessionAction has been initiated. |
| Action | End | 🟣 | session_id; queue_id; job_id; action_id; kind; status; message; step_id (optional); task_id (optional) | A SessionAction has completed running. |
| AgentInfo | None | None | platform; python[interpreter,version]; agent[version,installedAt,runningAs]; depenencies | Information about the running Agent software. |
| API | Req | 📤 | operation; request_url; params; resource (optional) | A request to an AWS API. Only requests to AWS Deadline Cloud APIs contain a resource field. |
| API | Resp | 📥 | operation; params; status_code, request_id; error (optional) | A response from an AWS API request. |
Expand Down
12 changes: 6 additions & 6 deletions scripts/run_posix_docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ if test "${OVERRIDE_JOB_USER}" == "True" && test "${USE_LDAP}" == "True"; then
exit 1
fi

if ! test -d ${HOME}/.aws/models/deadline
then
echo "ERROR: AWS Deadline Cloud service model must be installed to ~/.aws/models/deadline"
exit 1
fi
# if ! test -d ${HOME}/.aws/models/deadline
# then
# echo "ERROR: AWS Deadline Cloud service model must be installed to ~/.aws/models/deadline"
# exit 1
# fi

ARGS=""

Expand Down Expand Up @@ -110,7 +110,7 @@ do
printenv ${var} >> $TMP_ENV_FILE
done

if test "${PIP_INDEX_URL}" != ""; then
if test "${PIP_INDEX_URL:-}" != ""; then
echo "PIP_INDEX_URL=${PIP_INDEX_URL}" >> $TMP_ENV_FILE
fi

Expand Down
16 changes: 15 additions & 1 deletion scripts/submit_jobs/asset_example/template.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
"name": "runScript",
"type": "TEXT",
"runnable": true,
"data": "#!/usr/bin/env bash\n\nfind . -type f -exec md5sum {} \\; > computed_hashes.txt\n"
"data": "#!/usr/bin/env bash\n\nfind . -type f -exec md5sum {} \\; > {{Param.DataDir}}/computed_hashes.txt\n"
}
]
},
Expand Down Expand Up @@ -54,6 +54,20 @@
}
}
]
},
{
"name": "DepStep",
"dependencies": [
{ "dependsOn": "Hash" }
],
"script": {
"actions": {
"onRun": {
"command": "echo",
"args": ["DepStep for testing syncing dependencies!"]
}
}
}
}
]
}
26 changes: 22 additions & 4 deletions src/deadline_worker_agent/log_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,16 @@ class SessionActionLogKind(str, Enum):
ENV_EXIT = "EnvExit"
TASK_RUN = "TaskRun"
JA_SYNC = "JobAttachSyncInput"
JA_DEP_SYNC = "JobAttachSyncDeps"


class SessionActionLogEvent(BaseLogEvent):
type = "Action"

queue_id: str
job_id: str
step_id: Optional[str]
task_id: Optional[str]
session_id: str
kind: SessionActionLogKind
action_id: str
Expand All @@ -485,6 +488,8 @@ def __init__(
subtype: SessionActionLogEventSubtype,
queue_id: str,
job_id: str,
step_id: Optional[str] = None,
task_id: Optional[str] = None,
session_id: str,
action_log_kind: SessionActionLogKind,
action_id: str,
Expand All @@ -505,22 +510,31 @@ def __init__(
self.kind = action_log_kind
self.queue_id = queue_id
self.job_id = job_id
self.step_id = step_id
self.task_id = task_id
self.action_id = action_id
self.msg = message
self.status = status

def getMessage(self) -> str:
dd = self.asdict()
# TODO - Rearrange. Put (%(queue_id)s/%(job_id)s) after the message
if self.step_id is None:
resource_id = "[%(queue_id)s/%(job_id)s]"
elif self.task_id is None:
resource_id = "[%(queue_id)s/%(job_id)s/%(step_id)s]"
else:
resource_id = "[%(queue_id)s/%(job_id)s/%(step_id)s/%(task_id)s]"
if self.subtype == SessionActionLogEventSubtype.END.value and self.status is not None:
fmt_str = "[%(session_id)s](%(action_id)s) %(message)s (Status: %(status)s) (Kind: %(kind)s) [%(queue_id)s/%(job_id)s]"
fmt_str = (
"[%(session_id)s](%(action_id)s) %(message)s (Status: %(status)s) (Kind: %(kind)s) "
+ resource_id
)
else:
fmt_str = "[%(session_id)s](%(action_id)s) %(message)s (Kind: %(kind)s) [%(queue_id)s/%(job_id)s]"
fmt_str = "[%(session_id)s](%(action_id)s) %(message)s (Kind: %(kind)s) " + resource_id
return self.add_exception_to_message(fmt_str % dd)

def asdict(self) -> dict[str, Any]:
dd = super().asdict()
# TODO - Rearrange. Put (%(queue_id)s/%(job_id)s) after the message
dd.update(
session_id=self.session_id,
action_id=self.action_id,
Expand All @@ -530,6 +544,10 @@ def asdict(self) -> dict[str, Any]:
if self.subtype == SessionActionLogEventSubtype.END.value and self.status is not None:
dd.update(status=self.status)
dd.update(queue_id=self.queue_id, job_id=self.job_id)
if self.step_id is not None:
dd.update(step_id=self.step_id)
if self.task_id is not None:
dd.update(task_id=self.task_id)
return self.add_exception_to_dict(dd)


Expand Down
26 changes: 21 additions & 5 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,22 +411,32 @@ def dequeue(self) -> SessionActionDefinition | None:
action_queue_entry = cast(TaskRunQueueEntry, action_queue_entry)
action_definition = action_queue_entry.definition
step_id = action_definition["stepId"]
task_id = action_definition["taskId"]
try:
step_details = self._job_entities.step_details(step_id=step_id)
except UnsupportedSchema as e:
raise JobEntityUnsupportedSchemaError(
action_id, SessionActionLogKind.TASK_RUN, e._version
action_id,
SessionActionLogKind.TASK_RUN,
e._version,
step_id=step_id,
task_id=task_id,
) from e
except (ValueError, RuntimeError) as e:
raise StepDetailsError(action_id, SessionActionLogKind.TASK_RUN, str(e)) from e
raise StepDetailsError(
action_id,
SessionActionLogKind.TASK_RUN,
str(e),
step_id=step_id,
task_id=task_id,
) from e
task_parameters_data: dict = action_definition.get("parameters", {})
task_parameters = parameters_from_api_response(task_parameters_data)

next_action = RunStepTaskAction(
id=action_id,
details=step_details,
task_parameter_values=task_parameters,
step_id=step_id,
task_id=action_definition["taskId"],
)
elif action_type == "SYNC_INPUT_JOB_ATTACHMENTS":
Expand Down Expand Up @@ -460,11 +470,17 @@ def dequeue(self) -> SessionActionDefinition | None:
)
except UnsupportedSchema as e:
raise JobEntityUnsupportedSchemaError(
action_id, SessionActionLogKind.JA_SYNC, e._version
action_id,
SessionActionLogKind.JA_DEP_SYNC,
e._version,
step_id=action_definition["stepId"],
) from e
except ValueError as e:
raise StepDetailsError(
action_id, SessionActionLogKind.JA_SYNC, str(e)
action_id,
SessionActionLogKind.JA_DEP_SYNC,
str(e),
step_id=action_definition["stepId"],
) from e
next_action = SyncInputJobAttachmentsAction(
id=action_id,
Expand Down
11 changes: 10 additions & 1 deletion src/deadline_worker_agent/sessions/actions/action_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations
from concurrent.futures import Executor
from datetime import timedelta
from typing import Optional

from abc import ABC, abstractmethod

Expand All @@ -22,10 +23,14 @@ class SessionActionDefinition(ABC):

_id: str
_action_log_kind: SessionActionLogKind
_step_id: Optional[str]

def __init__(self, *, id: str, action_log_kind: SessionActionLogKind) -> None:
def __init__(
self, *, id: str, action_log_kind: SessionActionLogKind, step_id: Optional[str] = None
) -> None:
self._id = id
self._action_log_kind = action_log_kind
self._step_id = step_id

@property
def id(self) -> str:
Expand All @@ -36,6 +41,10 @@ def id(self) -> str:
def action_log_kind(self) -> SessionActionLogKind:
return self._action_log_kind

@property
def step_id(self) -> Optional[str]:
return self._step_id

@abstractmethod
def start(
self,
Expand Down
5 changes: 1 addition & 4 deletions src/deadline_worker_agent/sessions/actions/run_step_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ class RunStepTaskAction(OpenjdAction):
The task parameter values
"""

step_id: str
task_id: str
_details: StepDetails
_task_parameter_values: TaskParameterSet
Expand All @@ -40,16 +39,14 @@ def __init__(
self,
*,
id: str,
step_id: str,
details: StepDetails,
task_id: str,
task_parameter_values: TaskParameterSet,
) -> None:
super(RunStepTaskAction, self).__init__(
id=id, action_log_kind=SessionActionLogKind.TASK_RUN
id=id, action_log_kind=SessionActionLogKind.TASK_RUN, step_id=details.step_id
)
self._details = details
self.step_id = step_id
self.task_id = task_id
self._task_parameter_values = task_parameter_values

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ def __init__(
step_details: Optional[StepDetails] = None,
) -> None:
super(SyncInputJobAttachmentsAction, self).__init__(
id=id, action_log_kind=SessionActionLogKind.JA_SYNC
id=id,
action_log_kind=(
SessionActionLogKind.JA_SYNC
if step_details is None
else SessionActionLogKind.JA_DEP_SYNC
),
step_id=step_details.step_id if step_details is not None else None,
)
self._cancel = Event()
self._job_attachment_details = job_attachment_details
Expand Down
32 changes: 29 additions & 3 deletions src/deadline_worker_agent/sessions/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

from __future__ import annotations
from typing import Optional

from .._version import version

Expand All @@ -18,11 +19,22 @@ class SessionActionError(Exception):

action_id: str
action_log_kind: SessionActionLogKind
step_id: Optional[str]
task_id: Optional[str]
message: str

def __init__(self, action_id: str, action_log_kind: SessionActionLogKind, message: str):
def __init__(
self,
action_id: str,
action_log_kind: SessionActionLogKind,
message: str,
*,
step_id: Optional[str] = None,
task_id: Optional[str] = None,
):
super().__init__()
self.action_id = action_id
self.action_log_kind = action_log_kind
self.message = message

def __str__(self) -> str:
Expand Down Expand Up @@ -53,10 +65,24 @@ class JobEntityUnsupportedSchemaError(SessionActionError):

schema_version: str

def __init__(self, action_id: str, action_log_kind: SessionActionLogKind, schema_version: str):
def __init__(
self,
action_id: str,
action_log_kind: SessionActionLogKind,
schema_version: str,
*,
step_id: Optional[str] = None,
task_id: Optional[str] = None,
):
self.schema_version = schema_version
self.message = (
f"Worker Agent: {version} does not support Open Job Description Schema Version {self.schema_version}. "
f"Consider upgrading to a newer Worker Agent."
)
super().__init__(action_id=action_id, action_log_kind=action_log_kind, message=self.message)
super().__init__(
action_id=action_id,
action_log_kind=action_log_kind,
message=self.message,
step_id=step_id,
task_id=task_id,
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class StepDetails:
"""The step's Open Job Description step template.
"""

step_id: str
"""The AWS Deadline Cloud resource ID for the Step.
"""

dependencies: list[str] = field(default_factory=list)
"""The dependencies (a list of IDs) that the step depends on"""

Expand Down Expand Up @@ -73,6 +77,7 @@ def from_boto(cls, step_details_data: StepDetailsData) -> StepDetails:

return StepDetails(
step_template=step_template,
step_id=step_details_data["stepId"],
dependencies=step_details_data["dependencies"],
)

Expand Down
Loading

0 comments on commit a6d55e3

Please sign in to comment.