Skip to content

Commit

Permalink
Merge pull request #12 from iterative/refactor
Browse files Browse the repository at this point in the history
refactor: use sync methods in thread from LocalFileSystem in async methods
  • Loading branch information
skshetry authored Oct 3, 2022
2 parents fe9e3ff + 7847fd3 commit 33da6c4
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 183 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ install_requires=
memfs =
pygtrie>=2.3.2
asynclocalfs =
aiofiles==22.1.0
typing_extensions>=3.10.0; python_version < '3.10'
aiofile==3.8.1
all =
%(memfs)s
Expand Down
170 changes: 57 additions & 113 deletions src/morefs/asyn_local.py
Original file line number Diff line number Diff line change
@@ -1,149 +1,93 @@
import asyncio
import errno
import os
import posixpath
import shutil
import sys
from asyncio import get_running_loop, iscoroutinefunction
from contextlib import asynccontextmanager
from functools import partial, wraps
from typing import Awaitable, Callable, TypeVar

import aiofile
import aiofiles.os
from aiofiles.os import wrap # type: ignore[attr-defined]
from fsspec.asyn import AbstractAsyncStreamedFile, AsyncFileSystem
from fsspec.asyn import AsyncFileSystem
from fsspec.implementations.local import LocalFileSystem

async_utime = wrap(os.utime)
async_islink = wrap(os.path.islink)
async_rmtree = wrap(shutil.rmtree)
async_copyfile = wrap(shutil.copyfile)
async_get_file = wrap(LocalFileSystem.get_file)
if sys.version_info < (3, 10): # pragma: no cover
from typing_extensions import ParamSpec
else: # pragma: no cover
from typing import ParamSpec

P = ParamSpec("P")
R = TypeVar("R")

async def copy_asyncfileobj(fsrc, fdst, length=shutil.COPY_BUFSIZE):
    """Copy everything readable from async file object *fsrc* into *fdst*.

    Data is moved in chunks: each iteration awaits ``fsrc.read(length)`` and
    then awaits ``fdst.write(chunk)`` until ``read`` returns an empty value.

    Args:
        fsrc: Source object exposing an awaitable ``read(size)``.
        fdst: Destination object exposing an awaitable ``write(data)``.
        length: Chunk size handed to every ``read`` call. A falsy value
            (``0`` or ``None``) falls back to ``shutil.COPY_BUFSIZE``, the
            same way ``shutil.copyfileobj`` treats it.
    """
    # Without this guard ``length=0`` makes the very first read return an
    # empty value, so the loop ends immediately and nothing is copied.
    if not length:
        length = shutil.COPY_BUFSIZE
    # Bind the bound methods once; they are invoked on every chunk.
    fsrc_read = fsrc.read
    fdst_write = fdst.write
    while buf := await fsrc_read(length):
        await fdst_write(buf)

