Skip to content

Commit

Permalink
Input through file and pipe (#2552)
Browse files Browse the repository at this point in the history
Signed-off-by: mao3267 <chenvincent610@gmail.com>

---------

Signed-off-by: mao3267 <chenvincent610@gmail.com>
Signed-off-by: Kevin Su <pingsutw@apache.org>
Signed-off-by: pryce-turner <pryce.turner@gmail.com>
Signed-off-by: ggydush <greggydush@gmail.com>
Signed-off-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>
Signed-off-by: ddl-rliu <richard.liu@dominodatalab.com>
Signed-off-by: Thomas J. Fan <thomasjpfan@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Signed-off-by: novahow <b08902047@ntu.edu.tw>
Signed-off-by: Mecoli1219 <michaellai901026@gmail.com>
Signed-off-by: Fabio Grätz <fabiogratz@googlemail.com>
Signed-off-by: bugra.gedik <bugra.gedik@predera.ai>
Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>
Signed-off-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com>
Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Samhita Alla <aallasamhita@gmail.com>
Signed-off-by: Peeter Piegaze <1153481+ppiegaze@users.noreply.github.com>
Signed-off-by: Felix Ruess <felix.ruess@roboception.de>
Signed-off-by: Ketan Umare <kumare3@users.noreply.github.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Signed-off-by: aditya7302 <aditya7302@gmail.com>
Signed-off-by: Jan Fiedler <jan@union.ai>
Signed-off-by: JackUrb <jack@datologyai.com>
Signed-off-by: Paul Dittamo <pvdittamo@gmail.com>
Signed-off-by: Robert Deaton <robert.deaton@freenome.com>
Co-authored-by: Kevin Su <pingsutw@apache.org>
Co-authored-by: pryce-turner <31577879+pryce-turner@users.noreply.github.com>
Co-authored-by: Greg Gydush <35151789+ggydush@users.noreply.github.com>
Co-authored-by: Eduardo Apolinario <eapolinario@users.noreply.github.com>
Co-authored-by: ddl-rliu <140021987+ddl-rliu@users.noreply.github.com>
Co-authored-by: Chi-Sheng Liu <chishengliu@chishengliu.com>
Co-authored-by: Thomas J. Fan <thomasjpfan@gmail.com>
Co-authored-by: Future-Outlier <eric901201@gmail.com>
Co-authored-by: novahow <58504997+novahow@users.noreply.github.com>
Co-authored-by: Chun-Mao Lai <72752478+Mecoli1219@users.noreply.github.com>
Co-authored-by: Fabio M. Graetz, Ph.D <fabiograetz@googlemail.com>
Co-authored-by: Fabio Grätz <fabiogratz@googlemail.com>
Co-authored-by: Buğra Gedik <bgedik@gmail.com>
Co-authored-by: bugra.gedik <bugra.gedik@predera.ai>
Co-authored-by: Thomas Newton <thomas.w.newton@gmail.com>
Co-authored-by: Dennis Keck <26092524+fellhorn@users.noreply.github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Samhita Alla <aallasamhita@gmail.com>
Co-authored-by: Peeter Piegaze <1153481+ppiegaze@users.noreply.github.com>
Co-authored-by: Felix Ruess <felix.ruess@roboception.de>
Co-authored-by: Ketan Umare <16888709+kumare3@users.noreply.github.com>
Co-authored-by: Ketan Umare <kumare3@users.noreply.github.com>
Co-authored-by: Paul Dittamo <37558497+pvditt@users.noreply.github.com>
Co-authored-by: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com>
Co-authored-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
Co-authored-by: Aditya Garg <110886184+aditya7302@users.noreply.github.com>
Co-authored-by: Jan Fiedler <89976021+fiedlerNr9@users.noreply.github.com>
Co-authored-by: Jack Urbanek <Jackurbs@gmail.com>
Co-authored-by: rdeaton-freenome <134093844+rdeaton-freenome@users.noreply.github.com>
  • Loading branch information
1 parent 620a449 commit a8f68d7
Show file tree
Hide file tree
Showing 10 changed files with 349 additions and 52 deletions.
107 changes: 99 additions & 8 deletions flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import sys
import tempfile
import typing
import typing as t
from dataclasses import dataclass, field, fields
from typing import Iterator, get_args

import rich_click as click
import yaml
from click import Context
from mashumaro.codecs.json import JSONEncoder
from rich.progress import Progress
from typing_extensions import get_origin
Expand All @@ -25,7 +28,12 @@
pretty_print_exception,
project_option,
)
from flytekit.configuration import DefaultImages, FastSerializationSettings, ImageConfig, SerializationSettings
from flytekit.configuration import (
DefaultImages,
FastSerializationSettings,
ImageConfig,
SerializationSettings,
)
from flytekit.configuration.plugin import get_plugin
from flytekit.core import context_manager
from flytekit.core.artifact import ArtifactQuery
Expand All @@ -34,14 +42,24 @@
from flytekit.core.type_engine import TypeEngine
from flytekit.core.workflow import PythonFunctionWorkflow, WorkflowBase
from flytekit.exceptions.system import FlyteSystemException
from flytekit.interaction.click_types import FlyteLiteralConverter, key_value_callback, labels_callback
from flytekit.interaction.click_types import (
FlyteLiteralConverter,
key_value_callback,
labels_callback,
)
from flytekit.interaction.string_literals import literal_string_repr
from flytekit.loggers import logger
from flytekit.models import security
from flytekit.models.common import RawOutputDataConfig
from flytekit.models.interface import Parameter, Variable
from flytekit.models.types import SimpleType
from flytekit.remote import FlyteLaunchPlan, FlyteRemote, FlyteTask, FlyteWorkflow, remote_fs
from flytekit.remote import (
FlyteLaunchPlan,
FlyteRemote,
FlyteTask,
FlyteWorkflow,
remote_fs,
)
from flytekit.remote.executions import FlyteWorkflowExecution
from flytekit.tools import module_loader
from flytekit.tools.script_mode import _find_project_root, compress_scripts, get_all_modules
Expand Down Expand Up @@ -489,7 +507,8 @@ def _update_flyte_context(params: RunLevelParams) -> FlyteContext.Builder:
return ctx.current_context().new_builder()

