Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add Deadline Client Lib to telemetry and add a success/fail decorator… #352

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion src/deadline_worker_agent/aws/deadline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import logging
from time import sleep, monotonic
from typing import Any, Dict, Optional
from typing import Any, Callable, Dict, List, Optional, TypeVar, cast
from threading import Event
from dataclasses import asdict, dataclass
import random
Expand All @@ -11,6 +11,7 @@
from botocore.exceptions import ClientError

from deadline.client.api import get_telemetry_client, TelemetryClient
from deadline.client import version as deadline_client_lib_version
from deadline.job_attachments.progress_tracker import SummaryStatistics
from openjd.model import version as openjd_model_version
from openjd.sessions import version as openjd_sessions_version
Expand Down Expand Up @@ -41,6 +42,9 @@

_logger = logging.getLogger(__name__)

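# Generic type variable for a decorated callable, so that decorators can declare that they return the same callable type they receive.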
F = TypeVar("F", bound=Callable[..., Any])


class DeadlineRequestError(Exception):
"""Base class for all exceptions raised by functions in this module."""
Expand Down Expand Up @@ -799,6 +803,9 @@ def _get_deadline_telemetry_client() -> TelemetryClient:
__cached_telemetry_client.update_common_details(
{"openjd-model-version": ".".join(openjd_model_version.split(".")[:3])}
)
__cached_telemetry_client.update_common_details(
{"deadline-cloud": ".".join(deadline_client_lib_version.split(".")[:3])}
)
return __cached_telemetry_client


Expand Down Expand Up @@ -849,3 +856,34 @@ def record_sync_inputs_fail_telemetry_event(
event_type="com.amazon.rum.deadline.worker_agent.sync_inputs_failure",
event_details=details,
)


def record_success_fail_telemetry_event(**decorator_kwargs: Any) -> Callable[[F], F]:
    """
    Decorator factory that sends a success / fail telemetry event for every call of
    the decorated function.

    The event is recorded after the decorated function has returned or raised. Its
    event type is ``com.amazon.rum.deadline.worker_agent.<event name>`` and its
    details are ``{"is_success": <bool>}``. Anything raised by the decorated function
    propagates to the caller unchanged.

    :param decorator_kwargs: Optional settings. The only key that is read is
        ``metric_name``, which overrides the event name. When it is absent, the
        decorated function's ``__name__`` is used.
    :return: A decorator that wraps a callable and returns a callable of the same type.
    """
    # Imported locally so that this function does not rely on a module-level import.
    from functools import wraps

    def inner(function: F) -> F:
        # The event name is fixed for the lifetime of the decorated function,
        # so resolve it once at decoration time instead of on every call.
        event_name = decorator_kwargs.get("metric_name", function.__name__)

        # wraps() keeps the decorated function's __name__, __doc__ and signature
        # metadata visible to callers, logs and introspection tools.
        @wraps(function)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Start pessimistic and flip only after a normal return. This way every
            # abnormal exit, including BaseException subclasses such as
            # KeyboardInterrupt that "except Exception" would miss, is reported
            # as a failure.
            success = False
            try:
                result = function(*args, **kwargs)
                success = True
                return result
            finally:
                _get_deadline_telemetry_client().record_event(
                    event_type=f"com.amazon.rum.deadline.worker_agent.{event_name}",
                    event_details={"is_success": success},
                )

        return cast(F, wrapper)

    return inner
5 changes: 4 additions & 1 deletion src/deadline_worker_agent/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from deadline.job_attachments.progress_tracker import ProgressReportMetadata, SummaryStatistics

from ..aws.deadline import (
record_success_fail_telemetry_event,
record_sync_inputs_fail_telemetry_event,
record_sync_inputs_telemetry_event,
record_sync_outputs_telemetry_event,
Expand Down Expand Up @@ -816,6 +817,7 @@ def _notifier_callback(
progress and status message are passed in by Job Attachments."""
return True

@record_success_fail_telemetry_event() # type: ignore
def sync_asset_inputs(
self,
*,
Expand Down Expand Up @@ -1084,7 +1086,7 @@ def _action_updated_impl(
self.logger.info("----------------------------------------------")
self.logger.info("Uploading output files to Job Attachments")
self.logger.info("----------------------------------------------")
future = self._executor.submit(
future: Future = self._executor.submit(
self._sync_asset_outputs,
current_action=current_action,
)
Expand Down Expand Up @@ -1203,6 +1205,7 @@ def _handle_action_update(
)
)

@record_success_fail_telemetry_event(metric_name="sync_asset_outputs") # type: ignore
leongdl marked this conversation as resolved.
Show resolved Hide resolved
def _sync_asset_outputs(
self,
*,
Expand Down
51 changes: 51 additions & 0 deletions test/unit/aws/deadline/test_client_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@


from unittest.mock import patch, MagicMock
import pytest

import deadline_worker_agent.aws.deadline as deadline_mod
from deadline_worker_agent.aws.deadline import (
record_success_fail_telemetry_event,
record_worker_start_telemetry_event,
record_sync_inputs_telemetry_event,
record_sync_outputs_telemetry_event,
Expand Down Expand Up @@ -148,3 +150,52 @@ def test_record_uncaught_exception_telemetry_event():
exception_type="<class 'ValueError'>",
event_details={"exception_scope": "uncaught"},
)


def test_record_decorator_success():
    """Tests that the decorator records exactly one success event and passes the
    decorated function's return value through."""
    mock_telemetry_client = MagicMock()

    with patch.object(
        deadline_mod, "_get_deadline_telemetry_client", return_value=mock_telemetry_client
    ):
        # GIVEN
        @record_success_fail_telemetry_event()
        def successful():
            return "result"

        # WHEN
        result = successful()  # type: ignore

    # THEN
    # The wrapper must not swallow or alter the return value.
    assert result == "result"
    # assert_called_once_with also fails if more than one event was recorded,
    # which assert_called_with (last call only) would not detect.
    mock_telemetry_client.record_event.assert_called_once_with(
        event_type="com.amazon.rum.deadline.worker_agent.successful",
        event_details={
            "is_success": True,
        },
    )


def test_record_decorator_fails():
    """Tests that the decorator records exactly one failure event and re-raises the
    decorated function's exception unchanged."""
    mock_telemetry_client = MagicMock()
    error = RuntimeError("foobar")

    with patch.object(
        deadline_mod, "_get_deadline_telemetry_client", return_value=mock_telemetry_client
    ):
        # GIVEN
        @record_success_fail_telemetry_event()
        def fails():
            raise error

        # WHEN
        with pytest.raises(RuntimeError) as raise_ctx:
            fails()  # type: ignore

    # THEN
    # The very same exception object must reach the caller.
    assert raise_ctx.value is error
    # assert_called_once_with also fails if more than one event was recorded,
    # which assert_called_with (last call only) would not detect.
    mock_telemetry_client.record_event.assert_called_once_with(
        event_type="com.amazon.rum.deadline.worker_agent.fails",
        event_details={
            "is_success": False,
        },
    )
12 changes: 6 additions & 6 deletions test/unit/sessions/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ def test_asset_loading_method(
cancel = Event()

# WHEN
session.sync_asset_inputs(
session.sync_asset_inputs( # type: ignore
cancel=cancel,
job_attachment_details=job_attachment_details,
)
Expand Down Expand Up @@ -712,7 +712,7 @@ def test_sync_asset_inputs_with_fs_permission_settings(
)

# WHEN
session.sync_asset_inputs(
session.sync_asset_inputs( # type: ignore
cancel=cancel,
job_attachment_details=job_attachment_details,
)
Expand Down Expand Up @@ -792,7 +792,7 @@ def test_sync_asset_inputs(
# WHEN
with pytest.raises(RuntimeError) as raise_ctx:
for args in sync_asset_inputs_args_sequence:
session.sync_asset_inputs(cancel=cancel, **args) # type: ignore[arg-type]
session.sync_asset_inputs(cancel=cancel, **args) # type: ignore
# THEN
assert (
raise_ctx.value.args[0]
Expand All @@ -801,7 +801,7 @@ def test_sync_asset_inputs(
else:
# WHEN
for args in sync_asset_inputs_args_sequence:
session.sync_asset_inputs(cancel=cancel, **args) # type: ignore[arg-type]
session.sync_asset_inputs(cancel=cancel, **args) # type: ignore
# THEN
for call in mock_telemetry_event_for_sync_inputs.call_args_list:
assert call[0] == (
Expand Down Expand Up @@ -843,7 +843,7 @@ def mock_sync_inputs(on_downloading_files, *args, **kwargs):
session_mod, "record_sync_inputs_fail_telemetry_event"
) as mock_record_sync_inputs_fail_telemetry_event,
):
session.sync_asset_inputs(
session.sync_asset_inputs( # type: ignore
cancel=mock_cancel,
job_attachment_details=JobAttachmentDetails(
manifests=[],
Expand Down Expand Up @@ -937,7 +937,7 @@ def test_sync_asset_outputs(
session._job_attachment_details = job_attachment_details

# WHEN
session._sync_asset_outputs(current_action=current_action)
session._sync_asset_outputs(current_action=current_action) # type: ignore

# THEN
mock_ja_sync_outputs.assert_called_once_with(
Expand Down