def wrap(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
    """Turn the synchronous callable *func* into a coroutine function.

    The returned coroutine function accepts the same arguments as *func*.
    When awaited it binds those arguments with ``partial`` and submits the
    call to the running event loop's default executor (``None`` is passed as
    the executor), returning whatever *func* returns.
    """

    @wraps(func)
    async def run(*args: P.args, **kwargs: P.kwargs) -> R:
        running = get_running_loop()
        # run_in_executor only forwards positional arguments, hence partial.
        return await running.run_in_executor(
            None, partial(func, *args, **kwargs)
        )

    return run


# pylint: disable=abstract-method


# Local filesystem exposing coroutine methods: the `_name` attributes assigned
# below are the synchronous LocalFileSystem methods passed through `wrap`.
class AsyncLocalFileSystem(AsyncFileSystem, LocalFileSystem):
# temporary hack, upstream should support `mirror_sync_methods` instead.
async_impl = False
mirror_sync_methods = False

# Each assignment is `wrap(LocalFileSystem.<name>)` stored as `_<name>`.
_cat_file = wrap(LocalFileSystem.cat_file)
_chmod = wrap(LocalFileSystem.chmod)
_cp_file = wrap(LocalFileSystem.cp_file)
_created = wrap(LocalFileSystem.created)
# The next two carry an `_async` suffix instead of the plain `_find` /
# `_get_file` names; `_get_file_async` is what this class's own `_get_file`
# awaits for non-coroutine destinations.
# NOTE(review): the reason `_find_async` is suffixed is not visible here.
_find_async = wrap(LocalFileSystem.find)
_get_file_async = wrap(LocalFileSystem.get_file)
_info = wrap(LocalFileSystem.info)
_lexists = wrap(LocalFileSystem.lexists)
_ls = wrap(LocalFileSystem.ls)
_makedirs = wrap(LocalFileSystem.makedirs)
_mkdir = wrap(LocalFileSystem.mkdir)
_modified = wrap(LocalFileSystem.modified)
_mv_file = wrap(LocalFileSystem.mv_file)
_pipe_file = wrap(LocalFileSystem.pipe_file)
_put_file = wrap(LocalFileSystem.put_file)
_rm_file = wrap(LocalFileSystem.rm_file)
_rmdir = wrap(LocalFileSystem.rmdir)

async def _ls(self, path, detail=True, **kwargs):
    """List the entries of directory *path*.

    With a truthy ``detail`` the result is ``self._info(entry)`` for every
    entry yielded by ``aiofiles.os.scandir``; otherwise it is the list of
    names from ``aiofiles.os.listdir`` joined onto the protocol-stripped
    path with ``posixpath.join``.
    """
    root = self._strip_protocol(path)
    if not detail:
        names = await aiofiles.os.listdir(root)
        return [posixpath.join(root, name) for name in names]

    with await aiofiles.os.scandir(root) as entries:
        infos = []
        for entry in entries:
            infos.append(await self._info(entry))
        return infos

async def _mkdir(self, path, create_parents=True, **kwargs):
    """Create directory *path*.

    With ``create_parents`` set, an already existing *path* raises
    ``FileExistsError`` (``EEXIST``); otherwise the call is delegated to
    ``self._makedirs(path, exist_ok=True)``. Without ``create_parents`` only
    the leaf is created through ``aiofiles.os.mkdir``.
    """
    if not create_parents:
        leaf = self._strip_protocol(path)
        await aiofiles.os.mkdir(leaf)
        return None

    if await self._exists(path):
        message = os.strerror(errno.EEXIST)
        raise FileExistsError(errno.EEXIST, message, path)
    return await self._makedirs(path, exist_ok=True)

async def _cat_file(self, path, start=None, end=None, **kwargs):
    """Read the contents of *path*, optionally limited by ``start``/``end``.

    A non-negative ``start`` is used as the seek offset; a negative one is
    counted back from ``f.size`` and clamped at 0. A negative ``end`` is
    likewise counted back from ``f.size``. With ``end`` given,
    ``end - f.tell()`` bytes are read; without it, the rest of the file.
    """
    async with self.open_async(path, "rb") as f:
        if start is not None:
            offset = start if start >= 0 else max(0, f.size + start)
            f.seek(offset)

        if end is None:
            return await f.read()

        stop = end if end >= 0 else f.size + end
        return await f.read(stop - f.tell())

async def _pipe_file(self, path, value, **kwargs):
    """Write *value* to *path*, opened in ``"wb"`` mode via ``open_async``."""
    async with self.open_async(path, "wb") as sink:
        await sink.write(value)

async def _get_file(  # pylint: disable=arguments-renamed
    self, path1, path2, **kwargs
):
    """Copy *path1* into *path2*, which may be a path or a writable object.

    Dispatch, in order:

    * *path2* has no truthy ``write`` attribute: delegate to ``_cp_file``,
      forwarding ``kwargs``.
    * *path2* is an ``AbstractAsyncStreamedFile`` or its ``write`` is a
      coroutine function: stream from ``open_async(path1, "rb")`` with
      ``copy_asyncfileobj``.
    * anything else: call ``async_get_file`` with the protocol-stripped
      source path.
    """
    writer = getattr(path2, "write", None)
    if not writer:
        return await self._cp_file(path1, path2, **kwargs)

    streams_async = isinstance(
        path2, AbstractAsyncStreamedFile
    ) or asyncio.iscoroutinefunction(writer)
    if not streams_async:
        local_src = self._strip_protocol(path1)
        return await async_get_file(self, local_src, path2)

    async with self.open_async(path1, "rb") as fsrc:
        return await copy_asyncfileobj(fsrc, path2)

async def _cp_file(self, path1, path2, **kwargs):
    """Copy *path1* to *path2* after stripping the protocol from both.

    When ``self.auto_mkdir`` is set, the parent of the destination is
    created first. A source file is copied with ``async_copyfile``; a source
    directory results in ``_makedirs(destination, exist_ok=True)``.

    Raises:
        FileNotFoundError: (``ENOENT``) if the source is neither a file nor
            a directory.
    """
    src = self._strip_protocol(path1)
    dst = self._strip_protocol(path2)
    if self.auto_mkdir:
        await self._makedirs(self._parent(dst), exist_ok=True)

    if await self._isfile(src):
        return await async_copyfile(src, dst)
    if not await self._isdir(src):
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), src)
    return await self._makedirs(dst, exist_ok=True)


