Skip to content

Commit

Permalink
[refactor] (Unresolved,ResolvedFromDynamic)StepHandle.solid_handle ->…
Browse files Browse the repository at this point in the history
… node_handle (#12387)

### Summary & Motivation

- Rename `{Unresolved,ResolvedFromDynamic}StepHandle.solid_handle` ->
`node_handle`

### How I Tested These Changes

BK
  • Loading branch information
smackesey authored Mar 20, 2023
1 parent ba17f91 commit 3661308
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 25 deletions.
37 changes: 20 additions & 17 deletions python_modules/dagster/dagster/_core/execution/plan/handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,17 @@
from dagster._serdes import whitelist_for_serdes


@whitelist_for_serdes
class StepHandle(NamedTuple("_StepHandle", [("solid_handle", NodeHandle), ("key", str)])):
# Serialize node_handle -> solid_handle for backcompat
@whitelist_for_serdes(storage_field_names={"node_handle": "solid_handle"})
class StepHandle(NamedTuple("_StepHandle", [("node_handle", NodeHandle), ("key", str)])):
"""A reference to an ExecutionStep that was determined statically."""

def __new__(cls, solid_handle: NodeHandle, key: Optional[str] = None):
def __new__(cls, node_handle: NodeHandle, key: Optional[str] = None):
return super(StepHandle, cls).__new__(
cls,
solid_handle=check.inst_param(solid_handle, "solid_handle", NodeHandle),
node_handle=check.inst_param(node_handle, "node_handle", NodeHandle),
# mypy can't tell that if default is set, this is guaranteed to be a str
key=cast(str, check.opt_str_param(key, "key", default=solid_handle.to_string())),
key=cast(str, check.opt_str_param(key, "key", default=node_handle.to_string())),
)

def to_key(self) -> str:
Expand All @@ -38,45 +39,47 @@ def parse_from_key(
return StepHandle(NodeHandle.from_string(string))


@whitelist_for_serdes
class UnresolvedStepHandle(NamedTuple("_UnresolvedStepHandle", [("solid_handle", NodeHandle)])):
# Serialize node_handle -> solid_handle for backcompat
@whitelist_for_serdes(storage_field_names={"node_handle": "solid_handle"})
class UnresolvedStepHandle(NamedTuple("_UnresolvedStepHandle", [("node_handle", NodeHandle)])):
"""A reference to an UnresolvedMappedExecutionStep in an execution."""

def __new__(cls, solid_handle: NodeHandle):
def __new__(cls, node_handle: NodeHandle):
return super(UnresolvedStepHandle, cls).__new__(
cls,
solid_handle=check.inst_param(solid_handle, "solid_handle", NodeHandle),
node_handle=check.inst_param(node_handle, "node_handle", NodeHandle),
)

def to_key(self):
return f"{self.solid_handle.to_string()}[?]"
return f"{self.node_handle.to_string()}[?]"

def resolve(self, map_key) -> "ResolvedFromDynamicStepHandle":
return ResolvedFromDynamicStepHandle(self.solid_handle, map_key)
return ResolvedFromDynamicStepHandle(self.node_handle, map_key)


@whitelist_for_serdes
# Serialize node_handle -> solid_handle for backcompat
@whitelist_for_serdes(storage_field_names={"node_handle": "solid_handle"})
class ResolvedFromDynamicStepHandle(
NamedTuple(
"_ResolvedFromDynamicStepHandle",
[("solid_handle", NodeHandle), ("mapping_key", str), ("key", str)],
[("node_handle", NodeHandle), ("mapping_key", str), ("key", str)],
)
):
"""A reference to an ExecutionStep that came from resolving an UnresolvedMappedExecutionStep
(and associated UnresolvedStepHandle) downstream of a dynamic output after it has
completed successfully.
"""

def __new__(cls, solid_handle: NodeHandle, mapping_key: str, key: Optional[str] = None):
def __new__(cls, node_handle: NodeHandle, mapping_key: str, key: Optional[str] = None):
return super(ResolvedFromDynamicStepHandle, cls).__new__(
cls,
solid_handle=check.inst_param(solid_handle, "solid_handle", NodeHandle),
node_handle=check.inst_param(node_handle, "node_handle", NodeHandle),
mapping_key=check.str_param(mapping_key, "mapping_key"),
# mypy can't tell that if default is set, this is guaranteed to be a str
key=cast(
str,
check.opt_str_param(
key, "key", default=f"{solid_handle.to_string()}[{mapping_key}]"
key, "key", default=f"{node_handle.to_string()}[{mapping_key}]"
),
),
)
Expand All @@ -86,4 +89,4 @@ def to_key(self) -> str:

@property
def unresolved_form(self) -> UnresolvedStepHandle:
return UnresolvedStepHandle(solid_handle=self.solid_handle)
return UnresolvedStepHandle(node_handle=self.node_handle)
6 changes: 3 additions & 3 deletions python_modules/dagster/dagster/_core/execution/plan/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ def _build_from_sorted_nodes(

elif has_unresolved_input:
new_step = UnresolvedMappedExecutionStep(
handle=UnresolvedStepHandle(solid_handle=handle),
handle=UnresolvedStepHandle(node_handle=handle),
pipeline_name=self.pipeline_name,
step_inputs=cast(
List[Union[StepInput, UnresolvedMappedStepInput]], step_inputs
Expand All @@ -348,7 +348,7 @@ def _build_from_sorted_nodes(
)
elif has_pending_input:
new_step = UnresolvedCollectExecutionStep(
handle=StepHandle(solid_handle=handle),
handle=StepHandle(node_handle=handle),
pipeline_name=self.pipeline_name,
step_inputs=cast(
List[Union[StepInput, UnresolvedCollectStepInput]], step_inputs
Expand All @@ -358,7 +358,7 @@ def _build_from_sorted_nodes(
)
else:
new_step = ExecutionStep(
handle=StepHandle(solid_handle=handle),
handle=StepHandle(node_handle=handle),
pipeline_name=self.pipeline_name,
step_inputs=cast(List[StepInput], step_inputs),
step_outputs=step_outputs,
Expand Down
10 changes: 5 additions & 5 deletions python_modules/dagster/dagster/_core/execution/plan/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def __new__(
{
"step_key": handle.to_key(),
"pipeline_name": pipeline_name,
"solid_name": handle.solid_handle.name,
"solid_name": handle.node_handle.name,
},
check.opt_mapping_param(logging_tags, "logging_tags"),
),
Expand All @@ -169,7 +169,7 @@ def __new__(

@property
def node_handle(self) -> "NodeHandle":
return self.handle.solid_handle
return self.handle.node_handle

@property
def solid_name(self) -> str:
Expand Down Expand Up @@ -259,7 +259,7 @@ def __new__(

@property
def node_handle(self) -> "NodeHandle":
return self.handle.solid_handle
return self.handle.node_handle

@property
def key(self) -> str:
Expand Down Expand Up @@ -354,7 +354,7 @@ def resolve(

execution_steps.append(
ExecutionStep(
handle=ResolvedFromDynamicStepHandle(self.handle.solid_handle, mapped_key),
handle=ResolvedFromDynamicStepHandle(self.handle.node_handle, mapped_key),
pipeline_name=self.pipeline_name,
step_inputs=resolved_inputs,
step_outputs=self.step_outputs,
Expand Down Expand Up @@ -418,7 +418,7 @@ def __new__(

@property
def node_handle(self) -> "NodeHandle":
return self.handle.solid_handle
return self.handle.node_handle

@property
def key(self) -> str:
Expand Down

0 comments on commit 3661308

Please sign in to comment.