Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execution model fields #1164

Merged
merged 5 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 154 additions & 35 deletions flytekit/models/execution.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from __future__ import annotations

import datetime
import typing

import flyteidl
import flyteidl.admin.execution_pb2 as _execution_pb2
import flyteidl.admin.node_execution_pb2 as _node_execution_pb2
import flyteidl.admin.task_execution_pb2 as _task_execution_pb2
import pytz as _pytz

import flytekit
from flytekit.models import common as _common_models
from flytekit.models import literals as _literals_models
from flytekit.models import security
Expand All @@ -13,52 +18,126 @@
from flytekit.models.node_execution import DynamicWorkflowNodeMetadata


class SystemMetadata(_common_models.FlyteIdlEntity):
def __init__(self, execution_cluster: str):
self._execution_cluster = execution_cluster

@property
def execution_cluster(self) -> str:
return self._execution_cluster

def to_flyte_idl(self) -> flyteidl.admin.execution_pb2.SystemMetadata:
return _execution_pb2.SystemMetadata(execution_cluster=self.execution_cluster)

@classmethod
def from_flyte_idl(cls, pb2_object: flyteidl.admin.execution_pb2.SystemMetadata) -> SystemMetadata:
return cls(
execution_cluster=pb2_object.execution_cluster,
)


class ExecutionMetadata(_common_models.FlyteIdlEntity):
class ExecutionMode(object):
MANUAL = 0
SCHEDULED = 1
SYSTEM = 2

def __init__(self, mode, principal, nesting):
def __init__(
self,
mode: int,
principal: str,
nesting: int,
scheduled_at: typing.Optional[datetime.datetime] = None,
parent_node_execution: typing.Optional[_identifier.NodeExecutionIdentifier] = None,
reference_execution: typing.Optional[_identifier.WorkflowExecutionIdentifier] = None,
system_metadata: typing.Optional[SystemMetadata] = None,
):
"""
:param int mode: An enum value from ExecutionMetadata.ExecutionMode which specifies how the job started.
:param Text principal: The entity that triggered the execution
:param int nesting: An integer representing how deeply nested the workflow is (i.e. was it triggered by a parent
:param mode: An enum value from ExecutionMetadata.ExecutionMode which specifies how the job started.
:param principal: The entity that triggered the execution
:param nesting: An integer representing how deeply nested the workflow is (i.e. was it triggered by a parent
workflow)
:param scheduled_at: For scheduled executions, the requested time for execution for this specific schedule invocation.
:param parent_node_execution: Which subworkflow node (if any) launched this execution
:param reference_execution: Optional, reference workflow execution related to this execution
:param system_metadata: Optional, platform-specific metadata about the execution.
"""
self._mode = mode
self._principal = principal
self._nesting = nesting
self._scheduled_at = scheduled_at
self._parent_node_execution = parent_node_execution
self._reference_execution = reference_execution
self._system_metadata = system_metadata

@property
def mode(self):
def mode(self) -> int:
"""
An enum value from ExecutionMetadata.ExecutionMode which specifies how the job started.
:rtype: int
"""
return self._mode

@property
def principal(self):
def principal(self) -> str:
"""
The entity that triggered the execution
:rtype: Text
"""
return self._principal

@property
def nesting(self):
def nesting(self) -> int:
"""
An integer representing how deeply nested the workflow is (i.e. was it triggered by a parent workflow)
:rtype: int
"""
return self._nesting

@property
def scheduled_at(self) -> datetime.datetime:
"""
For scheduled executions, the requested time for execution for this specific schedule invocation.
"""
return self._scheduled_at

@property
def parent_node_execution(self) -> _identifier.NodeExecutionIdentifier:
"""
Which subworkflow node (if any) launched this execution
"""
return self._parent_node_execution

@property
def reference_execution(self) -> _identifier.WorkflowExecutionIdentifier:
"""
Optional, reference workflow execution related to this execution
"""
return self._reference_execution