_put_file = _cp_file

async def _rm(
self, path, recursive=False, batch_size=None, maxdepth=None, **kwargs
):
if isinstance(path, (str, os.PathLike)):
_touch = wrap(LocalFileSystem.touch)
sign = LocalFileSystem.sign

async def _get_file(
    self, src, dst, **kwargs
):  # pylint: disable=arguments-renamed
    """Copy *src* into *dst*.

    If ``dst.write`` is a coroutine function, *src* is opened with
    ``open_async`` and streamed in ``shutil.COPY_BUFSIZE`` chunks, awaiting
    ``dst.write`` for each one. In every other case the protocol is stripped
    from *src* and the call is handed to ``self._get_file_async``.
    """
    if iscoroutinefunction(getattr(dst, "write", None)):
        async with self.open_async(src, "rb") as fsrc:
            chunk = await fsrc.read(length=shutil.COPY_BUFSIZE)
            while chunk:
                await dst.write(chunk)
                chunk = await fsrc.read(length=shutil.COPY_BUFSIZE)
        return None

    local_src = self._strip_protocol(src)
    return await self._get_file_async(local_src, dst)

def rm(self, path, recursive=False, maxdepth=None):
    """Remove *path*, delegating to the parent class ``rm``.

    A single ``os.PathLike`` is wrapped in a one-element list before the
    call; any other value is passed through unchanged.
    """
    targets = [path] if isinstance(path, os.PathLike) else path
    super().rm(targets, recursive=recursive, maxdepth=maxdepth)

assert not maxdepth and not batch_size
for p in path:
p = self._strip_protocol(p)
if recursive and await self._isdir(p):
if os.path.abspath(p) == os.getcwd():
raise ValueError("Cannot delete current working directory")
await async_rmtree(p)
else:
await aiofiles.os.remove(p)

async def _link(self, src, dst):
def link(self, src, dst, **kwargs):
src = self._strip_protocol(src)
dst = self._strip_protocol(dst)
await aiofiles.os.link(src, dst)
os.link(src, dst, **kwargs)

async def _symlink(self, src, dst):
def symlink(self, src, dst, **kwargs):
src = self._strip_protocol(src)
dst = self._strip_protocol(dst)
await aiofiles.os.symlink(src, dst)
os.symlink(src, dst, **kwargs)

async def _islink(self, path):
path = self._strip_protocol(path)
return await async_islink(path)
def islink(self, path) -> bool:
return os.path.islink(self._strip_protocol(path))

async def _touch(self, path, **kwargs):
    """Create *path* if it is missing, otherwise refresh its timestamps.

    When ``self.auto_mkdir`` is set, the parent directory is created first.
    An existing path gets ``async_utime(path, None)``; a missing one is
    opened in ``"a"`` mode through ``open_async`` and closed right away.
    """
    if self.auto_mkdir:
        parent = self._parent(path)
        await self._makedirs(parent, exist_ok=True)

    already_there = await self._exists(path)
    if already_there:
        return await async_utime(self._strip_protocol(path), None)

    async with self.open_async(path, "a"):
        pass
    return None
_rm = wrap(rm)
_link = wrap(link)
_symlink = wrap(symlink)
_islink = wrap(islink)

@asynccontextmanager
async def open_async(self, path, mode="rb", **kwargs):
Expand Down
83 changes: 14 additions & 69 deletions tests/test_asyn_local.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from os import fspath
from pathlib import Path

import pytest
from fsspec.implementations.local import LocalFileSystem
Expand Down Expand Up @@ -86,23 +87,6 @@ def test_sync_methods(tmp_path, localfs, fs):
assert fs.lexists(tmp_path / "foo")


@pytest.mark.asyncio
async def test_mkdir(tmp_path, fs):
    """`_mkdir` creates a single directory, and nested ones by default."""
    single = tmp_path / "dir"
    await fs._mkdir(single, create_parents=False)
    assert await fs._isdir(single)

    parent = tmp_path / "dir2"
    nested = parent / "sub"
    await fs._mkdir(nested)
    assert await fs._isdir(parent)
    assert await fs._isdir(nested)


