Stream Directories and Files using Flyte (#1512)
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
Signed-off-by: Yee Hing Tong <wild-endeavor@users.noreply.github.com>
kumare3 committed Mar 24, 2023
1 parent b2e1fff commit 447d2aa
Showing 16 changed files with 380 additions and 76 deletions.
16 changes: 5 additions & 11 deletions Dockerfile.dev
@@ -12,27 +12,21 @@ MAINTAINER Flyte Team <users@flyte.org>
LABEL org.opencontainers.image.source https://github.com/flyteorg/flytekit

WORKDIR /root
ENV PYTHONPATH /root

ARG VERSION
ARG DOCKER_IMAGE

RUN apt-get update && apt-get install build-essential vim -y

COPY . /code/flytekit
WORKDIR /code/flytekit
COPY . /flytekit

# Pod tasks should be exposed in the default image
RUN pip install -e .
RUN pip install -e plugins/flytekit-k8s-pod
RUN pip install -e plugins/flytekit-deck-standard
RUN pip install -e /flytekit
RUN pip install -e /flytekit/plugins/flytekit-k8s-pod
RUN pip install -e /flytekit/plugins/flytekit-deck-standard
RUN pip install scikit-learn

ENV PYTHONPATH "/code/flytekit:/code/flytekit/plugins/flytekit-k8s-pod:/code/flytekit/plugins/flytekit-deck-standard:"
ENV PYTHONPATH "/flytekit:/flytekit/plugins/flytekit-k8s-pod:/flytekit/plugins/flytekit-deck-standard:"

WORKDIR /root
RUN useradd -u 1000 flytekit
RUN chown flytekit: /root
USER flytekit

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/run.py
@@ -81,7 +81,7 @@ def convert(
raise ValueError(
f"Currently only directories containing one file are supported, found [{len(files)}] files found in {p.resolve()}"
)
return Directory(dir_path=value, local_file=files[0].resolve())
return Directory(dir_path=str(p), local_file=files[0].resolve())
raise click.BadParameter(f"parameter should be a valid directory path, {value}")


14 changes: 7 additions & 7 deletions flytekit/core/data_persistence.py
@@ -107,16 +107,16 @@ def data_config(self) -> DataConfig:
return self._data_config

def get_filesystem(
self, protocol: typing.Optional[str] = None, anonymous: bool = False
self, protocol: typing.Optional[str] = None, anonymous: bool = False, **kwargs
) -> typing.Optional[fsspec.AbstractFileSystem]:
if not protocol:
return self._default_remote
kwargs = {} # type: typing.Dict[str, typing.Any]
if protocol == "file":
kwargs = {"auto_mkdir": True}
kwargs["auto_mkdir"] = True
elif protocol == "s3":
kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous)
return fsspec.filesystem(protocol, **kwargs) # type: ignore
s3kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous)
s3kwargs.update(kwargs)
return fsspec.filesystem(protocol, **s3kwargs) # type: ignore
elif protocol == "gs":
if anonymous:
kwargs["token"] = _ANON
@@ -128,9 +128,9 @@ def get_filesystem(

return fsspec.filesystem(protocol, **kwargs) # type: ignore

def get_filesystem_for_path(self, path: str = "", anonymous: bool = False) -> fsspec.AbstractFileSystem:
def get_filesystem_for_path(self, path: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem:
protocol = get_protocol(path)
return self.get_filesystem(protocol, anonymous=anonymous)
return self.get_filesystem(protocol, anonymous=anonymous, **kwargs)

@staticmethod
def is_remote(path: Union[str, os.PathLike]) -> bool:
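With this change, extra keyword arguments given to get_filesystem / get_filesystem_for_path are forwarded to the underlying fsspec filesystem constructor (for s3 they are merged into the arguments built by s3_setup_args). A minimal usage sketch, not part of this commit; the bucket URI and client_kwargs values are placeholders:

from flytekit.core.context_manager import FlyteContextManager

ctx = FlyteContextManager.current_context()
# Any extra kwargs are passed through to the fsspec filesystem constructor,
# e.g. s3fs.S3FileSystem for s3:// paths.
fs = ctx.file_access.get_filesystem_for_path(
    "s3://my-bucket/some/prefix",                 # hypothetical URI
    client_kwargs={"region_name": "us-east-1"},   # hypothetical constructor option
)
print(fs.ls("s3://my-bucket/some/prefix"))
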
1 change: 0 additions & 1 deletion flytekit/core/type_engine.py
@@ -129,7 +129,6 @@ def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type:
f"Conversion to python value expected type {expected_python_type} from literal not implemented"
)

@abstractmethod
def to_html(self, ctx: FlyteContext, python_val: T, expected_python_type: Type[T]) -> str:
"""
Converts any python val (dataframe, int, float) to a html string, and it will be wrapped in the HTML div
85 changes: 83 additions & 2 deletions flytekit/types/directory/types.py
@@ -2,20 +2,25 @@

import os
import pathlib
import random
import typing
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Generator, Tuple
from uuid import UUID

import fsspec
from dataclasses_json import config, dataclass_json
from fsspec.utils import get_protocol
from marshmallow import fields

from flytekit.core.context_manager import FlyteContext
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.type_engine import TypeEngine, TypeTransformer
from flytekit.models import types as _type_models
from flytekit.models.core import types as _core_types
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
from flytekit.models.types import LiteralType
from flytekit.types.file import FileExt
from flytekit.types.file import FileExt, FlyteFile

T = typing.TypeVar("T")
PathType = typing.Union[str, os.PathLike]
@@ -148,6 +153,18 @@ def __fspath__(self):
def extension(cls) -> str:
return ""

@classmethod
def new_remote(cls) -> FlyteDirectory:
"""
Create a new FlyteDirectory object using the currently configured default remote in the context (i.e.
the raw_output_prefix configured in the current FileAccessProvider object in the context).
This is used if you explicitly have a folder somewhere that you want to create files under.
If you want to write a whole folder, you can let your task return a FlyteDirectory object,
and let flytekit handle the uploading.
"""
d = FlyteContext.current_context().file_access.get_random_remote_directory()
return FlyteDirectory(path=d)
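
A minimal sketch of how new_remote might be used from a task; the task and file name below are illustrative, not part of this commit:

from flytekit import task
from flytekit.types.directory import FlyteDirectory


@task
def write_report() -> FlyteDirectory:
    # Ask for a fresh directory under the configured raw_output_prefix.
    out_dir = FlyteDirectory.new_remote()
    # Create a file beneath it and stream-write to it (see new_file and FlyteFile.open below).
    report = out_dir.new_file("report.txt")
    with report.open("w") as f:
        f.write("hello from flytekit")
    return out_dir
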

def __class_getitem__(cls, item: typing.Union[typing.Type, str]) -> typing.Type[FlyteDirectory]:
if item is None:
return cls
@@ -176,6 +193,12 @@ def downloaded(self) -> bool:
def remote_directory(self) -> typing.Optional[str]:
return self._remote_directory

@property
def sep(self) -> str:
if os.name == "nt" and get_protocol(self.path or self.remote_source or self.remote_directory) == "file":
return "\\"
return "/"

@property
def remote_source(self) -> str:
"""
@@ -184,9 +207,67 @@ def remote_source(self) -> str:
"""
return typing.cast(str, self._remote_source)

def new_file(self, name: typing.Optional[str] = None) -> FlyteFile:
"""
This will create a new file under the current folder.
If given a name, it will use the name given, otherwise it'll pick a random string.
Collisions are not checked.
"""
# TODO we may want to use - https://github.com/fsspec/universal_pathlib
if not name:
name = UUID(int=random.getrandbits(128)).hex
new_path = self.sep.join([str(self.path).rstrip(self.sep), name]) # trim trailing sep if any and join
return FlyteFile(path=new_path)

def new_dir(self, name: typing.Optional[str] = None) -> FlyteDirectory:
"""
This will create a new folder under the current folder.
If given a name, it will use the name given, otherwise it'll pick a random string.
Collisions are not checked.
"""
if not name:
name = UUID(int=random.getrandbits(128)).hex

new_path = self.sep.join([str(self.path).rstrip(self.sep), name]) # trim trailing sep if any and join
return FlyteDirectory(path=new_path)
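
Note that new_file and new_dir only construct paths, joined with the directory's separator; nothing is created on the backing store and collisions are not checked. An illustrative sketch, assuming the names below:

out_dir = FlyteDirectory.new_remote()        # e.g. <raw_output_prefix>/<random>
models = out_dir.new_dir("models")           # FlyteDirectory at .../<random>/models
weights = models.new_file("weights.bin")     # FlyteFile at .../<random>/models/weights.bin
scratch = out_dir.new_file()                 # random hex file name under out_dir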

def download(self) -> str:
return self.__fspath__()

def crawl(
self, maxdepth: typing.Optional[int] = None, topdown: bool = True, **kwargs
) -> Generator[Tuple[typing.Union[str, os.PathLike[Any]], typing.Dict[Any, Any]], None, None]:
"""
Crawl returns a generator of all files under the given "FlyteDirectory", prefixed by any sub-folders they live in.
If detail=True is passed, then it will return a dictionary as specified by fsspec.
Example:
>>> list(fd.crawl())
[("/base", "file1"), ("/base", "dir1/file1"), ("/base", "dir2/file1"), ("/base", "dir1/dir/file1")]
>>> list(fd.crawl(detail=True))
[('/tmp/test', {'my-dir/ab.py': {'name': '/tmp/test/my-dir/ab.py', 'size': 0, 'type': 'file',
'created': 1677720780.2318847, 'islink': False, 'mode': 33188, 'uid': 501, 'gid': 0,
'mtime': 1677720780.2317934, 'ino': 1694329, 'nlink': 1}})]
"""
final_path = self.path
if self.remote_source:
final_path = self.remote_source
elif self.remote_directory:
final_path = self.remote_directory
ctx = FlyteContextManager.current_context()
fs = ctx.file_access.get_filesystem_for_path(final_path)
base_path_len = len(fsspec.core.strip_protocol(final_path)) + 1 # Add additional `/` at the end
for base, _, files in fs.walk(final_path, maxdepth, topdown, **kwargs):
current_base = base[base_path_len:]
if isinstance(files, dict):
for f, v in files.items():
yield final_path, {os.path.join(current_base, f): v}
else:
for f in files:
yield final_path, os.path.join(current_base, f)
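
A sketch of consuming crawl from a task; the task below is illustrative and not part of this commit:

import typing

from flytekit import task
from flytekit.types.directory import FlyteDirectory


@task
def list_files(fd: FlyteDirectory) -> typing.List[str]:
    # crawl streams (base, relative_path) pairs from the backing store
    # without downloading the directory contents.
    return [relative_path for _, relative_path in fd.crawl()]
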

def __repr__(self):
return self.path

67 changes: 65 additions & 2 deletions flytekit/types/file/file.py
@@ -3,12 +3,13 @@
import os
import pathlib
import typing
from contextlib import contextmanager
from dataclasses import dataclass, field

from dataclasses_json import config, dataclass_json
from marshmallow import fields

from flytekit.core.context_manager import FlyteContext
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.type_engine import TypeEngine, TypeTransformer, TypeTransformerFailedError
from flytekit.loggers import logger
from flytekit.models.core.types import BlobType
@@ -27,7 +28,9 @@ def noop():
@dataclass_json
@dataclass
class FlyteFile(os.PathLike, typing.Generic[T]):
path: typing.Union[str, os.PathLike] = field(default=None, metadata=config(mm_field=fields.String())) # type: ignore
path: typing.Union[str, os.PathLike] = field(
default=None, metadata=config(mm_field=fields.String())
) # type: ignore
"""
Since there is no native Python implementation of files and directories for the Flyte Blob type, (like how int
exists for Flyte's Integer type) we need to create one so that users can express that their tasks take
@@ -148,6 +151,15 @@ def t2() -> flytekit_typing.FlyteFile["csv"]:
def extension(cls) -> str:
return ""

@classmethod
def new_remote_file(cls, name: typing.Optional[str] = None) -> FlyteFile:
"""
Create a new FlyteFile object with a remote path.
"""
ctx = FlyteContextManager.current_context()
remote_path = ctx.file_access.get_random_remote_path(name)
return cls(path=remote_path)

def __class_getitem__(cls, item: typing.Union[str, typing.Type]) -> typing.Type[FlyteFile]:
from . import FileExt

@@ -226,6 +238,57 @@ def remote_source(self) -> str:
def download(self) -> str:
return self.__fspath__()

@contextmanager
def open(
self,
mode: str,
cache_type: typing.Optional[str] = None,
cache_options: typing.Optional[typing.Dict[str, typing.Any]] = None,
):
"""
Returns a streaming File handle
.. code-block:: python
@task
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file(ff.name)
with ff.open("rb", cache_type="readahead", cache={}) as r:
with new_file.open("wb") as w:
w.write(r.read())
return new_file
Alternatively
.. code-block:: python
@task
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file(ff.name)
with fsspec.open(f"readahead::{ff.remote_path}", "rb", readahead={}) as r:
with new_file.open("wb") as w:
w.write(r.read())
return new_file
:param mode: str Open mode like 'rb', 'rt', 'wb', ...
:param cache_type: optional str Specify the read cache to use. This can be any of the read-buffering cache types
supported by fsspec (https://filesystem-spec.readthedocs.io/en/latest/api.html#readbuffering);
caching is especially useful for large file reads.
:param cache_options: optional Dict[str, Any] Refer to the fsspec caching options; these are strongly coupled to
the chosen cache_type.
"""
ctx = FlyteContextManager.current_context()
final_path = self.path
if self.remote_source:
final_path = self.remote_source
elif self.remote_path:
final_path = self.remote_path
fs = ctx.file_access.get_filesystem_for_path(final_path)
f = fs.open(final_path, mode, cache_type=cache_type, cache_options=cache_options)
yield f
f.close()
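
A sketch of streaming a large remote file with the new open() instead of downloading it first; the task and cache choice below are illustrative, not part of this commit:

from flytekit import task
from flytekit.types.file import FlyteFile


@task
def count_lines(ff: FlyteFile) -> int:
    n = 0
    # Read through an fsspec readahead cache; the file is never fully
    # materialized on local disk.
    with ff.open("r", cache_type="readahead") as f:
        for _ in f:
            n += 1
    return n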

def __repr__(self):
return self.path

30 changes: 0 additions & 30 deletions flytekit/types/structured/basic_dfs.py
@@ -62,22 +62,6 @@ def encode(
structured_dataset_type.format = PARQUET
return literals.StructuredDataset(uri=uri, metadata=StructuredDatasetMetadata(structured_dataset_type))

def ddencode(
self,
ctx: FlyteContext,
structured_dataset: StructuredDataset,
structured_dataset_type: StructuredDatasetType,
) -> literals.StructuredDataset:

path = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_directory()
df = typing.cast(pd.DataFrame, structured_dataset.dataframe)
local_dir = ctx.file_access.get_random_local_directory()
local_path = os.path.join(local_dir, f"{0:05}")
df.to_parquet(local_path, coerce_timestamps="us", allow_truncated_timestamps=False)
ctx.file_access.upload_directory(local_dir, path)
structured_dataset_type.format = PARQUET
return literals.StructuredDataset(uri=path, metadata=StructuredDatasetMetadata(structured_dataset_type))


class ParquetToPandasDecodingHandler(StructuredDatasetDecoder):
def __init__(self):
@@ -101,20 +85,6 @@ def decode(
kwargs = get_storage_options(ctx.file_access.data_config, uri, anon=True)
return pd.read_parquet(uri, columns=columns, storage_options=kwargs)

def dcccecode(
self,
ctx: FlyteContext,
flyte_value: literals.StructuredDataset,
current_task_metadata: StructuredDatasetMetadata,
) -> pd.DataFrame:
path = flyte_value.uri
local_dir = ctx.file_access.get_random_local_directory()
ctx.file_access.get_data(path, local_dir, is_multipart=True)
if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns:
columns = [c.name for c in current_task_metadata.structured_dataset_type.columns]
return pd.read_parquet(local_dir, columns=columns)
return pd.read_parquet(local_dir)


class ArrowToParquetEncodingHandler(StructuredDatasetEncoder):
def __init__(self):
(Diffs for the remaining 9 changed files are not shown.)

