Stream Directories and Files using Flyte #1512

Merged · 27 commits · Mar 20, 2023
16 changes: 5 additions & 11 deletions Dockerfile.dev
@@ -12,27 +12,21 @@ MAINTAINER Flyte Team <users@flyte.org>
LABEL org.opencontainers.image.source https://github.com/flyteorg/flytekit

WORKDIR /root
ENV PYTHONPATH /root

ARG VERSION
ARG DOCKER_IMAGE

RUN apt-get update && apt-get install build-essential vim -y

COPY . /code/flytekit
WORKDIR /code/flytekit
COPY . /flytekit

# Pod tasks should be exposed in the default image
RUN pip install -e .
RUN pip install -e plugins/flytekit-k8s-pod
RUN pip install -e plugins/flytekit-deck-standard
RUN pip install -e /flytekit
RUN pip install -e /flytekit/plugins/flytekit-k8s-pod
RUN pip install -e /flytekit/plugins/flytekit-deck-standard
RUN pip install scikit-learn

ENV PYTHONPATH "/code/flytekit:/code/flytekit/plugins/flytekit-k8s-pod:/code/flytekit/plugins/flytekit-deck-standard:"
ENV PYTHONPATH "/flytekit:/flytekit/plugins/flytekit-k8s-pod:/flytekit/plugins/flytekit-deck-standard:"

WORKDIR /root
RUN useradd -u 1000 flytekit
RUN chown flytekit: /root
USER flytekit

ENV FLYTE_INTERNAL_IMAGE "$DOCKER_IMAGE"
2 changes: 1 addition & 1 deletion flytekit/clis/sdk_in_container/run.py
@@ -81,7 +81,7 @@ def convert(
raise ValueError(
f"Currently only directories containing one file are supported, found [{len(files)}] files found in {p.resolve()}"
)
return Directory(dir_path=value, local_file=files[0].resolve())
return Directory(dir_path=str(p), local_file=files[0].resolve())
raise click.BadParameter(f"parameter should be a valid directory path, {value}")


14 changes: 7 additions & 7 deletions flytekit/core/data_persistence.py
@@ -107,16 +107,16 @@ def data_config(self) -> DataConfig:
return self._data_config

def get_filesystem(
self, protocol: typing.Optional[str] = None, anonymous: bool = False
self, protocol: typing.Optional[str] = None, anonymous: bool = False, **kwargs
) -> typing.Optional[fsspec.AbstractFileSystem]:
if not protocol:
return self._default_remote
kwargs = {} # type: typing.Dict[str, typing.Any]
if protocol == "file":
kwargs = {"auto_mkdir": True}
kwargs["auto_mkdir"] = True
elif protocol == "s3":
kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous)
return fsspec.filesystem(protocol, **kwargs) # type: ignore
s3kwargs = s3_setup_args(self._data_config.s3, anonymous=anonymous)
s3kwargs.update(kwargs)
return fsspec.filesystem(protocol, **s3kwargs) # type: ignore
elif protocol == "gs":
if anonymous:
kwargs["token"] = _ANON
@@ -128,9 +128,9 @@ def get_filesystem(

return fsspec.filesystem(protocol, **kwargs) # type: ignore

def get_filesystem_for_path(self, path: str = "", anonymous: bool = False) -> fsspec.AbstractFileSystem:
def get_filesystem_for_path(self, path: str = "", anonymous: bool = False, **kwargs) -> fsspec.AbstractFileSystem:
protocol = get_protocol(path)
return self.get_filesystem(protocol, anonymous=anonymous)
return self.get_filesystem(protocol, anonymous=anonymous, **kwargs)

@staticmethod
def is_remote(path: Union[str, os.PathLike]) -> bool:
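With the **kwargs pass-through above, callers can hand fsspec-specific options straight to the filesystem that flytekit builds for a path. A minimal usage sketch, assuming a configured FlyteContext (the bucket path and the s3fs-specific cache_regions option are illustrative, not part of this PR):

.. code-block:: python

    from flytekit.core.context_manager import FlyteContextManager

    ctx = FlyteContextManager.current_context()

    # Extra keyword arguments are merged with the protocol defaults (e.g. the
    # s3_setup_args result for "s3") and forwarded to fsspec.filesystem().
    fs = ctx.file_access.get_filesystem_for_path(
        "s3://my-bucket/some/prefix",  # hypothetical bucket/prefix
        cache_regions=True,            # s3fs option, passed through untouched
    )
    print(fs.ls("s3://my-bucket/some/prefix"))

Building the protocol defaults first and then applying the caller's kwargs on top (as the s3 branch now does with s3kwargs.update(kwargs)) means the caller always wins on conflicting options.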
1 change: 0 additions & 1 deletion flytekit/core/type_engine.py
@@ -129,7 +129,6 @@ def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type:
f"Conversion to python value expected type {expected_python_type} from literal not implemented"
)

@abstractmethod
def to_html(self, ctx: FlyteContext, python_val: T, expected_python_type: Type[T]) -> str:
"""
Converts any python val (dataframe, int, float) to a html string, and it will be wrapped in the HTML div
85 changes: 83 additions & 2 deletions flytekit/types/directory/types.py
@@ -2,20 +2,25 @@

import os
import pathlib
import random
import typing
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Generator, Tuple
from uuid import UUID

import fsspec
from dataclasses_json import config, dataclass_json
from fsspec.utils import get_protocol
from marshmallow import fields

from flytekit.core.context_manager import FlyteContext
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.type_engine import TypeEngine, TypeTransformer
from flytekit.models import types as _type_models
from flytekit.models.core import types as _core_types
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
from flytekit.models.types import LiteralType
from flytekit.types.file import FileExt
from flytekit.types.file import FileExt, FlyteFile

T = typing.TypeVar("T")
PathType = typing.Union[str, os.PathLike]
@@ -148,6 +153,18 @@ def __fspath__(self):
def extension(cls) -> str:
return ""

@classmethod
def new_remote(cls) -> FlyteDirectory:
"""
Create a new FlyteDirectory object using the currently configured default remote in the context (i.e.
the raw_output_prefix configured in the current FileAccessProvider object in the context).
This is used if you explicitly have a folder somewhere that you want to create files under.
If you want to write a whole folder, you can let your task return a FlyteDirectory object,
and let flytekit handle the uploading.
"""
d = FlyteContext.current_context().file_access.get_random_remote_directory()
return FlyteDirectory(path=d)

def __class_getitem__(cls, item: typing.Union[typing.Type, str]) -> typing.Type[FlyteDirectory]:
if item is None:
return cls
@@ -176,6 +193,12 @@ def downloaded(self) -> bool:
def remote_directory(self) -> typing.Optional[str]:
return self._remote_directory

@property
def sep(self) -> str:
if os.name == "nt" and get_protocol(self.path or self.remote_source or self.remote_directory) == "file":
return "\\"
return "/"

@property
def remote_source(self) -> str:
"""
@@ -184,9 +207,67 @@ def remote_source(self) -> str:
"""
return typing.cast(str, self._remote_source)

def new_file(self, name: typing.Optional[str] = None) -> FlyteFile:
"""
This will create a new file under the current folder.
If given a name, it will use the name given, otherwise it'll pick a random string.
Collisions are not checked.
"""
# TODO we may want to use - https://github.com/fsspec/universal_pathlib
if not name:
name = UUID(int=random.getrandbits(128)).hex
new_path = self.sep.join([str(self.path).rstrip(self.sep), name]) # trim trailing sep if any and join
return FlyteFile(path=new_path)

def new_dir(self, name: typing.Optional[str] = None) -> FlyteDirectory:
"""
This will create a new folder under the current folder.
If given a name, it will use the name given, otherwise it'll pick a random string.
Collisions are not checked.
"""
if not name:
name = UUID(int=random.getrandbits(128)).hex

new_path = self.sep.join([str(self.path).rstrip(self.sep), name]) # trim trailing sep if any and join
return FlyteDirectory(path=new_path)

def download(self) -> str:
return self.__fspath__()

def crawl(
self, maxdepth: typing.Optional[int] = None, topdown: bool = True, **kwargs
) -> Generator[Tuple[typing.Union[str, os.PathLike[Any]], typing.Dict[Any, Any]], None, None]:
"""
Crawl returns a generator of all files prefixed by any sub-folders under the given "FlyteDirectory".
If detail=True is passed, then it will return a dictionary as specified by fsspec.

Example:

>>> list(fd.crawl())
[("/base", "file1"), ("/base", "dir1/file1"), ("/base", "dir2/file1"), ("/base", "dir1/dir/file1")]

>>> list(x.crawl(detail=True))
[('/tmp/test', {'my-dir/ab.py': {'name': '/tmp/test/my-dir/ab.py', 'size': 0, 'type': 'file',
'created': 1677720780.2318847, 'islink': False, 'mode': 33188, 'uid': 501, 'gid': 0,
'mtime': 1677720780.2317934, 'ino': 1694329, 'nlink': 1}})]
"""
final_path = self.path
if self.remote_source:
final_path = self.remote_source
elif self.remote_directory:
final_path = self.remote_directory
ctx = FlyteContextManager.current_context()
fs = ctx.file_access.get_filesystem_for_path(final_path)
base_path_len = len(fsspec.core.strip_protocol(final_path)) + 1 # Add additional `/` at the end
for base, _, files in fs.walk(final_path, maxdepth, topdown, **kwargs):
current_base = base[base_path_len:]
if isinstance(files, dict):
for f, v in files.items():
yield final_path, {os.path.join(current_base, f): v}
else:
for f in files:
yield final_path, os.path.join(current_base, f)

def __repr__(self):
return self.path

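Taken together, new_remote, new_file, new_dir and crawl let a task lay out an output folder remotely and walk an existing directory without downloading it first. A hedged usage sketch (the task body and file names are illustrative; only the helper methods come from this PR):

.. code-block:: python

    from flytekit import task
    from flytekit.types.directory import FlyteDirectory


    @task
    def reorganize(src: FlyteDirectory) -> FlyteDirectory:
        # Walk the incoming directory in place; nothing is pulled to local disk here.
        for base, rel in src.crawl():
            print(base, rel)

        # Root a fresh directory under the configured raw_output_prefix.
        out = FlyteDirectory.new_remote()
        reports = out.new_dir("reports")            # named sub-folder
        summary = reports.new_file("summary.txt")   # named file; omit the name for a random one
        with summary.open("w") as fh:               # FlyteFile.open() streams through fsspec
            fh.write("crawl complete\n")
        return out

Because new_remote already points at blob storage, the intent is that returning out hands Flyte a directory that lives remotely, rather than a local folder that still needs uploading.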
67 changes: 65 additions & 2 deletions flytekit/types/file/file.py
@@ -3,12 +3,13 @@
import os
import pathlib
import typing
from contextlib import contextmanager
from dataclasses import dataclass, field

from dataclasses_json import config, dataclass_json
from marshmallow import fields

from flytekit.core.context_manager import FlyteContext
from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.type_engine import TypeEngine, TypeTransformer, TypeTransformerFailedError
from flytekit.loggers import logger
from flytekit.models.core.types import BlobType
@@ -27,7 +28,9 @@ def noop():
@dataclass_json
@dataclass
class FlyteFile(os.PathLike, typing.Generic[T]):
path: typing.Union[str, os.PathLike] = field(default=None, metadata=config(mm_field=fields.String())) # type: ignore
path: typing.Union[str, os.PathLike] = field(
default=None, metadata=config(mm_field=fields.String())
) # type: ignore
"""
Since there is no native Python implementation of files and directories for the Flyte Blob type, (like how int
exists for Flyte's Integer type) we need to create one so that users can express that their tasks take
@@ -148,6 +151,15 @@ def t2() -> flytekit_typing.FlyteFile["csv"]:
def extension(cls) -> str:
return ""

@classmethod
def new_remote_file(cls, name: typing.Optional[str] = None) -> FlyteFile:
"""
Create a new FlyteFile object with a remote path.
"""
ctx = FlyteContextManager.current_context()
remote_path = ctx.file_access.get_random_remote_path(name)
return cls(path=remote_path)

def __class_getitem__(cls, item: typing.Union[str, typing.Type]) -> typing.Type[FlyteFile]:
from . import FileExt

@@ -226,6 +238,57 @@ def remote_source(self) -> str:
def download(self) -> str:
return self.__fspath__()

@contextmanager
def open(
self,
mode: str,
cache_type: typing.Optional[str] = None,
cache_options: typing.Optional[typing.Dict[str, typing.Any]] = None,
):
"""
Returns a streaming File handle

.. code-block:: python

@task
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file(ff.name)
with ff.open("rb", cache_type="readahead", cache={}) as r:
with new_file.open("wb") as w:
w.write(r.read())
return new_file

Alternatively

.. code-block:: python

@task
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file(ff.name)
with fsspec.open(f"readahead::{ff.remote_path}", "rb", readahead={}) as r:
with new_file.open("wb") as w:
w.write(r.read())
return new_file


:param mode: str Open mode like 'rb', 'rt', 'wb', ...
:param cache_type: optional str Specify if caching is to be used. The cache type can be any of the ones supported
by fsspec https://filesystem-spec.readthedocs.io/en/latest/api.html#readbuffering; this is especially
useful for large file reads.
:param cache_options: optional Dict[str, Any] Refer to the fsspec caching options. These are strongly coupled to
the cache_type.
"""
ctx = FlyteContextManager.current_context()
final_path = self.path
if self.remote_source:
final_path = self.remote_source
elif self.remote_path:
final_path = self.remote_path
fs = ctx.file_access.get_filesystem_for_path(final_path)
f = fs.open(final_path, mode, cache_type=cache_type, cache_options=cache_options)
yield f
f.close()

def __repr__(self):
return self.path

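The streaming open() above, combined with new_remote_file, is what allows copying large blobs between object stores without staging them on local disk. A hedged variant of the docstring example that copies in fixed-size chunks rather than a single read() (the chunk size and task name are illustrative):

.. code-block:: python

    from flytekit import task
    from flytekit.types.file import FlyteFile


    @task
    def stream_copy(ff: FlyteFile) -> FlyteFile:
        out = FlyteFile.new_remote_file()
        # readahead caching batches the underlying range reads while keeping memory bounded.
        with ff.open("rb", cache_type="readahead") as r:
            with out.open("wb") as w:
                while True:
                    chunk = r.read(4 * 1024 * 1024)  # 4 MiB per iteration
                    if not chunk:
                        break
                    w.write(chunk)
        return out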
30 changes: 0 additions & 30 deletions flytekit/types/structured/basic_dfs.py
@@ -62,22 +62,6 @@ def encode(
structured_dataset_type.format = PARQUET
return literals.StructuredDataset(uri=uri, metadata=StructuredDatasetMetadata(structured_dataset_type))

def ddencode(
self,
ctx: FlyteContext,
structured_dataset: StructuredDataset,
structured_dataset_type: StructuredDatasetType,
) -> literals.StructuredDataset:

path = typing.cast(str, structured_dataset.uri) or ctx.file_access.get_random_remote_directory()
df = typing.cast(pd.DataFrame, structured_dataset.dataframe)
local_dir = ctx.file_access.get_random_local_directory()
local_path = os.path.join(local_dir, f"{0:05}")
df.to_parquet(local_path, coerce_timestamps="us", allow_truncated_timestamps=False)
ctx.file_access.upload_directory(local_dir, path)
structured_dataset_type.format = PARQUET
return literals.StructuredDataset(uri=path, metadata=StructuredDatasetMetadata(structured_dataset_type))


class ParquetToPandasDecodingHandler(StructuredDatasetDecoder):
def __init__(self):
@@ -101,20 +85,6 @@ def decode(
kwargs = get_storage_options(ctx.file_access.data_config, uri, anon=True)
return pd.read_parquet(uri, columns=columns, storage_options=kwargs)

def dcccecode(
self,
ctx: FlyteContext,
flyte_value: literals.StructuredDataset,
current_task_metadata: StructuredDatasetMetadata,
) -> pd.DataFrame:
path = flyte_value.uri
local_dir = ctx.file_access.get_random_local_directory()
ctx.file_access.get_data(path, local_dir, is_multipart=True)
if current_task_metadata.structured_dataset_type and current_task_metadata.structured_dataset_type.columns:
columns = [c.name for c in current_task_metadata.structured_dataset_type.columns]
return pd.read_parquet(local_dir, columns=columns)
return pd.read_parquet(local_dir)


class ArrowToParquetEncodingHandler(StructuredDatasetEncoder):
def __init__(self):