-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Protect against None components of universal pathlib xcom backend #41921
Conversation
acc85c8
to
bb9a9a2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix, I was hoping there is someone around with more background knowledge why the code is currently like it is... otherwise if none available.. OK to merge it as is.
Maybe we wait a few days hoping that bolke returns back.
Regarding the tldr: the default and most custom implementations throw away extra keyword arguments. The only one in this list that doesn't is See below for details on the different rename implementations: fsspec subclasses: customized method report for 'mv'Click to see the environment used to generate the report
Default implementation def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
"""Move file(s) from one location to another"""
if path1 == path2:
logger.debug("%s mv: The paths are the same, so no files were moved.", self)
else:
# explicitly raise exception to prevent data corruption
self.copy(
path1, path2, recursive=recursive, maxdepth=maxdepth, onerror="raise"
)
self.rm(path1, recursive=recursive) These filesystem classes do not customize the method 'mv':
Subclasses customizing 'mv'HadoopFileSystem
@wrap_exceptions
def mv(self, path1, path2, **kwargs):
path1 = self._strip_protocol(path1).rstrip("/")
path2 = self._strip_protocol(path2).rstrip("/")
self.fs.move(path1, path2) AsyncLocalFileSystem
def mv(self, path1, path2, **kwargs):
path1 = self._strip_protocol(path1)
path2 = self._strip_protocol(path2)
shutil.move(path1, path2) DaskWorkerFileSystem
def mv(self, *args, **kwargs):
if self.worker:
self.fs.mv(*args, **kwargs)
else:
self.rfs.mv(*args, **kwargs).compute() DatabricksFileSystem
def mv(
self, source_path, destination_path, recursive=False, maxdepth=None, **kwargs
):
"""
Move a source to a destination path.
A note from the original [databricks API manual]
(https://docs.databricks.com/dev-tools/api/latest/dbfs.html#move).
When moving a large number of files the API call will time out after
approximately 60s, potentially resulting in partially moved data.
Therefore, for operations that move more than 10k files, we strongly
discourage using the DBFS REST API.
Parameters
----------
source_path: str
From where to move (absolute path)
destination_path: str
To where to move (absolute path)
recursive: bool
Not implemented to far.
maxdepth:
Not implemented to far.
"""
if recursive:
raise NotImplementedError
if maxdepth:
raise NotImplementedError
try:
self._send_to_api(
method="post",
endpoint="move",
json={"source_path": source_path, "destination_path": destination_path},
)
except DatabricksException as e:
if e.error_code == "RESOURCE_DOES_NOT_EXIST":
raise FileNotFoundError(e.message)
elif e.error_code == "RESOURCE_ALREADY_EXISTS":
raise FileExistsError(e.message)
raise e
self.invalidate_cache(self._parent(source_path))
self.invalidate_cache(self._parent(destination_path)) DirFileSystem
def mv(self, path1, path2, **kwargs):
return self.fs.mv(
self._join(path1),
self._join(path2),
**kwargs,
) LocalFileSystem
def mv(self, path1, path2, **kwargs):
path1 = self._strip_protocol(path1)
path2 = self._strip_protocol(path2)
shutil.move(path1, path2) FTPFileSystem
def mv(self, path1, path2, **kwargs):
path1 = self._strip_protocol(path1)
path2 = self._strip_protocol(path2)
self.ftp.rename(path1, path2)
self.invalidate_cache(self._parent(path1))
self.invalidate_cache(self._parent(path2)) SFTPFileSystem
def mv(self, old, new):
logger.debug("Renaming %s into %s", old, new)
self.ftp.posix_rename(old, new) SMBFileSystem
def mv(self, path1, path2, recursive=None, maxdepth=None, **kwargs):
wpath1 = _as_unc_path(self.host, path1)
wpath2 = _as_unc_path(self.host, path2)
smbclient.rename(wpath1, wpath2, port=self._port, **kwargs) WebdavFileSystem
def mv(
self,
path1: str,
path2: str,
recursive: bool = False,
maxdepth: Optional[bool] = None,
**kwargs: Any,
) -> None:
"""Move a file/directory from one path to the other."""
path1 = self._strip_protocol(path1)
path2 = self._strip_protocol(path2)
if recursive and not maxdepth and self.isdir(path1):
return self.client.move(path1, path2)
if not recursive and self.isdir(path1):
return self.makedirs(path2)
super().mv(path1, path2, recursive=recursive, maxdepth=maxdepth, **kwargs)
return None WebHDFS
def mv(self, path1, path2, **kwargs):
self._call("RENAME", method="put", path=path1, destination=path2) |
Thanks @ap-- ! |
…ache#41921) fixes: apache#41723 (cherry picked from commit 7a75f0a)
fixes: #41723
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.