@property
def system_metadata(self) -> SystemMetadata:
"""
Optional, platform-specific metadata about the execution.
"""
return self._system_metadata

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.execution_pb2.ExecutionMetadata
"""
return _execution_pb2.ExecutionMetadata(mode=self.mode, principal=self.principal, nesting=self.nesting)
p = _execution_pb2.ExecutionMetadata(
mode=self.mode,
principal=self.principal,
nesting=self.nesting,
parent_node_execution=self.parent_node_execution.to_flyte_idl()
if self.parent_node_execution is not None
else None,
reference_execution=self.reference_execution.to_flyte_idl()
if self.reference_execution is not None
else None,
system_metadata=self.system_metadata.to_flyte_idl() if self.system_metadata is not None else None,
)
if self.scheduled_at is not None:
p.scheduled_at.FromDatetime(self.scheduled_at)
return p

@classmethod
def from_flyte_idl(cls, pb2_object):
Expand All @@ -70,6 +149,16 @@ def from_flyte_idl(cls, pb2_object):
mode=pb2_object.mode,
principal=pb2_object.principal,
nesting=pb2_object.nesting,
scheduled_at=pb2_object.scheduled_at.ToDatetime() if pb2_object.HasField("scheduled_at") else None,
parent_node_execution=_identifier.NodeExecutionIdentifier.from_flyte_idl(pb2_object.parent_node_execution)
if pb2_object.HasField("parent_node_execution")
else None,
reference_execution=_identifier.WorkflowExecutionIdentifier.from_flyte_idl(pb2_object.reference_execution)
if pb2_object.HasField("reference_execution")
else None,
system_metadata=SystemMetadata.from_flyte_idl(pb2_object.system_metadata)
if pb2_object.HasField("system_metadata")
else None,
)


Expand Down Expand Up @@ -319,57 +408,82 @@ def from_flyte_idl(cls, pb):
)


class AbortMetadata(_common_models.FlyteIdlEntity):
def __init__(self, cause: str, principal: str):
self._cause = cause
self._principal = principal

@property
def cause(self) -> str:
return self._cause

@property
def principal(self) -> str:
return self._principal

def to_flyte_idl(self) -> flyteidl.admin.execution_pb2.AbortMetadata:
return _execution_pb2.AbortMetadata(cause=self.cause, principal=self.principal)

@classmethod
def from_flyte_idl(cls, pb2_object: flyteidl.admin.execution_pb2.AbortMetadata) -> AbortMetadata:
return cls(
cause=pb2_object.cause,
principal=pb2_object.principal,
)


class ExecutionClosure(_common_models.FlyteIdlEntity):
def __init__(self, phase, started_at, duration, error=None, outputs=None):
def __init__(
self,
phase: int,
started_at: datetime.datetime,
duration: datetime.timedelta,
error: typing.Optional[flytekit.models.core.execution.ExecutionError] = None,
outputs: typing.Optional[LiteralMapBlob] = None,
abort_metadata: typing.Optional[AbortMetadata] = None,
):
"""
:param int phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum
:param datetime.datetime started_at:
:param datetime.timedelta duration: Duration for which the execution has been running.
:param flytekit.models.core.execution.ExecutionError error:
:param LiteralMapBlob outputs:
:param phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum
:param started_at:
:param duration: Duration for which the execution has been running.
:param error:
:param outputs:
:param abort_metadata: Specifies metadata around an aborted workflow execution.
"""
self._phase = phase
self._started_at = started_at
self._duration = duration
self._error = error
self._outputs = outputs
self._abort_metadata = abort_metadata

@property
def error(self):
"""
:rtype: flytekit.models.core.execution.ExecutionError
"""
def error(self) -> flytekit.models.core.execution.ExecutionError:
return self._error

@property
def phase(self):
def phase(self) -> int:
"""
From the flytekit.models.core.execution.WorkflowExecutionPhase enum
:rtype: int
"""
return self._phase

@property
def started_at(self):
"""
:rtype: datetime.datetime
"""
def started_at(self) -> datetime.datetime:
return self._started_at

@property
def duration(self):
"""
:rtype: datetime.timedelta
"""
def duration(self) -> datetime.timedelta:
return self._duration

@property
def outputs(self):
"""
:rtype: LiteralMapBlob
"""
def outputs(self) -> LiteralMapBlob:
return self._outputs

@property
def abort_metadata(self) -> AbortMetadata:
return self._abort_metadata

def to_flyte_idl(self):
"""
:rtype: flyteidl.admin.execution_pb2.ExecutionClosure
Expand All @@ -378,6 +492,7 @@ def to_flyte_idl(self):
phase=self.phase,
error=self.error.to_flyte_idl() if self.error is not None else None,
outputs=self.outputs.to_flyte_idl() if self.outputs is not None else None,
abort_metadata=self.abort_metadata.to_flyte_idl() if self.abort_metadata is not None else None,
)
obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None))
obj.duration.FromTimedelta(self.duration)
Expand All @@ -395,12 +510,16 @@ def from_flyte_idl(cls, pb2_object):
outputs = None
if pb2_object.HasField("outputs"):
outputs = LiteralMapBlob.from_flyte_idl(pb2_object.outputs)
abort_metadata = None
if pb2_object.HasField("abort_metadata"):
abort_metadata = AbortMetadata.from_flyte_idl(pb2_object.abort_metadata)
return cls(
error=error,
outputs=outputs,
phase=pb2_object.phase,
started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC),
duration=pb2_object.duration.ToTimedelta(),
abort_metadata=abort_metadata,
)


