Skip to content

Commit

Permalink
work in progress
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Umare <ketan.umare@gmail.com>
  • Loading branch information
kumare3 committed Feb 19, 2023
1 parent 7e77bcc commit 39d61be
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 2 deletions.
14 changes: 14 additions & 0 deletions flytekit/types/directory/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import pathlib
import typing
from contextlib import contextmanager
from dataclasses import dataclass, field
from pathlib import Path

Expand Down Expand Up @@ -182,6 +183,19 @@ def remote_source(self) -> str:
def download(self) -> str:
return self.__fspath__()

@contextmanager
def open(self, mode: str, **kwargs):
try:
import fsspec
final_path = self.remote_path if self.remote_path else self.path
open_files: fsspec.core.OpenFiles = fsspec.open_files(final_path, mode, **kwargs)
# TODO should we wrap this up into FlyteFile?
return open_files
except ImportError as e:
print("To use streaming files, please install fsspec."
" Note: This will be bundled with flytekit in the future.")
raise

def __repr__(self):
return self.path

Expand Down
2 changes: 1 addition & 1 deletion flytekit/types/file/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def open(self, mode: str, **kwargs):
try:
import fsspec
final_path = self.remote_path if self.remote_path else self.path
open_file: fsspec.core.OpenFile = fsspec.open(final_path, mode)
open_file: fsspec.core.OpenFile = fsspec.open(final_path, mode, **kwargs)
try:
yield open_file.open()
finally:
Expand Down
Empty file removed test.py
Empty file.
2 changes: 1 addition & 1 deletion tests/flytekit/unit/clients/test_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,4 @@ def test__handle_invalid_create_request_decorator_raises(mock_to_JSON, mock_logg
client._stub.CreateWorkflow = mock.Mock(side_effect=err)
with pytest.raises(grpc.RpcError):
_handle_invalid_create_request(client.create_workflow("/flyteidl.service.AdminService/CreateWorkflow"))
mock_logger.error.assert_called_with("There is already a workflow with different structure.")
mock_logger.error.assert_called_with("There is already a workflow with different structure.")

0 comments on commit 39d61be

Please sign in to comment.