Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add overwrite_cache option the to calls of remote and local executions #1375

Merged
merged 2 commits into from
Dec 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions flytekit/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ def __init__(
raw_output_data_config=None,
max_parallelism=None,
security_context: typing.Optional[security.SecurityContext] = None,
overwrite_cache: bool = None,
):
"""
:param flytekit.models.core.identifier.Identifier launch_plan: Launch plan unique identifier to execute
Expand All @@ -200,6 +201,7 @@ def __init__(
self._raw_output_data_config = raw_output_data_config
self._max_parallelism = max_parallelism
self._security_context = security_context
self.overwrite_cache = overwrite_cache

@property
def launch_plan(self):
Expand Down Expand Up @@ -283,6 +285,7 @@ def to_flyte_idl(self):
else None,
max_parallelism=self.max_parallelism,
security_context=self.security_context.to_flyte_idl() if self.security_context else None,
overwrite_cache=self.overwrite_cache,
)

@classmethod
Expand All @@ -306,6 +309,7 @@ def from_flyte_idl(cls, p):
security_context=security.SecurityContext.from_flyte_idl(p.security_context)
if p.security_context
else None,
overwrite_cache=p.overwrite_cache,
)


Expand Down
27 changes: 27 additions & 0 deletions flytekit/remote/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,7 @@ def _execute(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""Common method for execution across all entities.

Expand All @@ -755,6 +756,9 @@ def _execute(
:param wait: if True, waits for execution to complete
:param type_hints: map of python types to inputs so that the TypeEngine knows how to convert the input values
into Flyte Literals.
:param overwrite_cache: Allows for all cached values of a workflow and its tasks to be overwritten
for a single execution. If enabled, all calculations are performed even if cached results would
be available, overwriting the stored data once execution finishes successfully.
:returns: :class:`~flytekit.remote.workflow_execution.FlyteWorkflowExecution`
"""
execution_name = execution_name or "f" + uuid.uuid4().hex[:19]
Expand Down Expand Up @@ -810,6 +814,7 @@ def _execute(
"placeholder", # Admin replaces this from oidc token if auth is enabled.
0,
),
overwrite_cache=overwrite_cache,
notifications=notifications,
disable_all=options.disable_notifications,
labels=options.labels,
Expand Down Expand Up @@ -873,6 +878,7 @@ def execute(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Execute a task, workflow, or launchplan, either something that's been declared locally, or a fetched entity.
Expand Down Expand Up @@ -906,6 +912,9 @@ def execute(
using the type engine, and then to ``type(v)``. Providing the correct Python types is particularly important
if the inputs are containers like lists or maps, or if the Python type is one of the more complex Flyte
provided classes (like a StructuredDataset that's annotated with columns).
:param overwrite_cache: Allows for all cached values of a workflow and its tasks to be overwritten
for a single execution. If enabled, all calculations are performed even if cached results would
be available, overwriting the stored data once execution finishes successfully.

.. note:

Expand All @@ -924,6 +933,7 @@ def execute(
options=options,
wait=wait,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, FlyteWorkflow):
return self.execute_remote_wf(
Expand All @@ -935,6 +945,7 @@ def execute(
options=options,
wait=wait,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, PythonTask):
return self.execute_local_task(
Expand All @@ -947,6 +958,7 @@ def execute(
execution_name=execution_name,
image_config=image_config,
wait=wait,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, WorkflowBase):
return self.execute_local_workflow(
Expand All @@ -960,6 +972,7 @@ def execute(
image_config=image_config,
options=options,
wait=wait,
overwrite_cache=overwrite_cache,
)
if isinstance(entity, LaunchPlan):
return self.execute_local_launch_plan(
Expand All @@ -971,6 +984,7 @@ def execute(
execution_name=execution_name,
options=options,
wait=wait,
overwrite_cache=overwrite_cache,
)
raise NotImplementedError(f"entity type {type(entity)} not recognized for execution")

Expand All @@ -987,6 +1001,7 @@ def execute_remote_task_lp(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""Execute a FlyteTask, or FlyteLaunchplan.

Expand All @@ -1001,6 +1016,7 @@ def execute_remote_task_lp(
wait=wait,
options=options,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)

def execute_remote_wf(
Expand All @@ -1013,6 +1029,7 @@ def execute_remote_wf(
options: typing.Optional[Options] = None,
wait: bool = False,
type_hints: typing.Optional[typing.Dict[str, typing.Type]] = None,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""Execute a FlyteWorkflow.

Expand All @@ -1028,6 +1045,7 @@ def execute_remote_wf(
options=options,
wait=wait,
type_hints=type_hints,
overwrite_cache=overwrite_cache,
)

# Flytekit Entities
Expand All @@ -1044,6 +1062,7 @@ def execute_local_task(
execution_name: str = None,
image_config: typing.Optional[ImageConfig] = None,
wait: bool = False,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Execute an @task-decorated function or TaskTemplate task.
Expand All @@ -1058,6 +1077,7 @@ def execute_local_task(
:param execution_name:
:param image_config:
:param wait:
:param overwrite_cache:
:return:
"""
resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version)
Expand All @@ -1084,6 +1104,7 @@ def execute_local_task(
execution_name=execution_name,
wait=wait,
type_hints=entity.python_interface.inputs,
overwrite_cache=overwrite_cache,
)

def execute_local_workflow(
Expand All @@ -1098,6 +1119,7 @@ def execute_local_workflow(
image_config: typing.Optional[ImageConfig] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""
Execute an @workflow decorated function.
Expand All @@ -1111,6 +1133,7 @@ def execute_local_workflow(
:param image_config:
:param options:
:param wait:
:param overwrite_cache:
:return:
"""
resolved_identifiers = self._resolve_identifier_kwargs(entity, project, domain, name, version)
Expand Down Expand Up @@ -1155,6 +1178,7 @@ def execute_local_workflow(
wait=wait,
options=options,
type_hints=entity.python_interface.inputs,
overwrite_cache=overwrite_cache,
)

def execute_local_launch_plan(
Expand All @@ -1167,6 +1191,7 @@ def execute_local_launch_plan(
execution_name: typing.Optional[str] = None,
options: typing.Optional[Options] = None,
wait: bool = False,
overwrite_cache: bool = None,
) -> FlyteWorkflowExecution:
"""

Expand All @@ -1178,6 +1203,7 @@ def execute_local_launch_plan(
:param execution_name: If specified, will be used as the execution name instead of randomly generating.
:param options:
:param wait:
:param overwrite_cache:
:return:
"""
try:
Expand All @@ -1203,6 +1229,7 @@ def execute_local_launch_plan(
options=options,
wait=wait,
type_hints=entity.python_interface.inputs,
overwrite_cache=overwrite_cache,
)

###################################
Expand Down