@pytest.mark.asyncio
async def test_mkdir_twice_failed(tmp_path, fs):
    """A second `_mkdir` on the same path raises `FileExistsError`."""
    target = tmp_path / "dir"
    await fs._mkdir(target)
    with pytest.raises(FileExistsError):
        await fs._mkdir(target)


@pytest.mark.asyncio
async def test_open_async(tmp_path, fs):
async with fs.open_async(tmp_path / "file", mode="wb") as f:
Expand All @@ -116,32 +100,6 @@ async def test_open_async(tmp_path, fs):
assert await f.read() == b"contents"


@pytest.mark.asyncio
async def test_pipe_cat_put(tmp_path, fs):
    """Round-trip through `_pipe_file`, ranged `_cat_file`, and `_put_file`."""
    value = b"foo" * 1000
    foo = tmp_path / "foo"
    bar = tmp_path / "bar"

    await fs._pipe_file(foo, value)
    assert await fs._cat_file(foo) == value
    assert await fs._cat_file(foo, start=100) == value[100:]
    ranged = await fs._cat_file(foo, start=100, end=1000)
    assert ranged == value[100:1000]

    await fs._put_file(foo, bar)
    assert await fs._isfile(bar)


@pytest.mark.asyncio
async def test_cp_file(tmp_path, fs):
    """`_cp_file` recreates a directory and copies a file's contents."""
    await fs._mkdir(tmp_path / "dir")
    await fs._cp_file(tmp_path / "dir", tmp_path / "dir2")
    assert await fs._isdir(tmp_path / "dir")
    # The copy's effect is the destination: checking only the source would
    # pass even if `_cp_file` did nothing at all.
    assert await fs._isdir(tmp_path / "dir2")

    await fs._pipe_file(tmp_path / "foo", b"foo")
    assert await fs._cp_file(tmp_path / "foo", tmp_path / "bar")
    assert await fs._cat_file(tmp_path / "bar") == b"foo"


@pytest.mark.asyncio
async def test_get_file(tmp_path, fs):
await fs._pipe_file(tmp_path / "foo", b"foo")
Expand All @@ -162,33 +120,18 @@ async def test_get_file(tmp_path, fs):
assert await fs._cat_file(tmp_path / "file3") == b"foo"


@pytest.mark.parametrize("transform", [Path, fspath])
@pytest.mark.asyncio
async def test_rm(tmp_path, fs):
async def test_rm(tmp_path, fs, transform):
await fs._pipe_file(tmp_path / "foo", b"foo")
await fs._pipe_file(tmp_path / "bar", b"bar")
await fs._mkdir(tmp_path / "dir")
await fs._pipe_file(tmp_path / "dir" / "file", b"file")

await fs._rm_file(tmp_path / "foo")
await fs._rm(transform(tmp_path / "foo"))
assert not await fs._exists(tmp_path / "foo")

await fs._rm(tmp_path / "bar")
assert not await fs._exists(tmp_path / "bar")

with pytest.raises((IsADirectoryError, PermissionError)):
await fs._rm(tmp_path / "dir")

await fs._rm(tmp_path / "dir", recursive=True)
await fs._mkdir(tmp_path / "dir")
await fs._rm(transform(tmp_path / "dir"), recursive=True)
assert not await fs._exists(tmp_path / "dir")


@pytest.mark.asyncio
async def test_try_rm_recursive_cwd(tmp_path, monkeypatch, fs):
    """Recursively removing the current working directory raises ValueError."""
    cwd = tmp_path
    monkeypatch.chdir(cwd)
    with pytest.raises(ValueError):
        await fs._rm(cwd, recursive=True)


@pytest.mark.asyncio
async def test_link(tmp_path, fs):
fs.pipe_file(tmp_path / "foo", b"foo")
Expand All @@ -204,9 +147,11 @@ async def test_symlink(tmp_path, fs):


@pytest.mark.asyncio
async def test_touch(tmp_path, fs):
await fs._touch(tmp_path / "file")
assert await fs._exists(tmp_path / "file")
created = await fs._created(tmp_path / "file")
await fs._touch(tmp_path / "file")
assert await fs._modified(tmp_path / "file") >= created
async def test_auto_mkdir_on_open_async(tmp_path):
fs = AsyncLocalFileSystem(auto_mkdir=True)
async with fs.open_async(tmp_path / "dir" / "file", mode="wb") as f:
await f.write(b"contents")

assert await fs._isdir(tmp_path / "dir")
assert await fs._isfile(tmp_path / "dir" / "file")
assert await fs._cat_file(tmp_path / "dir" / "file") == b"contents"

0 comments on commit 33da6c4

Please sign in to comment.