This makes it impossible to use custom hash functions with FlyteFile, e.g., to implement content-aware caching.
Note that the same approach with Annotated[FlyteDirectory, HashMethod(...)] works as expected out of the box.
Example workflow code
import hashlib
from typing import Annotated

import pandas as pd
from flytekit import task, workflow
from flytekit.core.hash import HashMethod
from flytekit.types.file import FlyteFile

def calc_hash(f: FlyteFile) -> str:
    """Calculate SHA1 hash of a file"""
    h = hashlib.sha1(usedforsecurity=False)
    with open(f.path, "rb") as f:
        while chunk := f.read(4096):
            h.update(chunk)
    return str(h.hexdigest())

CachedFlyteFile = Annotated[FlyteFile, HashMethod(calc_hash)]

@task
def write_file() -> CachedFlyteFile:
    print("write_file")
    local_path = "data.parquet"
    df = pd.DataFrame(data={"a": [1, 2, 3], "b": [3, 4, 5]})
    df.to_parquet(local_path)
    return FlyteFile(local_path, remote_path=f"s3://test-repo/main/{local_path}")

@task(cache=True, cache_version="1")
def print_file(file: FlyteFile) -> None:
    file.download()
    print(pd.read_parquet(file))

@workflow
def wf() -> None:
    f = write_file()
    print_file(file=f)

if __name__ == "__main__":
    wf()
    wf()  # don't expect output from `print_file`, since it should be cached
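The caching in this example relies on the hash being a pure function of the file's bytes. As a minimal sketch of that property outside Flyte, the helper below works on plain paths rather than FlyteFile objects. The names sha1_of_path and write_temp are hypothetical and only serve the illustration. Identical content should give equal digests, and changed content a different one.

```python
import hashlib
import os
import tempfile


def sha1_of_path(path: str, chunk_size: int = 4096) -> str:
    """Hash a file's bytes in chunks so large files need not fit in memory."""
    h = hashlib.sha1(usedforsecurity=False)
    with open(path, "rb") as fh:
        while chunk := fh.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()


def write_temp(data: bytes) -> str:
    """Write data to a fresh temporary file and return its path."""
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, "wb") as fh:
        fh.write(data)
    return path


# Two files with the same bytes, and one that differs by a single byte.
first, second, changed = write_temp(b"abc"), write_temp(b"abc"), write_temp(b"abd")
print(sha1_of_path(first) == sha1_of_path(second))
print(sha1_of_path(first) == sha1_of_path(changed))
for p in (first, second, changed):
    os.remove(p)
```

Because the digest depends only on content, a downstream cached task would be keyed on what the file contains rather than on where it is stored.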
Expected behavior
The first execution of wf should run both the write_file and print_file tasks; the second execution should only run write_file and hit the cache for print_file.
In reality, an exception is raised in flytekit/types/file/file.py, in to_literal():
TypeError: issubclass() arg 1 must be a class
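The same error can be reproduced without flytekit. The sketch below assumes a stand-in class (FileLike is hypothetical, not a flytekit type) and shows that an Annotated[...] alias is not itself a class, so passing it straight to issubclass raises a TypeError.

```python
from typing import Annotated


class FileLike:
    """Hypothetical stand-in for FlyteFile; not part of flytekit."""


# Annotated[...] produces a typing alias object, not a class.
AnnotatedFileLike = Annotated[FileLike, "some metadata"]


def check(python_type) -> str:
    """Mimic a bare issubclass check and report what happens."""
    try:
        return "ok" if issubclass(python_type, FileLike) else "not a subclass"
    except TypeError as exc:
        return f"TypeError: {exc}"


print(check(FileLike))
print(check(AnnotatedFileLike))
```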
Additional context to reproduce
See example workflow above.
I was able to solve the issue by inserting the following before the issubclass check in flytekit/types/file/file.py (I haven't run the flytekit test suite, so this might not be suitable for a mergeable PR just yet - should I submit a draft PR regardless?):
@@ -284,6 +288,10 @@
         "None value cannot be converted to a file."
     )
 
+    # Handle Annotated[FlyteFile, ...] correctly by extracting the wrapped type
+    if issubclass(typing.get_origin(python_type), typing.Annotated):
+        python_type = typing.get_args(python_type)[0]
+
     if not (python_type is os.PathLike or issubclass(python_type, FlyteFile)):
         raise ValueError(
             f"Incorrect type {python_type}, must be either a FlyteFile or os.PathLike"
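One caveat about the patch: typing.get_origin returns None for a plain class, so passing its result to issubclass would itself raise for unannotated types. A hedged alternative, using only the standard typing helpers, compares the origin by identity instead. The names unwrap_annotated and FileLike are hypothetical and for illustration only; they are not flytekit APIs.

```python
import typing
from typing import Annotated


class FileLike:
    """Hypothetical stand-in for FlyteFile; not part of flytekit."""


def unwrap_annotated(python_type):
    """Return the wrapped type of Annotated[T, ...]; pass anything else through."""
    # get_origin() returns Annotated for annotated aliases and None for plain
    # classes, so an identity comparison is safe for both.
    if typing.get_origin(python_type) is Annotated:
        return typing.get_args(python_type)[0]
    return python_type


for candidate in (FileLike, Annotated[FileLike, "some metadata"]):
    unwrapped = unwrap_annotated(candidate)
    # After unwrapping, a regular issubclass check no longer raises.
    print(unwrapped is FileLike, issubclass(unwrapped, FileLike))
```

Whether this is the right fix for flytekit would still need to be confirmed against its test suite.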
Screenshots
No response
Are you sure this issue hasn't been raised already?
Yes
Have you read the Code of Conduct?
Yes
Describe the bug
Original discussion on Slack: https://flyte-org.slack.com/archives/CP2HDHKE1/p1678183814829309
Using Annotated[FlyteFile, HashMethod(...)] for calculating a custom hash for a FlyteFile doesn't work out of the box, failing an issubclass check in flytekit/types/file/file.py.