Skip to content

Commit

Permalink
fix: handle case where BatchGetJobEntity returns no jobRunAsUser (#293)
Browse files Browse the repository at this point in the history
Problem:

The jobRunAsUser response field of the JobDetailsEntity structure within
a BatchGetJobEntity request is an optional field. This Worker Agent
currently treats it as a required field, resulting in failed Sessions
when the field is absent.

Solution:

Handle the case where the jobRunAsUser field is missing from the
response. It is a service invariant that this field is always present
when the Worker is within a CMF. When the Worker is running within an
SMF, the field may be absent; in that case, however, the Agent will
always be running with a local jobRunAsUser override. We fail Sessions
when this invariant is violated.

Signed-off-by: Daniel Neilson <53624638+ddneilson@users.noreply.github.com>
  • Loading branch information
ddneilson committed Apr 10, 2024
1 parent a6d55e3 commit 616e16c
Show file tree
Hide file tree
Showing 7 changed files with 320 additions and 233 deletions.
4 changes: 2 additions & 2 deletions src/deadline_worker_agent/api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,14 @@ class WindowsUser(TypedDict):
class JobRunAsUser(TypedDict):
posix: NotRequired[PosixUser]
windows: NotRequired[WindowsUser]
runAs: NotRequired[Literal["QUEUE_CONFIGURED_USER", "WORKER_AGENT_USER"]]
runAs: Literal["QUEUE_CONFIGURED_USER", "WORKER_AGENT_USER"]


class JobDetailsData(JobDetailsIdentifierFields):
jobAttachmentSettings: NotRequired[JobAttachmentQueueSettings]
"""The queue's job attachment settings"""

jobRunAsUser: JobRunAsUser
jobRunAsUser: NotRequired[JobRunAsUser]
"""The queue's info on how to run the job processes (ie. posix or windows user/group)"""

logGroupName: str
Expand Down
1 change: 1 addition & 0 deletions src/deadline_worker_agent/boto/shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ def batch_get_job_entity(
},
"logGroupName": "/aws/deadline/queue-abc",
"jobRunAsUser": {
"runAs": "QUEUE_CONFIGURED_USER",
"posix": {
"user": "job-user",
"group": "job-group",
Expand Down
147 changes: 108 additions & 39 deletions src/deadline_worker_agent/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
LogProvisioningError,
SessionLogConfigurationParameters,
)
from ..sessions.job_entities.job_details import JobRunAsUser
from ..api_models import (
AssignedSession,
UpdateWorkerScheduleResponse,
Expand Down Expand Up @@ -668,6 +669,98 @@ def _fail_all_actions(
)
self._wakeup.set()

@staticmethod
def _determine_user_for_session(
    *,
    host_is_posix: bool,
    job_run_as_user: Optional[JobRunAsUser],
    job_run_as_user_override: JobsRunAsUserOverride,
    queue_id: str,
    job_id: str,
    session_id: str,
) -> Optional[SessionUser]:
    """Pick the OS user that a new Session should run its processes as.

    Intended to be called from ``_create_new_sessions()``.

    Resolution order:
      1. If the override says to run as the agent, no user is selected.
      2. Otherwise, a host-configured override user wins if one is set.
      3. Otherwise, the Queue's jobRunAsUser is consulted; unless it is
         flagged as the worker agent user, the entry for the host platform
         (``posix`` or ``windows``) is used.
      4. If nothing was selected, the Session runs as the Worker Agent's
         own user and a warning is logged.

    Args:
        host_is_posix: True to read the ``posix`` user from
            ``job_run_as_user``; False to read the ``windows`` user.
        job_run_as_user: The Queue's jobRunAsUser, or None if absent.
        job_run_as_user_override: The agent's local override settings.
        queue_id: Queue identifier, used only for log events.
        job_id: Job identifier, used only for log events.
        session_id: Session identifier, used only for log events.

    Returns:
        The selected user, or None when the Session should run as the
        Worker Agent's own user.

    Raises:
        ValueError: If no override user is configured and either the Queue
            has no jobRunAsUser, or it has no user for this platform. The
            caller is expected to fail the Session in that case.
    """

    def _user_event(user: str, message: str):
        # Every log record emitted here is a USER-subtype session event that
        # differs only in the user name and the message text.
        return SessionLogEvent(
            subtype=SessionLogEventSubtype.USER,
            queue_id=queue_id,
            job_id=job_id,
            session_id=session_id,
            user=user,
            message=message,
        )

    if not job_run_as_user_override.run_as_agent:
        override_user = job_run_as_user_override.job_user
        if override_user is not None:
            logger.info(
                _user_event(override_user.user, "Running as host-configured override user.")
            )
            return override_user

        if job_run_as_user is None:
            # Terminal error; the Session must be failed.
            # This should *never* happen; it occurring would mean that a service
            # invariant has been violated.
            raise ValueError(
                "FATAL: Queue does not have a jobRunAsUser. This should not be possible. "
                "Please report this to the service team."
            )

        if not job_run_as_user.is_worker_agent_user:
            # Without a job-user override, and without explicitly running as the
            # agent's user, the JobDetails *MUST* supply a user for this platform.
            # Reasons:
            #  1) The service always allows service-managed Fleets to associate with
            #     a Queue. The SMF Worker Agent is *always* run with a local user
            #     override.
            #  2) The service only allows a customer-managed Fleet to associate with
            #     a Queue if either:
            #     a) The jobRunAsUser is explicitly set to WORKER_AGENT_USER; or
            #     b) The jobRunAsUser is explicitly set to QUEUE_CONFIGURED_USER and
            #        a user has been defined for the CMF's OS Platform (Linux/MacOS
            #        Fleets must have a "posix" user; and Windows Fleets must have a
            #        "windows" user)
            #  3) The service does not allow a Queue's jobRunAsUser to be updated if
            #     the constraint imposed by (2) above would be violated for one or
            #     more of that Queue's current QFAs.
            queue_user = job_run_as_user.posix if host_is_posix else job_run_as_user.windows
            if queue_user is None:
                # Terminal error; the Session must be failed.
                # This should *never* happen; it occurring would mean that a service
                # invariant has been violated.
                raise ValueError(
                    "FATAL: Queue's jobRunAsUser does not define a QUEUE_CONFIGURED_USER for this platform. "
                    "Please report this to the service team."
                )
            logger.info(_user_event(queue_user.user, "Running as Queue's jobRunAsUser."))
            return queue_user

    # No user was selected: the Session runs as the Worker Agent's own user.
    try:
        agent_user_name = getpass.getuser()
    except Exception:
        # Best-effort lookup, used only for the log record below.
        agent_user_name = "UNKNOWN"
    logger.warning(
        _user_event(
            agent_user_name,
            "Running as the Worker Agent's user. This configuration is not recommended; please see the Security chapter of the User Guide.",
        )
    )
    return None

def _create_new_sessions(
self,
*,
Expand Down Expand Up @@ -842,52 +935,28 @@ def _create_new_sessions(
queue.replace(actions=session_spec["sessionActions"])

os_user: Optional[SessionUser] = None
if not self._job_run_as_user_override.run_as_agent:
if self._job_run_as_user_override.job_user is not None:
os_user = self._job_run_as_user_override.job_user
logger.info(
SessionLogEvent(
subtype=SessionLogEventSubtype.USER,
queue_id=queue_id,
job_id=job_id,
session_id=new_session_id,
user=os_user.user,
message="Running as host-configured override user.",
)
)
elif job_details.job_run_as_user:
if os.name == "posix":
os_user = job_details.job_run_as_user.posix
else:
os_user = job_details.job_run_as_user.windows
if os_user is not None:
logger.info(
SessionLogEvent(
subtype=SessionLogEventSubtype.USER,
queue_id=queue_id,
job_id=job_id,
session_id=new_session_id,
user=os_user.user,
message="Running as Queue's jobRunAsUser.",
)
)

if os_user is None:
try:
user_to_log = getpass.getuser()
except Exception:
# This is best-effort. If we cannot determine the user we will not log
user_to_log = "UNKNOWN"
logger.warning(
try:
os_user = self._determine_user_for_session(
host_is_posix=os.name == "posix",
job_run_as_user=job_details.job_run_as_user,
job_run_as_user_override=self._job_run_as_user_override,
queue_id=queue_id,
job_id=job_id,
session_id=new_session_id,
)
except ValueError as e:
message = str(e)
self._fail_all_actions(session_spec, message)
logger.error(
SessionLogEvent(
subtype=SessionLogEventSubtype.USER,
queue_id=queue_id,
job_id=job_id,
session_id=new_session_id,
user=user_to_log,
message="Running as the Worker Agent's user.",
message=message,
)
)
continue

queue_credentials: QueueAwsCredentials | None = None
asset_sync: AssetSync | None = None
Expand Down
51 changes: 29 additions & 22 deletions src/deadline_worker_agent/sessions/job_entities/job_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,33 +91,39 @@ def job_run_as_user_api_model_to_worker_agent(
"""Converts the 'JobRunAsUser' api model to the 'JobRunAsUser' dataclass
expected by the Worker Agent.
"""
if "runAs" in job_run_as_user_data and job_run_as_user_data["runAs"] == "WORKER_AGENT_USER":
# Only two options for "runAs": WORKER_AGENT_USER & QUEUE_CONFIGURED_USER
if job_run_as_user_data["runAs"] == "WORKER_AGENT_USER":
job_run_as_user = JobRunAsUser(is_worker_agent_user=True)
return job_run_as_user

# We have a QUEUE_CONFIGURED_USER, so extract the data for this platform.
if os.name == "posix":
user = ""
group = ""
if job_run_as_user_posix := job_run_as_user_data.get("posix", None):
user = job_run_as_user_posix["user"]
group = job_run_as_user_posix["group"]
else:
job_run_as_user_posix = job_run_as_user_data.get("posix", None)
if job_run_as_user_posix is None:
# Note: This may happen in an SMF case as follows:
# Customer has a Windows-based CMF, and configures the QUEUE_CONFIGURED_USER
# for Windows, but also connects the Queue to a posix-based SMF.
# So, this would mean that we're in a posix-based SMF. Return None; the agent
# must have been started with a jobRunAsUser override.
return None

if "runAs" not in job_run_as_user_data and not group and not user:
return None
job_run_as_user = JobRunAsUser(
posix=PosixSessionUser(
user=user,
group=group,
user=job_run_as_user_posix["user"],
group=job_run_as_user_posix["group"],
),
)
else:
job_run_as_user_windows = job_run_as_user_data.get("windows", {})
user = job_run_as_user_windows.get("user", "")
passwordArn = job_run_as_user_windows.get("passwordArn", "")
if not (user and passwordArn):
job_run_as_user_windows = job_run_as_user_data.get("windows", None)
if job_run_as_user_windows is None:
# Note: This may happen in an SMF case as follows:
# Customer has a posix-based CMF, and configures the QUEUE_CONFIGURED_USER
# for posix, but also connects the Queue to a Windows-based SMF.
# So, this would mean that we're in a Windows-based SMF. Return None; the agent
# must have been started with a jobRunAsUser override.
return None
user = job_run_as_user_windows["user"]
passwordArn = job_run_as_user_windows["passwordArn"]
job_run_as_user = JobRunAsUser(
windows_settings=JobRunAsWindowsUser(user=user, passwordArn=passwordArn),
)
Expand Down Expand Up @@ -238,9 +244,11 @@ def from_boto(cls, job_details_data: JobDetailsData) -> JobDetails:
if job_attachment_settings_boto := job_details_data.get("jobAttachmentSettings", None):
job_attachment_settings = JobAttachmentSettings.from_boto(job_attachment_settings_boto)

job_run_as_user_data = job_details_data["jobRunAsUser"]
job_run_as_user: JobRunAsUser | None = job_run_as_user_api_model_to_worker_agent(
job_run_as_user_data
job_run_as_user_data = job_details_data.get("jobRunAsUser", None)
job_run_as_user: JobRunAsUser | None = (
job_run_as_user_api_model_to_worker_agent(job_run_as_user_data)
if job_run_as_user_data is not None
else None
)

# Note: Record the empty string as a None as well.
Expand Down Expand Up @@ -290,7 +298,6 @@ def validate_entity_data(cls, entity_data: dict[str, Any]) -> JobDetailsData:
Field(key="jobId", expected_type=str, required=True),
Field(key="logGroupName", expected_type=str, required=True),
Field(key="schemaVersion", expected_type=str, required=True),
Field(key="osUser", expected_type=str, required=False),
Field(
key="parameters",
expected_type=dict,
Expand All @@ -304,7 +311,7 @@ def validate_entity_data(cls, entity_data: dict[str, Any]) -> JobDetailsData:
Field(
key="jobRunAsUser",
expected_type=dict,
required=True,
required=False,
fields=(
Field(
key="posix",
Expand All @@ -318,7 +325,7 @@ def validate_entity_data(cls, entity_data: dict[str, Any]) -> JobDetailsData:
Field(
key="runAs",
expected_type=str,
required=False,
required=True,
),
Field(
key="windows",
Expand Down Expand Up @@ -367,7 +374,7 @@ def validate_entity_data(cls, entity_data: dict[str, Any]) -> JobDetailsData:
)

# Validate jobRunAsUser -> runAs is one of ("QUEUE_CONFIGURED_USER" / "WORKER_AGENT_USER")
if run_as_value := entity_data["jobRunAsUser"].get("runAs", None):
if run_as_value := entity_data.get("jobRunAsUser", dict()).get("runAs", None):
if run_as_value not in ("QUEUE_CONFIGURED_USER", "WORKER_AGENT_USER"):
raise ValueError(
f'Expected "jobRunAs" -> "runAs" to be one of "QUEUE_CONFIGURED_USER", "WORKER_AGENT_USER" but got "{run_as_value}"'
Expand Down
Loading

0 comments on commit 616e16c

Please sign in to comment.