feat: adds data on action kind and queue length to logs (#266)
Request:

When looking at Agent Logs it is useful to know:

1. What kind of SessionAction is starting/ending. This helps understand
   the flow of the running Session(s) just from the agent log and can be
   a useful troubleshooting tool.
2. How many actions are queued up in a Session's internal pipeline. As
   with the above, this can be a useful troubleshooting datapoint, but
   can also be used to generate some statistics on scheduling behavior.

Solution:

This adds:

1. A 'kind' field to SessionActionLogEvents with values: TaskRun,
   EnvEnter, EnvExit, and JobAttachSyncInput.
2. A 'queued_action_count' field (the queue length) to SessionLogEvents of subtype Add and Remove.

It also adds a warning message to the start of the Agent log to inform
the customer about the expectations that they can have about log
contents and formatting. This was added because not all customers will
see the README on the GitHub repository.

Signed-off-by: Daniel Neilson <53624638+ddneilson@users.noreply.github.com>
ddneilson committed Mar 27, 2024
1 parent f3d950c commit bb10c47
Showing 13 changed files with 118 additions and 41 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -65,9 +65,9 @@ Log events may also contain a `type`, `subtype`, icon (`ti`), and additional fie
| type | subtype | ti | fields | purpose |
| --- | --- | --- | --- | --- |
| None | None | None | message | A simple status message or update and its log level. These messages may change at any time and must not be relied upon for automation. |
| Action | Start | 🟢 | session_id; queue_id; job_id; action_id; message | A SessionAction has started running. |
| Action | Cancel/Interrupt | 🟨 | session_id; queue_id; job_id; action_id; message | A cancel/interrupt of a SessionAction has been initiated. |
| Action | End | 🟣 | session_id; queue_id; job_id; action_id; status; message | A SessionAction has completed running. |
| Action | Start | 🟢 | session_id; queue_id; job_id; action_id; kind; message | A SessionAction has started running. |
| Action | Cancel/Interrupt | 🟨 | session_id; queue_id; job_id; action_id; kind; message | A cancel/interrupt of a SessionAction has been initiated. |
| Action | End | 🟣 | session_id; queue_id; job_id; action_id; kind; status; message | A SessionAction has completed running. |
| AgentInfo | None | None | platform; python[interpreter,version]; agent[version,installedAt,runningAs]; dependencies | Information about the running Agent software. |
| API | Req | 📤 | operation; request_url; params; resource (optional) | A request to an AWS API. Only requests to AWS Deadline Cloud APIs contain a resource field. |
| API | Resp | 📥 | operation; params; status_code; request_id; error (optional) | A response from an AWS API request. |
@@ -77,7 +77,7 @@ Log events may also contain a `type`, `subtype`, icon (`ti`), and additional fie
| AWSCreds | Refresh | 🔑 | resource; message; role_arn (optional); expiry (optional); scheduled_time (optional) | Related to an operation for AWS Credentials. |
| Metrics | System | 📊 | many | System metrics. |
| Session | Starting/Failed/AWSCreds/Complete/Info | 🔷 | queue_id; job_id; session_id | An update or information related to a Session. |
| Session | Add/Remove | 🔷 | queue_id; job_id; session_id; action_ids | Adding or removing SessionActions in a Session. |
| Session | Add/Remove | 🔷 | queue_id; job_id; session_id; action_ids; queued_action_count | Adding or removing SessionActions in a Session. |
| Session | Logs | 🔷 | queue_id; job_id; session_id; log_dest | Information regarding where the Session logs are located. |
| Session | User | 🔷 | queue_id; job_id; session_id; user | The user that a Session is running Actions as. |
| Worker | Create/Load/ID/Status/Delete | 💻 | farm_id; fleet_id; worker_id (optional); message | A notification related to a Worker resource within AWS Deadline Cloud. |
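With `kind` on Action events and a queued-action count on Session Add/Remove events, the agent log can be mined for simple scheduling statistics. The following is a minimal sketch, assuming the events have already been parsed into dictionaries that carry `type`, `subtype`, and the fields listed in the table. The parsing step, the `summarize` helper, and the sample records are illustrative and not part of the agent. The count is read under the name `queued_action_count`, which is the key that `asdict()` emits in `log_messages.py`.

```python
from collections import Counter
from typing import Any, Iterable


def summarize(events: Iterable[dict[str, Any]]) -> tuple[Counter, int]:
    """Count started actions per kind and track the deepest queue seen."""
    started_by_kind: Counter = Counter()
    max_queued = 0
    for event in events:
        if event.get("type") == "Action" and event.get("subtype") == "Start":
            started_by_kind[event["kind"]] += 1
        elif event.get("type") == "Session" and event.get("subtype") in ("Add", "Remove"):
            # The count is optional on the event, so fall back to 0 when absent.
            max_queued = max(max_queued, event.get("queued_action_count") or 0)
    return started_by_kind, max_queued


# Hypothetical records shaped like the table rows; all IDs are made up.
sample = [
    {"type": "Session", "subtype": "Add", "session_id": "session-1", "queued_action_count": 3},
    {"type": "Action", "subtype": "Start", "session_id": "session-1", "kind": "EnvEnter"},
    {"type": "Action", "subtype": "Start", "session_id": "session-1", "kind": "TaskRun"},
    {"type": "Session", "subtype": "Remove", "session_id": "session-1", "queued_action_count": 1},
]
kinds, deepest = summarize(sample)
print(dict(kinds), deepest)
```

Only fields marked with a `type` in the table are used here; the free-form `message` text is deliberately ignored, since the README states it may change at any time.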
31 changes: 22 additions & 9 deletions src/deadline_worker_agent/log_messages.py
@@ -396,6 +396,7 @@ class SessionLogEvent(BaseLogEvent):
user: Optional[str]
action_ids: Optional[list[str]] # for Add/Remove
log_dest: Optional[str]
queued_action_count: Optional[int]

def __init__(
self,
@@ -408,6 +409,7 @@ def __init__(
message: str,
action_ids: Optional[list[str]] = None,
log_dest: Optional[str] = None,
queued_action_count: Optional[int] = None,
) -> None:
self.subtype = subtype.value
self.session_id = session_id
@@ -417,19 +419,17 @@ def __init__(
self.msg = message
self.action_ids = action_ids
self.log_dest = log_dest
self.queued_action_count = queued_action_count

def getMessage(self) -> str:
dd = self.asdict()
# TODO - Rearrange. Put (%(queue_id)s/%(job_id)s) after the message
if self.subtype == SessionLogEventSubtype.USER.value and self.user is not None:
fmt_str = "[%(session_id)s] %(message)s (User: %(user)s) [%(queue_id)s/%(job_id)s]"
elif (
self.subtype in (SessionLogEventSubtype.ADD.value, SessionLogEventSubtype.REMOVE.value)
and self.action_ids is not None
elif self.subtype in (
SessionLogEventSubtype.ADD.value,
SessionLogEventSubtype.REMOVE.value,
):
fmt_str = (
"[%(session_id)s] %(message)s (ActionIds: %(action_ids)s) [%(queue_id)s/%(job_id)s]"
)
fmt_str = "[%(session_id)s] %(message)s (ActionIds: %(action_ids)s) (QueuedActionCount: %(queued_action_count)s) [%(queue_id)s/%(job_id)s]"
elif self.subtype == SessionLogEventSubtype.LOGS.value and self.log_dest is not None:
fmt_str = "[%(session_id)s] %(message)s (LogDestination: %(log_dest)s) [%(queue_id)s/%(job_id)s]"
else:
@@ -446,6 +446,8 @@ def asdict(self) -> dict[str, Any]:
dd.update(user=self.user)
if self.action_ids is not None:
dd.update(action_ids=self.action_ids)
if self.queued_action_count is not None:
dd.update(queued_action_count=self.queued_action_count)
if self.log_dest is not None:
dd.update(log_dest=self.log_dest)
dd.update(queue_id=self.queue_id, job_id=self.job_id)
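One consequence of these two hunks is worth spelling out. `asdict()` includes `action_ids` and `queued_action_count` only when they are not `None`, while the Add/Remove format string in `getMessage()` now references both unconditionally. `%`-formatting against a dictionary raises `KeyError` for a missing key, so an Add/Remove event built without one of those values would fail to format. The two call sites shown in this diff pass both, so this may never occur in practice. A minimal sketch of the hazard and one defensive option follows; the `format_add_remove` helper and the sample IDs are hypothetical.

```python
FMT = (
    "[%(session_id)s] %(message)s (ActionIds: %(action_ids)s) "
    "(QueuedActionCount: %(queued_action_count)s) [%(queue_id)s/%(job_id)s]"
)


def format_add_remove(fields: dict) -> str:
    """Fill in missing optional keys so %-formatting cannot raise KeyError."""
    defaults = {"action_ids": None, "queued_action_count": None}
    return FMT % {**defaults, **fields}


fields = {
    "session_id": "session-1",
    "message": "Removed SessionActions.",
    "action_ids": ["action-1"],
    "queue_id": "queue-1",
    "job_id": "job-1",
}  # queued_action_count is deliberately omitted

try:
    FMT % fields
except KeyError as exc:
    print(f"missing key: {exc}")

print(format_add_remove(fields))
```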
@@ -459,12 +461,20 @@ class SessionActionLogEventSubtype(str, Enum):
END = "End" # will have a status key


class SessionActionLogKind(str, Enum):
ENV_ENTER = "EnvEnter"
ENV_EXIT = "EnvExit"
TASK_RUN = "TaskRun"
JA_SYNC = "JobAttachSyncInput"


class SessionActionLogEvent(BaseLogEvent):
type = "Action"

queue_id: str
job_id: str
session_id: str
kind: SessionActionLogKind
action_id: str
status: Optional[str]
msg: str
@@ -476,6 +486,7 @@ def __init__(
queue_id: str,
job_id: str,
session_id: str,
action_log_kind: SessionActionLogKind,
action_id: str,
message: str,
status: Optional[str] = None,
@@ -491,6 +502,7 @@ def __init__(
self.ti = "🟣"
self.subtype = subtype.value
self.session_id = session_id
self.kind = action_log_kind
self.queue_id = queue_id
self.job_id = job_id
self.action_id = action_id
@@ -501,9 +513,9 @@ def getMessage(self) -> str:
dd = self.asdict()
# TODO - Rearrange. Put (%(queue_id)s/%(job_id)s) after the message
if self.subtype == SessionActionLogEventSubtype.END.value and self.status is not None:
fmt_str = "[%(session_id)s](%(action_id)s) %(message)s (Status: %(status)s) [%(queue_id)s/%(job_id)s]"
fmt_str = "[%(session_id)s](%(action_id)s) %(message)s (Status: %(status)s) (Kind: %(kind)s) [%(queue_id)s/%(job_id)s]"
else:
fmt_str = "[%(session_id)s](%(action_id)s) %(message)s [%(queue_id)s/%(job_id)s]"
fmt_str = "[%(session_id)s](%(action_id)s) %(message)s (Kind: %(kind)s) [%(queue_id)s/%(job_id)s]"
return self.add_exception_to_message(fmt_str % dd)

def asdict(self) -> dict[str, Any]:
@@ -512,6 +524,7 @@ def asdict(self) -> dict[str, Any]:
dd.update(
session_id=self.session_id,
action_id=self.action_id,
kind=self.kind.value,
message=self.msg,
)
if self.subtype == SessionActionLogEventSubtype.END.value and self.status is not None:
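To make the effect on the rendered log line concrete, here is a minimal sketch that reuses the two format strings from `SessionActionLogEvent.getMessage()` above. The enum is redeclared so the snippet runs on its own, `format_action` is a hypothetical stand-in that keys only on whether a status is present (the real method also checks the subtype), and the IDs and the `SUCCEEDED` status are made-up sample values. Storing `kind.value` mirrors `asdict()`: it puts the plain string in the dictionary rather than relying on how the enum member itself renders.

```python
from enum import Enum


class SessionActionLogKind(str, Enum):
    ENV_ENTER = "EnvEnter"
    ENV_EXIT = "EnvExit"
    TASK_RUN = "TaskRun"
    JA_SYNC = "JobAttachSyncInput"


def format_action(fields: dict, kind: SessionActionLogKind) -> str:
    """Simplified stand-in for getMessage(): Status appears only when given."""
    dd = {**fields, "kind": kind.value}  # plain string, as asdict() stores it
    if dd.get("status") is not None:
        fmt = (
            "[%(session_id)s](%(action_id)s) %(message)s "
            "(Status: %(status)s) (Kind: %(kind)s) [%(queue_id)s/%(job_id)s]"
        )
    else:
        fmt = "[%(session_id)s](%(action_id)s) %(message)s (Kind: %(kind)s) [%(queue_id)s/%(job_id)s]"
    return fmt % dd


base = {
    "session_id": "session-1",
    "action_id": "action-1",
    "queue_id": "queue-1",
    "job_id": "job-1",
}
print(format_action({**base, "message": "Action started."}, SessionActionLogKind.TASK_RUN))
print(
    format_action(
        {**base, "message": "Action completed.", "status": "SUCCEEDED"},
        SessionActionLogKind.TASK_RUN,
    )
)
```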
44 changes: 35 additions & 9 deletions src/deadline_worker_agent/scheduler/session_queue.py
@@ -38,7 +38,7 @@
StepDetailsError,
)
from ..sessions.job_entities.job_details import parameters_from_api_response
from ..log_messages import SessionLogEvent, SessionLogEventSubtype
from ..log_messages import SessionLogEvent, SessionLogEventSubtype, SessionActionLogKind

if TYPE_CHECKING:
from ..sessions.job_entities import JobEntities
@@ -261,6 +261,7 @@ def cancel_all(
job_id=self._job_id,
session_id=self._session_id,
action_ids=action_ids,
queued_action_count=len(self._actions),
message="Removed SessionActions.",
)
)
@@ -333,6 +334,7 @@ def replace(
job_id=self._job_id,
session_id=self._session_id,
action_ids=action_ids_added,
queued_action_count=len(self._actions),
message="Appended new SessionActions.",
)
)
@@ -375,9 +377,23 @@ def dequeue(self) -> SessionActionDefinition | None:
environment_id=environment_id
)
except UnsupportedSchema as e:
raise JobEntityUnsupportedSchemaError(action_id, e._version)
if action_type == "ENV_ENTER":
raise JobEntityUnsupportedSchemaError(
action_id, SessionActionLogKind.ENV_ENTER, e._version
)
else:
raise JobEntityUnsupportedSchemaError(
action_id, SessionActionLogKind.ENV_EXIT, e._version
)
except (ValueError, RuntimeError) as e:
raise EnvironmentDetailsError(action_id, str(e)) from e
if action_type == "ENV_ENTER":
raise EnvironmentDetailsError(
action_id, SessionActionLogKind.ENV_ENTER, str(e)
) from e
else:
raise EnvironmentDetailsError(
action_id, SessionActionLogKind.ENV_EXIT, str(e)
) from e
if action_type == "ENV_ENTER":
next_action = EnterEnvironmentAction(
id=action_id,
@@ -398,9 +414,11 @@ def dequeue(self) -> SessionActionDefinition | None:
try:
step_details = self._job_entities.step_details(step_id=step_id)
except UnsupportedSchema as e:
raise JobEntityUnsupportedSchemaError(action_id, e._version) from e
raise JobEntityUnsupportedSchemaError(
action_id, SessionActionLogKind.TASK_RUN, e._version
) from e
except (ValueError, RuntimeError) as e:
raise StepDetailsError(action_id, str(e)) from e
raise StepDetailsError(action_id, SessionActionLogKind.TASK_RUN, str(e)) from e
task_parameters_data: dict = action_definition.get("parameters", {})
task_parameters = parameters_from_api_response(task_parameters_data)

@@ -419,9 +437,13 @@ def dequeue(self) -> SessionActionDefinition | None:
try:
job_attachment_details = self._job_entities.job_attachment_details()
except UnsupportedSchema as e:
raise JobEntityUnsupportedSchemaError(action_id, e._version) from e
raise JobEntityUnsupportedSchemaError(
action_id, SessionActionLogKind.JA_SYNC, e._version
) from e
except ValueError as e:
raise JobAttachmentDetailsError(action_id, str(e)) from e
raise JobAttachmentDetailsError(
action_id, SessionActionLogKind.JA_SYNC, str(e)
) from e
next_action = SyncInputJobAttachmentsAction(
id=action_id,
session_id=self._session_id,
@@ -437,9 +459,13 @@ def dequeue(self) -> SessionActionDefinition | None:
step_id=action_definition["stepId"],
)
except UnsupportedSchema as e:
raise JobEntityUnsupportedSchemaError(action_id, e._version) from e
raise JobEntityUnsupportedSchemaError(
action_id, SessionActionLogKind.JA_SYNC, e._version
) from e
except ValueError as e:
raise StepDetailsError(action_id, str(e)) from e
raise StepDetailsError(
action_id, SessionActionLogKind.JA_SYNC, str(e)
) from e
next_action = SyncInputJobAttachmentsAction(
id=action_id,
session_id=self._session_id,
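In the environment branch of `dequeue`, each `raise` now appears twice, with the two copies differing only in the kind. One alternative is to resolve the kind once before the `try` block. This is a sketch of that alternative and not what the commit does; `env_kind` is a hypothetical helper, and the enum is redeclared so the snippet is self-contained.

```python
from enum import Enum


class SessionActionLogKind(str, Enum):
    ENV_ENTER = "EnvEnter"
    ENV_EXIT = "EnvExit"
    TASK_RUN = "TaskRun"
    JA_SYNC = "JobAttachSyncInput"


def env_kind(action_type: str) -> SessionActionLogKind:
    """Mirror the diff's branching: anything other than ENV_ENTER maps to ENV_EXIT."""
    if action_type == "ENV_ENTER":
        return SessionActionLogKind.ENV_ENTER
    return SessionActionLogKind.ENV_EXIT


print(env_kind("ENV_ENTER").value)
# Any non-ENV_ENTER value falls through; "ENV_EXIT" here is just a sample argument.
print(env_kind("ENV_EXIT").value)
```

With `kind = env_kind(action_type)` computed up front, each `except` clause would need a single `raise ...(action_id, kind, ...) from e` instead of an `if`/`else` pair.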
13 changes: 8 additions & 5 deletions src/deadline_worker_agent/sessions/actions/action_definition.py
@@ -7,6 +7,7 @@
from abc import ABC, abstractmethod

from ..session import Session
from ...log_messages import SessionActionLogKind


class SessionActionDefinition(ABC):
@@ -20,19 +21,21 @@ class SessionActionDefinition(ABC):
"""

_id: str
_action_log_kind: SessionActionLogKind

def __init__(
self,
*,
id: str,
) -> None:
def __init__(self, *, id: str, action_log_kind: SessionActionLogKind) -> None:
self._id = id
self._action_log_kind = action_log_kind

@property
def id(self) -> str:
"""The unique identifier of the SessionAction"""
return self._id

@property
def action_log_kind(self) -> SessionActionLogKind:
return self._action_log_kind

@abstractmethod
def start(
self,
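The pattern used across the action classes is that the base class stores the kind and exposes it read-only, while each concrete action fixes its own kind in its `super().__init__` call. A minimal, self-contained sketch of that pattern follows. `ExampleTaskAction` is a hypothetical subclass, and `start` is reduced to a no-argument method here, whereas the real abstract method takes more parameters.

```python
from abc import ABC, abstractmethod
from enum import Enum


class SessionActionLogKind(str, Enum):
    ENV_ENTER = "EnvEnter"
    ENV_EXIT = "EnvExit"
    TASK_RUN = "TaskRun"
    JA_SYNC = "JobAttachSyncInput"


class SessionActionDefinition(ABC):
    def __init__(self, *, id: str, action_log_kind: SessionActionLogKind) -> None:
        self._id = id
        self._action_log_kind = action_log_kind

    @property
    def id(self) -> str:
        """The unique identifier of the SessionAction"""
        return self._id

    @property
    def action_log_kind(self) -> SessionActionLogKind:
        return self._action_log_kind

    @abstractmethod
    def start(self) -> None:  # simplified signature for this sketch
        ...


class ExampleTaskAction(SessionActionDefinition):
    """Hypothetical subclass; callers never choose the kind themselves."""

    def __init__(self, *, id: str) -> None:
        super().__init__(id=id, action_log_kind=SessionActionLogKind.TASK_RUN)

    def start(self) -> None:
        print(f"starting {self.id} (Kind: {self.action_log_kind.value})")


ExampleTaskAction(id="action-1").start()
```

Because the kind is set by the subclass rather than passed by callers, code that logs an action can read `action.action_log_kind` without knowing the concrete class.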
3 changes: 2 additions & 1 deletion src/deadline_worker_agent/sessions/actions/enter_env.py
@@ -7,6 +7,7 @@
from openjd.sessions import EnvironmentIdentifier

from ..job_entities import EnvironmentDetails
from ...log_messages import SessionActionLogKind
from .openjd_action import OpenjdAction

if TYPE_CHECKING:
@@ -40,7 +41,7 @@ def __init__(
details: EnvironmentDetails,
) -> None:
super(EnterEnvironmentAction, self).__init__(
id=id,
id=id, action_log_kind=SessionActionLogKind.ENV_ENTER
)
self._job_env_id = job_env_id
self._details = details
3 changes: 2 additions & 1 deletion src/deadline_worker_agent/sessions/actions/exit_env.py
@@ -4,6 +4,7 @@
from concurrent.futures import Executor
from typing import TYPE_CHECKING, Any

from ...log_messages import SessionActionLogKind
from .openjd_action import OpenjdAction

if TYPE_CHECKING:
@@ -30,7 +31,7 @@ def __init__(
environment_id: str,
) -> None:
super(ExitEnvironmentAction, self).__init__(
id=id,
id=id, action_log_kind=SessionActionLogKind.ENV_EXIT
)
self._environment_id = environment_id

3 changes: 2 additions & 1 deletion src/deadline_worker_agent/sessions/actions/run_step_task.py
@@ -6,6 +6,7 @@

from openjd.model import TaskParameterSet

from ...log_messages import SessionActionLogKind
from .openjd_action import OpenjdAction

if TYPE_CHECKING:
@@ -45,7 +46,7 @@ def __init__(
task_parameter_values: TaskParameterSet,
) -> None:
super(RunStepTaskAction, self).__init__(
id=id,
id=id, action_log_kind=SessionActionLogKind.TASK_RUN
)
self._details = details
self.step_id = step_id
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
from openjd.sessions import ActionState, ActionStatus, LOG as OPENJD_LOG

from ..session import Session
from ...log_messages import SessionActionLogKind

from .action_definition import SessionActionDefinition

@@ -57,7 +58,7 @@ def __init__(
step_details: Optional[StepDetails] = None,
) -> None:
super(SyncInputJobAttachmentsAction, self).__init__(
id=id,
id=id, action_log_kind=SessionActionLogKind.JA_SYNC
)
self._cancel = Event()
self._job_attachment_details = job_attachment_details
9 changes: 6 additions & 3 deletions src/deadline_worker_agent/sessions/errors.py
@@ -4,6 +4,8 @@

from .._version import version

from ..log_messages import SessionActionLogKind


class CancelationError(Exception):
"""Raised when there was an error trying to cancel a session action"""
@@ -15,9 +17,10 @@ class SessionActionError(Exception):
"""Captures the action_id of an action that failed"""

action_id: str
action_log_kind: SessionActionLogKind
message: str

def __init__(self, action_id: str, message: str):
def __init__(self, action_id: str, action_log_kind: SessionActionLogKind, message: str):
super().__init__()
self.action_id = action_id
self.action_log_kind = action_log_kind
self.message = message
@@ -50,10 +53,10 @@ class JobEntityUnsupportedSchemaError(SessionActionError):

schema_version: str

def __init__(self, action_id: str, schema_version: str):
def __init__(self, action_id: str, action_log_kind: SessionActionLogKind, schema_version: str):
self.schema_version = schema_version
self.message = (
f"Worker Agent: {version} does not support Open Job Description Schema Version {self.schema_version}. "
f"Consider upgrading to a newer Worker Agent."
)
super().__init__(action_id=action_id, message=self.message)
super().__init__(action_id=action_id, action_log_kind=action_log_kind, message=self.message)
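Carrying the kind on `SessionActionError` presumably lets whatever catches the exception log the failed action with its `kind`, even though no action object was ever constructed. The handling code is not among the lines shown here, so the following is only a sketch under that assumption. `describe_failure` is a hypothetical handler, and the class is a trimmed copy of the one in `errors.py`. For a handler to read the kind, the constructor has to store it on the instance, which the sketch does explicitly.

```python
from enum import Enum


class SessionActionLogKind(str, Enum):
    ENV_ENTER = "EnvEnter"
    ENV_EXIT = "EnvExit"
    TASK_RUN = "TaskRun"
    JA_SYNC = "JobAttachSyncInput"


class SessionActionError(Exception):
    """Captures the action_id and kind of an action that failed"""

    def __init__(self, action_id: str, action_log_kind: SessionActionLogKind, message: str):
        super().__init__()
        self.action_id = action_id
        self.action_log_kind = action_log_kind  # stored so handlers can read it
        self.message = message


def describe_failure(error: SessionActionError) -> str:
    """Hypothetical handler that builds a log line from the exception alone."""
    return f"({error.action_id}) {error.message} (Kind: {error.action_log_kind.value})"


try:
    raise SessionActionError("action-1", SessionActionLogKind.JA_SYNC, "details unavailable")
except SessionActionError as err:
    print(describe_failure(err))
```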