file_access = FileAccessProvider(
local_sandbox_dir=tempfile.mkdtemp(prefix="flyte"), raw_output_prefix=output_prefix
local_sandbox_dir=tempfile.mkdtemp(prefix="flyte"),
raw_output_prefix=output_prefix,
)

# The task might run on a remote machine if raw_output_prefix is a remote path,
Expand Down Expand Up @@ -539,7 +558,10 @@ def _run(*args, **kwargs):
entity_type = "workflow" if isinstance(entity, PythonFunctionWorkflow) else "task"
logger.debug(f"Running {entity_type} {entity.name} with input {kwargs}")

click.secho(f"Running Execution on {'Remote' if run_level_params.is_remote else 'local'}.", fg="cyan")
click.secho(
f"Running Execution on {'Remote' if run_level_params.is_remote else 'local'}.",
fg="cyan",
)
try:
inputs = {}
for input_name, v in entity.python_interface.inputs_with_defaults.items():
Expand Down Expand Up @@ -576,6 +598,8 @@ def _run(*args, **kwargs):
)
if processed_click_value is not None or optional_v:
inputs[input_name] = processed_click_value
if processed_click_value is None and v[0] == bool:
inputs[input_name] = False

if not run_level_params.is_remote:
with FlyteContextManager.with_context(_update_flyte_context(run_level_params)):
Expand Down Expand Up @@ -755,7 +779,10 @@ def list_commands(self, ctx):
run_level_params: RunLevelParams = ctx.obj
r = run_level_params.remote_instance()
progress = Progress(transient=True)
task = progress.add_task(f"[cyan]Gathering [{run_level_params.limit}] remote LaunchPlans...", total=None)
task = progress.add_task(
f"[cyan]Gathering [{run_level_params.limit}] remote LaunchPlans...",
total=None,
)
with progress:
progress.start_task(task)
try:
Expand Down Expand Up @@ -783,6 +810,70 @@ def get_command(self, ctx, name):
)


