From 2b5cf366a259ef03a180ca8ae7eac011622d861f Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 14 Sep 2022 16:56:54 -0700 Subject: [PATCH 1/5] Add additional execution pb data to model Signed-off-by: Katrina Rogan --- flytekit/models/execution.py | 188 +++++++++++++++---- tests/flytekit/unit/models/test_execution.py | 79 +++++++- 2 files changed, 231 insertions(+), 36 deletions(-) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 3dbdfb8564..c16848370a 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -1,10 +1,17 @@ +from __future__ import annotations + +import datetime +import time import typing +import flyteidl import flyteidl.admin.execution_pb2 as _execution_pb2 import flyteidl.admin.node_execution_pb2 as _node_execution_pb2 import flyteidl.admin.task_execution_pb2 as _task_execution_pb2 import pytz as _pytz +from google.protobuf.timestamp_pb2 import Timestamp +import flytekit from flytekit.models import common as _common_models from flytekit.models import literals as _literals_models from flytekit.models import security @@ -13,52 +20,130 @@ from flytekit.models.node_execution import DynamicWorkflowNodeMetadata +class SystemMetadata(_common_models.FlyteIdlEntity): + def __init__(self, execution_cluster: str): + self._execution_cluster = execution_cluster + + @property + def execution_cluster(self) -> str: + return self._execution_cluster + + def to_flyte_idl(self) -> flyteidl.admin.execution_pb2.SystemMetadata: + return _execution_pb2.SystemMetadata(execution_cluster=self.execution_cluster) + + @classmethod + def from_flyte_idl(cls, pb2_object) -> SystemMetadata: + """ + :param flyteidl.admin.execution_pb2.SystemMetadata pb2_object: + :return: SystemMetadata + """ + return cls( + execution_cluster=pb2_object.execution_cluster, + ) + + class ExecutionMetadata(_common_models.FlyteIdlEntity): class ExecutionMode(object): MANUAL = 0 SCHEDULED = 1 SYSTEM = 2 - def __init__(self, mode, principal, nesting): + def __init__( + self, + mode: int, + principal: str, + nesting: int, + scheduled_at: datetime.datetime = None, + parent_node_execution: _identifier.NodeExecutionIdentifier = None, + reference_execution: _identifier.WorkflowExecutionIdentifier = None, + system_metadata: SystemMetadata = None, + ): """ - :param int mode: An enum value from ExecutionMetadata.ExecutionMode which specifies how the job started. - :param Text principal: The entity that triggered the execution - :param int nesting: An integer representing how deeply nested the workflow is (i.e. was it triggered by a parent + :param mode: An enum value from ExecutionMetadata.ExecutionMode which specifies how the job started. + :param principal: The entity that triggered the execution + :param nesting: An integer representing how deeply nested the workflow is (i.e. was it triggered by a parent workflow) + :param scheduled_at: For scheduled executions, the requested time for execution for this specific schedule invocation. + :param parent_node_execution: Which subworkflow node (if any) launched this execution + :param reference_execution: Optional, reference workflow execution related to this execution + :param system_metadata: Optional, platform-specific metadata about the execution. """ self._mode = mode self._principal = principal self._nesting = nesting + self._scheduled_at = scheduled_at + self._parent_node_execution = parent_node_execution + self._reference_execution = reference_execution + self._system_metadata = system_metadata @property - def mode(self): + def mode(self) -> int: """ An enum value from ExecutionMetadata.ExecutionMode which specifies how the job started. - :rtype: int """ return self._mode @property - def principal(self): + def principal(self) -> str: """ The entity that triggered the execution - :rtype: Text """ return self._principal @property - def nesting(self): + def nesting(self) -> int: """ An integer representing how deeply nested the workflow is (i.e. was it triggered by a parent workflow) - :rtype: int """ return self._nesting + @property + def scheduled_at(self) -> datetime.datetime: + """ + For scheduled executions, the requested time for execution for this specific schedule invocation. + """ + return self._scheduled_at + + @property + def parent_node_execution(self) -> _identifier.NodeExecutionIdentifier: + """ + Which subworkflow node (if any) launched this execution + """ + return self._parent_node_execution + + @property + def reference_execution(self) -> _identifier.WorkflowExecutionIdentifier: + """ + Optional, reference workflow execution related to this execution + """ + return self._reference_execution + + @property + def system_metadata(self) -> SystemMetadata: + """ + Optional, platform-specific metadata about the execution. + """ + return self._system_metadata + def to_flyte_idl(self): """ :rtype: flyteidl.admin.execution_pb2.ExecutionMetadata """ - return _execution_pb2.ExecutionMetadata(mode=self.mode, principal=self.principal, nesting=self.nesting) + p = _execution_pb2.ExecutionMetadata( + mode=self.mode, + principal=self.principal, + nesting=self.nesting, + parent_node_execution=self.parent_node_execution.to_flyte_idl() + if self.parent_node_execution is not None + else None, + reference_execution=self.reference_execution.to_flyte_idl() + if self.reference_execution is not None + else None, + system_metadata=self.system_metadata.to_flyte_idl() if self.system_metadata is not None else None, + ) + if self.scheduled_at is not None: + p.scheduled_at.FromDatetime(self.scheduled_at) + return p @classmethod def from_flyte_idl(cls, pb2_object): @@ -70,6 +155,10 @@ def from_flyte_idl(cls, pb2_object): mode=pb2_object.mode, principal=pb2_object.principal, nesting=pb2_object.nesting, + scheduled_at=pb2_object.scheduled_at.ToDatetime(), + parent_node_execution=_identifier.NodeExecutionIdentifier.from_flyte_idl(pb2_object.parent_node_execution), + reference_execution=_identifier.WorkflowExecutionIdentifier.from_flyte_idl(pb2_object.reference_execution), + system_metadata=SystemMetadata.from_flyte_idl(pb2_object.system_metadata), ) @@ -319,57 +408,82 @@ def from_flyte_idl(cls, pb): ) +class AbortMetadata(_common_models.FlyteIdlEntity): + def __init__(self, cause: str, principal: str): + self._cause = cause + self._principal = principal + + @property + def cause(self) -> str: + return self._cause + + @property + def principal(self) -> str: + return self._principal + + def to_flyte_idl(self) -> flyteidl.admin.execution_pb2.AbortMetadata: + return _execution_pb2.AbortMetadata(cause=self.cause, principal=self.principal) + + @classmethod + def from_flyte_idl(cls, pb2_object: flyteidl.admin.execution_pb2.AbortMetadata) -> AbortMetadata: + return cls( + cause=pb2_object.cause, + principal=pb2_object.principal, + ) + + class ExecutionClosure(_common_models.FlyteIdlEntity): - def __init__(self, phase, started_at, duration, error=None, outputs=None): + def __init__( + self, + phase: int, + started_at: datetime.datetime, + duration: datetime.timedelta, + error: flytekit.models.core.execution.ExecutionError = None, + outputs: LiteralMapBlob = None, + abort_metadata: AbortMetadata = None, + ): """ - :param int phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum - :param datetime.datetime started_at: - :param datetime.timedelta duration: Duration for which the execution has been running. - :param flytekit.models.core.execution.ExecutionError error: - :param LiteralMapBlob outputs: + :param phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum + :param started_at: + :param duration: Duration for which the execution has been running. + :param error: + :param outputs: + :param abort_metadata: Specifies metadata around an aborted workflow execution. """ self._phase = phase self._started_at = started_at self._duration = duration self._error = error self._outputs = outputs + self._abort_metadata = abort_metadata @property - def error(self): - """ - :rtype: flytekit.models.core.execution.ExecutionError - """ + def error(self) -> flytekit.models.core.execution.ExecutionError: return self._error @property - def phase(self): + def phase(self) -> int: """ From the flytekit.models.core.execution.WorkflowExecutionPhase enum - :rtype: int """ return self._phase @property - def started_at(self): - """ - :rtype: datetime.datetime - """ + def started_at(self) -> datetime.datetime: return self._started_at @property - def duration(self): - """ - :rtype: datetime.timedelta - """ + def duration(self) -> datetime.timedelta: return self._duration @property - def outputs(self): - """ - :rtype: LiteralMapBlob - """ + def outputs(self) -> LiteralMapBlob: return self._outputs + @property + def abort_metadata(self) -> AbortMetadata: + return self._abort_metadata + def to_flyte_idl(self): """ :rtype: flyteidl.admin.execution_pb2.ExecutionClosure @@ -378,6 +492,7 @@ def to_flyte_idl(self): phase=self.phase, error=self.error.to_flyte_idl() if self.error is not None else None, outputs=self.outputs.to_flyte_idl() if self.outputs is not None else None, + abort_metadata=self.abort_metadata.to_flyte_idl(), ) obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None)) obj.duration.FromTimedelta(self.duration) @@ -395,12 +510,15 @@ def from_flyte_idl(cls, pb2_object): outputs = None if pb2_object.HasField("outputs"): outputs = LiteralMapBlob.from_flyte_idl(pb2_object.outputs) + if pb2_object.HasField("abort_metadata"): + abort_metadata = AbortMetadata.from_flyte_idl(pb2_object.abort_metadata) return cls( error=error, outputs=outputs, phase=pb2_object.phase, started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC), duration=pb2_object.duration.ToTimedelta(), + abort_metadata=abort_metadata, ) diff --git a/tests/flytekit/unit/models/test_execution.py b/tests/flytekit/unit/models/test_execution.py index 3db0a8eb23..4bfac5a8e0 100644 --- a/tests/flytekit/unit/models/test_execution.py +++ b/tests/flytekit/unit/models/test_execution.py @@ -22,23 +22,27 @@ def test_execution_closure_with_output(): test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC) test_timedelta = datetime.timedelta(seconds=10) test_outputs = _execution.LiteralMapBlob(values=_OUTPUT_MAP, uri="http://foo/") + abort_metadata = _execution.AbortMetadata(cause="cause", principal="skinner") obj = _execution.ExecutionClosure( phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED, started_at=test_datetime, duration=test_timedelta, outputs=test_outputs, + abort_metadata=abort_metadata, ) assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj.started_at == test_datetime assert obj.duration == test_timedelta assert obj.outputs == test_outputs + assert obj.abort_metadata == abort_metadata obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl()) assert obj2 == obj assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj2.started_at == test_datetime assert obj2.duration == test_timedelta assert obj2.outputs == test_outputs + assert obj2.abort_metadata == abort_metadata def test_execution_closure_with_error(): @@ -66,16 +70,79 @@ def test_execution_closure_with_error(): assert obj2.error == test_error +def test_execution_closure_with_abort_metadata(): + test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC) + test_timedelta = datetime.timedelta(seconds=10) + abort_metadata = _execution.AbortMetadata(cause="cause", principal="skinner") + + obj = _execution.ExecutionClosure( + phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED, + started_at=test_datetime, + duration=test_timedelta, + abort_metadata=abort_metadata, + ) + assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj.started_at == test_datetime + assert obj.duration == test_timedelta + assert obj.abort_metadata == abort_metadata + obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl()) + assert obj2 == obj + assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj2.started_at == test_datetime + assert obj2.duration == test_timedelta + assert obj2.abort_metadata == abort_metadata + + +def test_system_metadata(): + obj = _execution.SystemMetadata(execution_cluster="my_cluster") + assert obj.execution_cluster == "my_cluster" + obj2 = _execution.SystemMetadata.from_flyte_idl(obj.to_flyte_idl()) + assert obj == obj2 + assert obj2.execution_cluster == "my_cluster" + + def test_execution_metadata(): - obj = _execution.ExecutionMetadata(_execution.ExecutionMetadata.ExecutionMode.MANUAL, "tester", 1) + scheduled_at = datetime.datetime.now() + system_metadata = _execution.SystemMetadata(execution_cluster="my_cluster") + parent_node_execution = _identifier.NodeExecutionIdentifier( + node_id="node_id", + execution_id=_identifier.WorkflowExecutionIdentifier( + project="project", + domain="domain", + name="parent", + ), + ) + reference_execution = _identifier.WorkflowExecutionIdentifier( + project="project", + domain="domain", + name="reference", + ) + + obj = _execution.ExecutionMetadata( + _execution.ExecutionMetadata.ExecutionMode.MANUAL, + "tester", + 1, + scheduled_at=scheduled_at, + parent_node_execution=parent_node_execution, + reference_execution=reference_execution, + system_metadata=system_metadata, + ) assert obj.mode == _execution.ExecutionMetadata.ExecutionMode.MANUAL assert obj.principal == "tester" assert obj.nesting == 1 + assert obj.scheduled_at == scheduled_at + assert obj.parent_node_execution == parent_node_execution + assert obj.reference_execution == reference_execution + assert obj.system_metadata == system_metadata obj2 = _execution.ExecutionMetadata.from_flyte_idl(obj.to_flyte_idl()) assert obj == obj2 assert obj2.mode == _execution.ExecutionMetadata.ExecutionMode.MANUAL assert obj2.principal == "tester" assert obj2.nesting == 1 + assert obj2.scheduled_at == scheduled_at + assert obj2.parent_node_execution == parent_node_execution + assert obj2.reference_execution == reference_execution + assert obj2.system_metadata == system_metadata @pytest.mark.parametrize("literal_value_pair", _parameterizers.LIST_OF_SCALAR_LITERALS_AND_PYTHON_VALUE) @@ -198,3 +265,13 @@ def test_task_execution_data_response(): assert obj2.outputs == output_blob assert obj2.full_inputs == _INPUT_MAP assert obj2.full_outputs == _OUTPUT_MAP + + +def test_abort_metadata(): + obj = _execution.AbortMetadata(cause="cause", principal="skinner") + assert obj.cause == "cause" + assert obj.principal == "skinner" + obj2 = _execution.AbortMetadata.from_flyte_idl(obj.to_flyte_idl()) + assert obj == obj2 + assert obj2.cause == "cause" + assert obj2.principal == "skinner" From 02ebadc793f6ed3647f44c523b4d8e7e227abe91 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 14 Sep 2022 16:59:07 -0700 Subject: [PATCH 2/5] Una mas Signed-off-by: Katrina Rogan --- flytekit/models/execution.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index c16848370a..8272a90f0a 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -510,6 +510,7 @@ def from_flyte_idl(cls, pb2_object): outputs = None if pb2_object.HasField("outputs"): outputs = LiteralMapBlob.from_flyte_idl(pb2_object.outputs) + abort_metadata = None if pb2_object.HasField("abort_metadata"): abort_metadata = AbortMetadata.from_flyte_idl(pb2_object.abort_metadata) return cls( From 81d5c01466fdbcbe87b27829dc7428d97ffd778f Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 14 Sep 2022 17:14:48 -0700 Subject: [PATCH 3/5] what i love unit tests Signed-off-by: Katrina Rogan --- flytekit/models/execution.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 8272a90f0a..025be9b02f 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -492,7 +492,7 @@ def to_flyte_idl(self): phase=self.phase, error=self.error.to_flyte_idl() if self.error is not None else None, outputs=self.outputs.to_flyte_idl() if self.outputs is not None else None, - abort_metadata=self.abort_metadata.to_flyte_idl(), + abort_metadata=self.abort_metadata.to_flyte_idl() if self.abort_metadata is not None else None, ) obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None)) obj.duration.FromTimedelta(self.duration) From 3ee0ed1ff6e1e4af11fceb4e5527941f4eaf6010 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Wed, 14 Sep 2022 17:34:30 -0700 Subject: [PATCH 4/5] amazing Signed-off-by: Katrina Rogan --- flytekit/models/execution.py | 14 ++++++++++---- tests/flytekit/unit/models/test_execution.py | 4 ---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 025be9b02f..1bbdcab371 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -155,10 +155,16 @@ def from_flyte_idl(cls, pb2_object): mode=pb2_object.mode, principal=pb2_object.principal, nesting=pb2_object.nesting, - scheduled_at=pb2_object.scheduled_at.ToDatetime(), - parent_node_execution=_identifier.NodeExecutionIdentifier.from_flyte_idl(pb2_object.parent_node_execution), - reference_execution=_identifier.WorkflowExecutionIdentifier.from_flyte_idl(pb2_object.reference_execution), - system_metadata=SystemMetadata.from_flyte_idl(pb2_object.system_metadata), + scheduled_at=pb2_object.scheduled_at.ToDatetime() if pb2_object.HasField("scheduled_at") else None, + parent_node_execution=_identifier.NodeExecutionIdentifier.from_flyte_idl(pb2_object.parent_node_execution) + if pb2_object.HasField("parent_node_execution") + else None, + reference_execution=_identifier.WorkflowExecutionIdentifier.from_flyte_idl(pb2_object.reference_execution) + if pb2_object.HasField("reference_execution") + else None, + system_metadata=SystemMetadata.from_flyte_idl(pb2_object.system_metadata) + if pb2_object.HasField("system_metadata") + else None, ) diff --git a/tests/flytekit/unit/models/test_execution.py b/tests/flytekit/unit/models/test_execution.py index 4bfac5a8e0..b327d5e9d6 100644 --- a/tests/flytekit/unit/models/test_execution.py +++ b/tests/flytekit/unit/models/test_execution.py @@ -22,27 +22,23 @@ def test_execution_closure_with_output(): test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC) test_timedelta = datetime.timedelta(seconds=10) test_outputs = _execution.LiteralMapBlob(values=_OUTPUT_MAP, uri="http://foo/") - abort_metadata = _execution.AbortMetadata(cause="cause", principal="skinner") obj = _execution.ExecutionClosure( phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED, started_at=test_datetime, duration=test_timedelta, outputs=test_outputs, - abort_metadata=abort_metadata, ) assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj.started_at == test_datetime assert obj.duration == test_timedelta assert obj.outputs == test_outputs - assert obj.abort_metadata == abort_metadata obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl()) assert obj2 == obj assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj2.started_at == test_datetime assert obj2.duration == test_timedelta assert obj2.outputs == test_outputs - assert obj2.abort_metadata == abort_metadata def test_execution_closure_with_error(): From 0118df81e40e49de6101aee9f527d64f7856a095 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Thu, 15 Sep 2022 14:57:39 -0700 Subject: [PATCH 5/5] lint Signed-off-by: Katrina Rogan --- flytekit/models/execution.py | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 1bbdcab371..75e040891b 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -1,7 +1,6 @@ from __future__ import annotations import datetime -import time import typing import flyteidl @@ -9,7 +8,6 @@ import flyteidl.admin.node_execution_pb2 as _node_execution_pb2 import flyteidl.admin.task_execution_pb2 as _task_execution_pb2 import pytz as _pytz -from google.protobuf.timestamp_pb2 import Timestamp import flytekit from flytekit.models import common as _common_models @@ -32,11 +30,7 @@ def to_flyte_idl(self) -> flyteidl.admin.execution_pb2.SystemMetadata: return _execution_pb2.SystemMetadata(execution_cluster=self.execution_cluster) @classmethod - def from_flyte_idl(cls, pb2_object) -> SystemMetadata: - """ - :param flyteidl.admin.execution_pb2.SystemMetadata pb2_object: - :return: SystemMetadata - """ + def from_flyte_idl(cls, pb2_object: flyteidl.admin.execution_pb2.SystemMetadata) -> SystemMetadata: return cls( execution_cluster=pb2_object.execution_cluster, ) @@ -53,10 +47,10 @@ def __init__( mode: int, principal: str, nesting: int, - scheduled_at: datetime.datetime = None, - parent_node_execution: _identifier.NodeExecutionIdentifier = None, - reference_execution: _identifier.WorkflowExecutionIdentifier = None, - system_metadata: SystemMetadata = None, + scheduled_at: typing.Optional[datetime.datetime] = None, + parent_node_execution: typing.Optional[_identifier.NodeExecutionIdentifier] = None, + reference_execution: typing.Optional[_identifier.WorkflowExecutionIdentifier] = None, + system_metadata: typing.Optional[SystemMetadata] = None, ): """ :param mode: An enum value from ExecutionMetadata.ExecutionMode which specifies how the job started. @@ -444,9 +438,9 @@ def __init__( phase: int, started_at: datetime.datetime, duration: datetime.timedelta, - error: flytekit.models.core.execution.ExecutionError = None, - outputs: LiteralMapBlob = None, - abort_metadata: AbortMetadata = None, + error: typing.Optional[flytekit.models.core.execution.ExecutionError] = None, + outputs: typing.Optional[LiteralMapBlob] = None, + abort_metadata: typing.Optional[AbortMetadata] = None, ): """ :param phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum