Skip to content

Commit

Permalink
refactor: use sync methods in thread from LocalFileSystem in async me…
Browse files Browse the repository at this point in the history
…thods.

Previously, it used aiofiles methods, which were run in a thread. For maximum
compatibility, we now use LocalFileSystem's implementation.
  • Loading branch information
skshetry committed Oct 3, 2022
1 parent fe9e3ff commit b45144a
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 183 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ install_requires=
memfs =
pygtrie>=2.3.2
asynclocalfs =
aiofiles==22.1.0
typing_extensions>=3.10.0; python_version < '3.10'
aiofile==3.8.1
all =
%(memfs)s
Expand Down
169 changes: 56 additions & 113 deletions src/morefs/asyn_local.py
Original file line number Diff line number Diff line change
@@ -1,149 +1,92 @@
import asyncio
import errno
import os
import posixpath
import shutil
from asyncio import get_running_loop, iscoroutinefunction
from contextlib import asynccontextmanager
from functools import partial, wraps
from typing import Awaitable, Callable, TypeVar

import aiofile
import aiofiles.os
from aiofiles.os import wrap # type: ignore[attr-defined]
from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem
from fsspec.asyn import AsyncFileSystem
from fsspec.implementations.local import LocalFileSystem

async_utime = wrap(os.utime)
async_islink = wrap(os.path.islink)
async_rmtree = wrap(shutil.rmtree)
async_copyfile = wrap(shutil.copyfile)
async_get_file = wrap(LocalFileSystem.get_file)
try:
from typing import ParamSpec
except ImportError: # pragma: no cover
from typing_extensions import ParamSpec

P = ParamSpec("P")
R = TypeVar("R")

async def copy_asyncfileobj(fsrc, fdst, length=shutil.COPY_BUFSIZE):
fsrc_read = fsrc.read
fdst_write = fdst.write
while buf := await fsrc_read(length):
await fdst_write(buf)

def wrap(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
@wraps(func)
async def run(*args: P.args, **kwargs: P.kwargs) -> R:
loop = get_running_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(None, pfunc)

# pylint: disable=abstract-method
return run


class AsyncLocalFileSystem(AsyncFileSystem, LocalFileSystem):
# temporary hack, upstream should support `mirror_sync_methods` instead.
async_impl = False
mirror_sync_methods = False

_cat_file = wrap(LocalFileSystem.cat_file)
_chmod = wrap(LocalFileSystem.chmod)
_cp_file = wrap(LocalFileSystem.cp_file)
_created = wrap(LocalFileSystem.created)
_find_async = wrap(LocalFileSystem.find)
_get_file_async = wrap(LocalFileSystem.get_file)
_info = wrap(LocalFileSystem.info)
_lexists = wrap(LocalFileSystem.lexists)
_ls = wrap(LocalFileSystem.ls)
_makedirs = wrap(LocalFileSystem.makedirs)
_mkdir = wrap(LocalFileSystem.mkdir)
_modified = wrap(LocalFileSystem.modified)
_mv_file = wrap(LocalFileSystem.mv_file)
_pipe_file = wrap(LocalFileSystem.pipe_file)
_put_file = wrap(LocalFileSystem.put_file)
_rm_file = wrap(LocalFileSystem.rm_file)
_rmdir = wrap(LocalFileSystem.rmdir)

async def _ls(self, path, detail=True, **kwargs):
path = self._strip_protocol(path)
if detail:
with await aiofiles.os.scandir(path) as entries:
return [await self._info(f) for f in entries]
return [
posixpath.join(path, f) for f in await aiofiles.os.listdir(path)
]

async def _mkdir(self, path, create_parents=True, **kwargs):
if create_parents:
if await self._exists(path):
raise FileExistsError(
errno.EEXIST, os.strerror(errno.EEXIST), path
)
return await self._makedirs(path, exist_ok=True)
path = self._strip_protocol(path)
await aiofiles.os.mkdir(path)

async def _cat_file(self, path, start=None, end=None, **kwargs):
async with self.open_async(path, "rb") as f:
if start is not None:
if start >= 0:
f.seek(start)
else:
f.seek(max(0, f.size + start))
if end is not None:
if end < 0:
end = f.size + end
return await f.read(end - f.tell())
return await f.read()

async def _pipe_file(self, path, value, **kwargs):
async with self.open_async(path, "wb") as f:
await f.write(value)

async def _get_file( # pylint: disable=arguments-renamed
self, path1, path2, **kwargs
):
write_method = getattr(path2, "write", None)
if not write_method:
return await self._cp_file(path1, path2, **kwargs)
if isinstance(
path2, AbstractAsyncStreamedFile
) or asyncio.iscoroutinefunction(write_method):
async with self.open_async(path1, "rb") as fsrc:
return await copy_asyncfileobj(fsrc, path2)

path1 = self._strip_protocol(path1)
return await async_get_file(self, path1, path2)

async def _cp_file(self, path1, path2, **kwargs):
path1 = self._strip_protocol(path1)
path2 = self._strip_protocol(path2)
if self.auto_mkdir:
await self._makedirs(self._parent(path2), exist_ok=True)
if await self._isfile(path1):
return await async_copyfile(path1, path2)
if await self._isdir(path1):
return await self._makedirs(path2, exist_ok=True)
raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), path1)

_put_file = _cp_file

async def _rm(
self, path, recursive=False, batch_size=None, maxdepth=None, **kwargs
):
if isinstance(path, (str, os.PathLike)):
_touch = wrap(LocalFileSystem.touch)
sign = LocalFileSystem.sign

async def _get_file(
self, src, dst, **kwargs
): # pylint: disable=arguments-renamed
if not iscoroutinefunction(getattr(dst, "write", None)):
src = self._strip_protocol(src)
return await self._get_file_async(src, dst)

async with self.open_async(src, "rb") as fsrc:
while True:
buf = await fsrc.read(length=shutil.COPY_BUFSIZE)
if not buf:
break
await dst.write(buf)

def rm(self, path, recursive=False, maxdepth=None):
if isinstance(path, os.PathLike):
path = [path]
super().rm(path, recursive=recursive, maxdepth=maxdepth)

assert not maxdepth and not batch_size
for p in path:
p = self._strip_protocol(p)
if recursive and await self._isdir(p):
if os.path.abspath(p) == os.getcwd():
raise ValueError("Cannot delete current working directory")
await async_rmtree(p)
else:
await aiofiles.os.remove(p)

async def _link(self, src, dst):
def link(self, src, dst, **kwargs):
src = self._strip_protocol(src)
dst = self._strip_protocol(dst)
await aiofiles.os.link(src, dst)
os.link(src, dst, **kwargs)

async def _symlink(self, src, dst):
def symlink(self, src, dst, **kwargs):
src = self._strip_protocol(src)
dst = self._strip_protocol(dst)
await aiofiles.os.symlink(src, dst)
os.symlink(src, dst, **kwargs)

async def _islink(self, path):
path = self._strip_protocol(path)
return await async_islink(path)
def islink(self, path) -> bool:
return os.path.islink(self._strip_protocol(path))

async def _touch(self, path, **kwargs):
if self.auto_mkdir:
await self._makedirs(self._parent(path), exist_ok=True)
if await self._exists(path):
path = self._strip_protocol(path)
return await async_utime(path, None)
async with self.open_async(path, "a"):
pass
_rm = wrap(rm)
_link = wrap(link)
_symlink = wrap(symlink)
_islink = wrap(islink)

@asynccontextmanager
async def open_async(self, path, mode="rb", **kwargs):
Expand Down
83 changes: 14 additions & 69 deletions tests/test_asyn_local.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from os import fspath
from pathlib import Path

import pytest
from fsspec.implementations.local import LocalFileSystem
Expand Down Expand Up @@ -86,23 +87,6 @@ def test_sync_methods(tmp_path, localfs, fs):
assert fs.lexists(tmp_path / "foo")


@pytest.mark.asyncio
async def test_mkdir(tmp_path, fs):
await fs._mkdir(tmp_path / "dir", create_parents=False)
assert await fs._isdir(tmp_path / "dir")

await fs._mkdir(tmp_path / "dir2" / "sub")
assert await fs._isdir(tmp_path / "dir2")
assert await fs._isdir(tmp_path / "dir2" / "sub")


@pytest.mark.asyncio
async def test_mkdir_twice_failed(tmp_path, fs):
await fs._mkdir(tmp_path / "dir")
with pytest.raises(FileExistsError):
await fs._mkdir(tmp_path / "dir")


@pytest.mark.asyncio
async def test_open_async(tmp_path, fs):
async with fs.open_async(tmp_path / "file", mode="wb") as f:
Expand All @@ -116,32 +100,6 @@ async def test_open_async(tmp_path, fs):
assert await f.read() == b"contents"


@pytest.mark.asyncio
async def test_pipe_cat_put(tmp_path, fs):
value = b"foo" * 1000
await fs._pipe_file(tmp_path / "foo", value)
assert await fs._cat_file(tmp_path / "foo") == value
assert await fs._cat_file(tmp_path / "foo", start=100) == value[100:]
assert (
await fs._cat_file(tmp_path / "foo", start=100, end=1000)
== value[100:1000]
)

await fs._put_file(tmp_path / "foo", tmp_path / "bar")
assert await fs._isfile(tmp_path / "bar")


@pytest.mark.asyncio
async def test_cp_file(tmp_path, fs):
await fs._mkdir(tmp_path / "dir")
await fs._cp_file(tmp_path / "dir", tmp_path / "dir2")
assert await fs._isdir(tmp_path / "dir")

await fs._pipe_file(tmp_path / "foo", b"foo")
assert await fs._cp_file(tmp_path / "foo", tmp_path / "bar")
assert await fs._cat_file(tmp_path / "bar") == b"foo"


@pytest.mark.asyncio
async def test_get_file(tmp_path, fs):
await fs._pipe_file(tmp_path / "foo", b"foo")
Expand All @@ -162,33 +120,18 @@ async def test_get_file(tmp_path, fs):
assert await fs._cat_file(tmp_path / "file3") == b"foo"


@pytest.mark.parametrize("transform", [Path, fspath])
@pytest.mark.asyncio
async def test_rm(tmp_path, fs):
async def test_rm(tmp_path, fs, transform):
await fs._pipe_file(tmp_path / "foo", b"foo")
await fs._pipe_file(tmp_path / "bar", b"bar")
await fs._mkdir(tmp_path / "dir")
await fs._pipe_file(tmp_path / "dir" / "file", b"file")

await fs._rm_file(tmp_path / "foo")
await fs._rm(transform(tmp_path / "foo"))
assert not await fs._exists(tmp_path / "foo")

await fs._rm(tmp_path / "bar")
assert not await fs._exists(tmp_path / "bar")

with pytest.raises((IsADirectoryError, PermissionError)):
await fs._rm(tmp_path / "dir")

await fs._rm(tmp_path / "dir", recursive=True)
await fs._mkdir(tmp_path / "dir")
await fs._rm(transform(tmp_path / "dir"), recursive=True)
assert not await fs._exists(tmp_path / "dir")


@pytest.mark.asyncio
async def test_try_rm_recursive_cwd(tmp_path, monkeypatch, fs):
monkeypatch.chdir(tmp_path)
with pytest.raises(ValueError):
await fs._rm(tmp_path, recursive=True)


@pytest.mark.asyncio
async def test_link(tmp_path, fs):
fs.pipe_file(tmp_path / "foo", b"foo")
Expand All @@ -204,9 +147,11 @@ async def test_symlink(tmp_path, fs):


@pytest.mark.asyncio
async def test_touch(tmp_path, fs):
await fs._touch(tmp_path / "file")
assert await fs._exists(tmp_path / "file")
created = await fs._created(tmp_path / "file")
await fs._touch(tmp_path / "file")
assert await fs._modified(tmp_path / "file") >= created
async def test_auto_mkdir_on_open_async(tmp_path):
fs = AsyncLocalFileSystem(auto_mkdir=True)
async with fs.open_async(tmp_path / "dir" / "file", mode="wb") as f:
await f.write(b"contents")

assert await fs._isdir(tmp_path / "dir")
assert await fs._isfile(tmp_path / "dir" / "file")
assert await fs._cat_file(tmp_path / "dir" / "file") == b"contents"

0 comments on commit b45144a

Please sign in to comment.