class YamlFileReadingCommand(click.RichCommand):
def __init__(
self,
name: str,
params: typing.List[click.Option],
help: str,
callback: typing.Callable = None,
):
params.append(
click.Option(
["--inputs-file"],
required=False,
type=click.Path(exists=True, dir_okay=False, resolve_path=True),
help="Path to a YAML | JSON file containing inputs for the workflow.",
)
)
super().__init__(name=name, params=params, callback=callback, help=help)

def parse_args(self, ctx: Context, args: t.List[str]) -> t.List[str]:
def load_inputs(f: str) -> t.Dict[str, str]:
try:
inputs = yaml.safe_load(f)
except yaml.YAMLError as e:
yaml_e = e
try:
inputs = json.loads(f)
except json.JSONDecodeError as e:
raise click.BadParameter(
message=f"Could not load the inputs file. Please make sure it is a valid JSON or YAML file."
f"\n json error: {e},"
f"\n yaml error: {yaml_e}",
param_hint="--inputs-file",
)

return inputs

inputs = {}
if "--inputs-file" in args:
idx = args.index("--inputs-file")
args.pop(idx)
f = args.pop(idx)
with open(f, "r") as f:
inputs = load_inputs(f.read())
elif not sys.stdin.isatty():
f = sys.stdin.read()
if f != "":
inputs = load_inputs(f)

new_args = []
for k, v in inputs.items():
if isinstance(v, str):
new_args.extend([f"--{k}", v])
elif isinstance(v, bool):
if v:
new_args.append(f"--{k}")
else:
v = json.dumps(v)
new_args.extend([f"--{k}", v])
new_args.extend(args)
args = new_args

return super().parse_args(ctx, args)


class WorkflowCommand(click.RichGroup):
"""
click multicommand at the python file layer, subcommands should be all the workflows in the file.
Expand Down Expand Up @@ -837,11 +928,11 @@ def _create_command(
h = f"{click.style(entity_type, bold=True)} ({run_level_params.computed_params.module}.{entity_name})"
if loaded_entity.__doc__:
h = h + click.style(f"{loaded_entity.__doc__}", dim=True)
cmd = click.RichCommand(
cmd = YamlFileReadingCommand(
name=entity_name,
params=params,
callback=run_command(ctx, loaded_entity),
help=h,
callback=run_command(ctx, loaded_entity),
)
return cmd

Expand Down
21 changes: 18 additions & 3 deletions flytekit/core/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@
import sys
import typing
from collections import OrderedDict
from typing import Any, Dict, Generator, List, Optional, Tuple, Type, TypeVar, Union, cast
from typing import (
Any,
Dict,
Generator,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
cast,
)

from flyteidl.core import artifact_id_pb2 as art_id
from typing_extensions import get_args, get_type_hints
Expand Down Expand Up @@ -370,7 +381,9 @@ def transform_interface_to_list_interface(


def transform_function_to_interface(
fn: typing.Callable, docstring: Optional[Docstring] = None, is_reference_entity: bool = False
fn: typing.Callable,
docstring: Optional[Docstring] = None,
is_reference_entity: bool = False,
) -> Interface:
"""
From the annotations on a task function that the user should have provided, and the output names they want to use
Expand Down Expand Up @@ -463,7 +476,9 @@ def transform_type(x: type, description: Optional[str] = None) -> _interface_mod
if artifact_id:
logger.debug(f"Found artifact id spec: {artifact_id}")
return _interface_models.Variable(
type=TypeEngine.to_literal_type(x), description=description, artifact_partial_id=artifact_id
type=TypeEngine.to_literal_type(x),
description=description,
artifact_partial_id=artifact_id,
)