Expand Down
75 changes: 74 additions & 1 deletion tests/flytekit/unit/models/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,79 @@ def test_execution_closure_with_error():
assert obj2.error == test_error


def test_execution_closure_with_abort_metadata():
test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC)
test_timedelta = datetime.timedelta(seconds=10)
abort_metadata = _execution.AbortMetadata(cause="cause", principal="skinner")

obj = _execution.ExecutionClosure(
phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED,
started_at=test_datetime,
duration=test_timedelta,
abort_metadata=abort_metadata,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
assert obj.duration == test_timedelta
assert obj.abort_metadata == abort_metadata
obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl())
assert obj2 == obj
assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj2.started_at == test_datetime
assert obj2.duration == test_timedelta
assert obj2.abort_metadata == abort_metadata


def test_system_metadata():
obj = _execution.SystemMetadata(execution_cluster="my_cluster")
assert obj.execution_cluster == "my_cluster"
obj2 = _execution.SystemMetadata.from_flyte_idl(obj.to_flyte_idl())
assert obj == obj2
assert obj2.execution_cluster == "my_cluster"


def test_execution_metadata():
obj = _execution.ExecutionMetadata(_execution.ExecutionMetadata.ExecutionMode.MANUAL, "tester", 1)
scheduled_at = datetime.datetime.now()
system_metadata = _execution.SystemMetadata(execution_cluster="my_cluster")
parent_node_execution = _identifier.NodeExecutionIdentifier(
node_id="node_id",
execution_id=_identifier.WorkflowExecutionIdentifier(
project="project",
domain="domain",
name="parent",
),
)
reference_execution = _identifier.WorkflowExecutionIdentifier(
project="project",
domain="domain",
name="reference",
)

obj = _execution.ExecutionMetadata(
_execution.ExecutionMetadata.ExecutionMode.MANUAL,
"tester",
1,
scheduled_at=scheduled_at,
parent_node_execution=parent_node_execution,
reference_execution=reference_execution,
system_metadata=system_metadata,
)
assert obj.mode == _execution.ExecutionMetadata.ExecutionMode.MANUAL
assert obj.principal == "tester"
assert obj.nesting == 1
assert obj.scheduled_at == scheduled_at
assert obj.parent_node_execution == parent_node_execution
assert obj.reference_execution == reference_execution
assert obj.system_metadata == system_metadata
obj2 = _execution.ExecutionMetadata.from_flyte_idl(obj.to_flyte_idl())
assert obj == obj2
assert obj2.mode == _execution.ExecutionMetadata.ExecutionMode.MANUAL
assert obj2.principal == "tester"
assert obj2.nesting == 1
assert obj2.scheduled_at == scheduled_at
assert obj2.parent_node_execution == parent_node_execution
assert obj2.reference_execution == reference_execution
assert obj2.system_metadata == system_metadata


@pytest.mark.parametrize("literal_value_pair", _parameterizers.LIST_OF_SCALAR_LITERALS_AND_PYTHON_VALUE)
Expand Down Expand Up @@ -198,3 +261,13 @@ def test_task_execution_data_response():
assert obj2.outputs == output_blob
assert obj2.full_inputs == _INPUT_MAP
assert obj2.full_outputs == _OUTPUT_MAP


def test_abort_metadata():
obj = _execution.AbortMetadata(cause="cause", principal="skinner")
assert obj.cause == "cause"
assert obj.principal == "skinner"
obj2 = _execution.AbortMetadata.from_flyte_idl(obj.to_flyte_idl())
assert obj == obj2
assert obj2.cause == "cause"
assert obj2.principal == "skinner"