Skip to content

Commit

Permalink
File implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
  • Loading branch information
kumare3 committed Feb 24, 2023
1 parent 7e77bcc commit 8f069b3
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
14 changes: 14 additions & 0 deletions flytekit/types/directory/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import pathlib
import typing
from contextlib import contextmanager
from dataclasses import dataclass, field
from pathlib import Path

Expand Down Expand Up @@ -182,6 +183,19 @@ def remote_source(self) -> str:
def download(self) -> str:
return self.__fspath__()

@contextmanager
def open(self, mode: str, **kwargs):
try:
import fsspec
final_path = self.remote_path if self.remote_path else self.path
open_files: fsspec.core.OpenFiles = fsspec.open_files(final_path, mode, **kwargs)
# TODO should we wrap this up into FlyteFile?
return open_files
except ImportError as e:
print("To use streaming files, please install fsspec."
" Note: This will be bundled with flytekit in the future.")
raise

def __repr__(self):
return self.path

Expand Down
48 changes: 43 additions & 5 deletions flytekit/types/file/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from flytekit.core.context_manager import FlyteContext, FlyteContextManager
from flytekit.core.type_engine import TypeEngine, TypeTransformer, TypeTransformerFailedError
from flytekit.exceptions.user import FlyteUserException
from flytekit.loggers import logger
from flytekit.models.core.types import BlobType
from flytekit.models.literals import Blob, BlobMetadata, Literal, Scalar
Expand Down Expand Up @@ -151,9 +152,9 @@ def extension(cls) -> str:
return ""

@classmethod
def new_remote_file(cls) -> FlyteFile:
def new_remote_file(cls, name: typing.Optional[str] = None) -> FlyteFile:
ctx = FlyteContextManager.current_context()
remote_path = ctx.file_access.get_random_remote_path()
remote_path = ctx.file_access.get_random_remote_path(name)
return cls(path=remote_path)

def __class_getitem__(cls, item: typing.Union[str, typing.Type]) -> typing.Type[FlyteFile]:
Expand Down Expand Up @@ -235,19 +236,56 @@ def download(self) -> str:
return self.__fspath__()

@contextmanager
def open(self, mode: str, **kwargs):
def open(self, mode: str, cache_type: str = None, cache_options: typing.Dict[str, typing.Any] = None):
"""
Returns a streaming File handle
.. code-block:: python
@task
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file(ff.name)
with ff.open("rb", cache_type="readahead", cache={}) as r:
with new_file.open("wb") as w:
w.write(r.read())
return new_file
Alternatively
.. code-block:: python
@task
def copy_file(ff: FlyteFile) -> FlyteFile:
new_file = FlyteFile.new_remote_file(ff.name)
with fsspec.open(f"readahead::{ff.remote_path}", "rb", readahead={}) as r:
with new_file.open("wb") as w:
w.write(r.read())
return new_file
:param mode: str Open mode like 'rb', 'rt', 'wb', ...
:param cache_type: optional str Specify if caching is to be used. Cache protocol can be ones supported by
fsspec https://filesystem-spec.readthedocs.io/en/latest/api.html#readbuffering,
especially useful for large file reads
:param cache_options: optional Dict[str, Any] Refer to fsspec caching options. This is strongly coupled to the
cache_protocol
"""
try:
import fsspec
final_path = self.remote_path if self.remote_path else self.path
open_file: fsspec.core.OpenFile = fsspec.open(final_path, mode)
kwargs = {}
if cache_type:
final_path = f"{cache_type}::{final_path}"
kwargs[cache_type] = cache_options
open_file: fsspec.core.OpenFile = fsspec.open(final_path, mode, **kwargs)
try:
yield open_file.open()
finally:
open_file.close()
except ImportError as e:
print("To use streaming files, please install fsspec."
" Note: This will be bundled with flytekit in the future.")
raise
raise FlyteUserException("Install fsspec to use FlyteFile streaming.") from e

def __repr__(self):
return self.path
Expand Down
Empty file removed test.py
Empty file.

0 comments on commit 8f069b3

Please sign in to comment.