Expand Down
18 changes: 8 additions & 10 deletions flytekit/image_spec/default_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,24 @@
)
from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore

UV_PYTHON_INSTALL_COMMAND_TEMPLATE = Template("""\
UV_PYTHON_INSTALL_COMMAND_TEMPLATE = Template(
"""\
RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/uv,id=uv \
--mount=from=uv,source=/uv,target=/usr/bin/uv \
--mount=type=bind,target=requirements_uv.txt,src=requirements_uv.txt \
/usr/bin/uv \
pip install --python /opt/micromamba/envs/runtime/bin/python $PIP_EXTRA \
--requirement requirements_uv.txt
""")
"""
)

APT_INSTALL_COMMAND_TEMPLATE = Template(
"""\
APT_INSTALL_COMMAND_TEMPLATE = Template("""\
RUN --mount=type=cache,sharing=locked,mode=0777,target=/var/cache/apt,id=apt \
apt-get update && apt-get install -y --no-install-recommends \
$APT_PACKAGES
"""
)
""")

DOCKER_FILE_TEMPLATE = Template(
"""\
DOCKER_FILE_TEMPLATE = Template("""\
#syntax=docker/dockerfile:1.5
FROM ghcr.io/astral-sh/uv:0.2.37 as uv
FROM mambaorg/micromamba:1.5.8-bookworm-slim as micromamba
Expand Down Expand Up @@ -84,8 +83,7 @@
USER flytekit
RUN mkdir -p $$HOME && \
echo "export PATH=$$PATH" >> $$HOME/.profile
"""
)
""")


def get_flytekit_for_pypi():
Expand Down
22 changes: 17 additions & 5 deletions plugins/flytekit-kf-pytorch/flytekitplugins/kfpytorch/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def _convert_replica_spec(
replicas=replicas,
image=replica_config.image,
resources=resources.to_flyte_idl() if resources else None,
restart_policy=replica_config.restart_policy.value if replica_config.restart_policy else None,
restart_policy=(replica_config.restart_policy.value if replica_config.restart_policy else None),
)

def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
Expand Down Expand Up @@ -289,9 +289,11 @@ def spawn_helper(
return ElasticWorkerResult(return_value=return_val, decks=flytekit.current_context().decks, om=om)


def _convert_run_policy_to_flyte_idl(run_policy: RunPolicy) -> kubeflow_common.RunPolicy:
def _convert_run_policy_to_flyte_idl(
run_policy: RunPolicy,
) -> kubeflow_common.RunPolicy:
return kubeflow_common.RunPolicy(
clean_pod_policy=run_policy.clean_pod_policy.value if run_policy.clean_pod_policy else None,
clean_pod_policy=(run_policy.clean_pod_policy.value if run_policy.clean_pod_policy else None),
ttl_seconds_after_finished=run_policy.ttl_seconds_after_finished,
active_deadline_seconds=run_policy.active_deadline_seconds,
backoff_limit=run_policy.backoff_limit,
Expand Down Expand Up @@ -416,7 +418,13 @@ def _execute(self, **kwargs) -> Any:
checkpoint_dest = None
checkpoint_src = None

launcher_args = (dumped_target_function, ctx.raw_output_prefix, checkpoint_dest, checkpoint_src, kwargs)
launcher_args = (
dumped_target_function,
ctx.raw_output_prefix,
checkpoint_dest,
checkpoint_src,
kwargs,
)
elif self.task_config.start_method == "fork":
"""
The torch elastic launcher doesn't support passing kwargs to the target function,
Expand All @@ -440,7 +448,11 @@ def fn_partial():
if isinstance(e, FlyteRecoverableException):
create_recoverable_error_file()
raise
return ElasticWorkerResult(return_value=return_val, decks=flytekit.current_context().decks, om=om)
return ElasticWorkerResult(
return_value=return_val,
decks=flytekit.current_context().decks,
om=om,
)

launcher_target_func = fn_partial
launcher_args = ()
Expand Down
Loading

0 comments on commit a8f68d7

Please sign in to comment.