Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async friendly upload #626

Merged
merged 28 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8708194
calling pkgstor.add_package as async method
ivergara Apr 18, 2023
0d40569
local packagestore, internally still blocking
ivergara Apr 18, 2023
6f543ad
Reverting async declaration, test for add_package
ivergara Apr 21, 2023
ba42feb
refactoring closure into helper function
ivergara Apr 21, 2023
f609cac
async definition with blocking implementation
ivergara Apr 21, 2023
36c5db4
making local async implementation work
ivergara Apr 21, 2023
364f8f1
Using a separate async add_package version.
ivergara Apr 22, 2023
b0aa143
Fix test implementation
ivergara Apr 22, 2023
1fc3e22
Adding sync test function for add_package.
ivergara Apr 22, 2023
3612703
Fixing issues with linting.
ivergara Apr 22, 2023
ae9191a
remove async keywords in old methods
ivergara Apr 24, 2023
c673214
Merge branch 'main' into async_friendly_upload
ivergara May 5, 2023
b1248f3
Implementing async upload using aioshutil for all backends.
ivergara May 23, 2023
8121350
calling pkgstor.add_package as async method
ivergara Apr 18, 2023
4ddd0a1
local packagestore, internally still blocking
ivergara Apr 18, 2023
9578f7d
Reverting async declaration, test for add_package
ivergara Apr 21, 2023
6d984ac
refactoring closure into helper function
ivergara Apr 21, 2023
09f1ea0
async definition with blocking implementation
ivergara Apr 21, 2023
d688512
making local async implementation work
ivergara Apr 21, 2023
f1daaf2
Using a separate async add_package version.
ivergara Apr 22, 2023
942d2dd
Fix test implementation
ivergara Apr 22, 2023
8f1b6ff
Adding sync test function for add_package.
ivergara Apr 22, 2023
3c50f74
Fixing issues with linting.
ivergara Apr 22, 2023
b6d2198
remove async keywords in old methods
ivergara Apr 24, 2023
3d96198
Implementing async upload using aioshutil for all backends.
ivergara May 23, 2023
596f76b
Merge branch 'async_friendly_upload' of https://github.com/ivergara/q…
ivergara Jun 1, 2023
7738529
Adding missing await keyword 🙈
ivergara Jun 1, 2023
cb09721
Add async argument in stores constructors.
ivergara Jun 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ repos:
- types-six
- types-toml
- types-ujson
- types-aiofiles
args: [--show-error-codes]
- repo: https://github.com/Quantco/pre-commit-mirrors-prettier
rev: 2.7.1
Expand Down
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies:
- tenacity
- xattr
- aiofiles
- aioshutil
- pyyaml
- ujson
- prometheus_client
Expand Down
2 changes: 1 addition & 1 deletion quetz/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,7 @@ async def post_upload(
dest = os.path.join(condainfo.info["subdir"], filename)

body.seek(0)
pkgstore.add_package(body, channel_name, dest)
await pkgstore.add_package_async(body, channel_name, dest)

package_name = str(condainfo.info.get("name"))
package_data = rest_models.Package(
Expand Down
43 changes: 43 additions & 0 deletions quetz/pkgstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
from threading import Lock
from typing import IO, List, Tuple, Union

import aiofiles
import aioshutil
import fsspec
from tenacity import retry
from tenacity.retry import retry_if_exception_type
Expand Down Expand Up @@ -80,6 +82,10 @@ def list_files(self, channel: str) -> List[str]:
def url(self, channel: str, src: str, expires: int = 0) -> str:
pass

@abc.abstractmethod
async def add_package_async(
    self, package: File, channel: str, destination: str
) -> None:
    """Asynchronously store ``package`` under ``destination`` in ``channel``.

    Declared as a coroutine so the abstract signature matches how the method
    is used: the concrete stores implement it with ``async def`` and the
    upload endpoint awaits the call.

    Args:
        package: Binary file object holding the package contents.
        channel: Name of the channel the package belongs to.
        destination: Target path of the package inside the channel.
    """

@abc.abstractmethod
def add_package(self, package: File, channel: str, destination: str):
    """Store ``package`` under ``destination`` in ``channel`` (blocking variant).

    Abstract: concrete package stores provide the implementation.
    """
Expand Down Expand Up @@ -175,6 +181,15 @@ def add_package(self, package: File, channel: str, destination: str) -> None:
with self._atomic_open(channel, destination) as f:
shutil.copyfileobj(package, f)

async def add_package_async(
    self, package: File, channel: str, destination: str
) -> None:
    """Write ``package`` to ``<channels_dir>/<channel>/<destination>``.

    The upload is streamed to disk in fixed-size chunks rather than being
    read into memory in one piece, so the memory needed does not grow with
    the size of the package.

    Args:
        package: Readable binary file object; it is read from its current
            position until exhausted.
        channel: Channel directory name below ``self.channels_dir``.
        destination: Path of the package relative to the channel directory.
    """
    # 10 MiB, the same chunk size the other async store methods in this
    # file pass to ``copyfileobj``.
    chunk_size = 10 * 1024 * 1024

    full_path = path.join(self.channels_dir, channel, destination)
    # NOTE(review): makedirs is called synchronously here, as before.
    self.fs.makedirs(path.dirname(full_path), exist_ok=True)

    async with aiofiles.open(full_path, 'wb') as f:
        while True:
            chunk = package.read(chunk_size)
            if not chunk:
                # An empty read marks end of input; an empty package still
                # leaves an (empty) file because the open above created it.
                break
            await f.write(chunk)

def add_file(
self, data: Union[str, bytes], channel: str, destination: StrPath
) -> None:
Expand Down Expand Up @@ -333,6 +348,15 @@ def add_package(self, package: File, channel: str, destination: str) -> None:
# use a chunk size of 10 Megabytes
shutil.copyfileobj(package, pkg, 10 * 1024 * 1024)

async def add_package_async(
    self, package: File, channel: str, destination: str
) -> None:
    """Copy ``package`` to ``destination`` inside the bucket mapped to ``channel``.

    The target object is opened for binary writing with ``acl="private"`` and
    filled through the awaitable ``aioshutil.copyfileobj``.
    """
    # copy in steps of 10 Megabytes
    chunk_size = 10 * 1024 * 1024
    with self._get_fs() as fs:
        target = path.join(self._bucket_map(channel), destination)
        with fs.open(target, "wb", acl="private") as remote:
            await aioshutil.copyfileobj(package, remote, chunk_size)

def add_file(
self, data: Union[str, bytes], channel: str, destination: StrPath
) -> None:
Expand Down Expand Up @@ -472,6 +496,15 @@ def add_package(self, package: File, channel: str, destination: str) -> None:
# use a chunk size of 10 Megabytes
shutil.copyfileobj(package, pkg, 10 * 1024 * 1024)

async def add_package_async(
    self, package: File, channel: str, destination: str
) -> None:
    """Copy ``package`` to ``destination`` inside the container of ``channel``.

    The target blob is opened for binary writing and filled through the
    awaitable ``aioshutil.copyfileobj``.
    """
    # copy in steps of 10 Megabytes
    chunk_size = 10 * 1024 * 1024
    with self._get_fs() as fs:
        target = path.join(self._container_map(channel), destination)
        with fs.open(target, "wb") as remote:
            await aioshutil.copyfileobj(package, remote, chunk_size)

def add_file(
self, data: Union[str, bytes], channel: str, destination: StrPath
) -> None:
Expand Down Expand Up @@ -571,6 +604,7 @@ def __init__(self, config):
token=self.token if self.token else None,
cache_timeout=self.cache_timeout,
default_location=self.region,
asynchronous=True,
ivergara marked this conversation as resolved.
Show resolved Hide resolved
)

self.bucket_prefix = config['bucket_prefix']
Expand Down Expand Up @@ -621,6 +655,15 @@ def add_package(self, package: File, channel: str, destination: str) -> None:
# use a chunk size of 10 Megabytes
shutil.copyfileobj(package, pkg, 10 * 1024 * 1024)

async def add_package_async(
    self, package: File, channel: str, destination: str
) -> None:
    """Copy ``package`` to ``destination`` inside the bucket mapped to ``channel``.

    The target object is opened for binary writing and filled through the
    awaitable ``aioshutil.copyfileobj``.
    """
    # copy in steps of 10 Megabytes
    chunk_size = 10 * 1024 * 1024
    with self._get_fs() as fs:
        target = path.join(self._bucket_map(channel), destination)
        with fs.open(target, "wb") as remote:
            await aioshutil.copyfileobj(package, remote, chunk_size)

def add_file(
self, data: Union[str, bytes], channel: str, destination: StrPath
) -> None:
Expand Down
78 changes: 49 additions & 29 deletions quetz/tests/test_pkg_stores.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import shutil
import time
import uuid
from collections.abc import Collection
from io import BytesIO
from pathlib import Path

import pytest

from quetz.pkgstores import (
AzureBlobStore,
GoogleCloudStorageStore,
LocalStore,
PackageStore,
S3Store,
has_xattr,
)
Expand Down Expand Up @@ -180,57 +184,73 @@ def channel(any_store, channel_name):
any_store.remove_channel(channel_name)


def test_store_add_list_files(any_store, channel, channel_name):
def assert_files(expected_files, n_retries=3):
n_retries = 3

files = []
for i in range(n_retries):
files = pkg_store.list_files(channel_name)
try:
assert files == expected_files
except AssertionError:
continue
break
assert files == expected_files
def assert_files(
    pkg_store: PackageStore,
    channel_name: str,
    expected_files: Collection[str],
    n_retries: int = 3,
):
    """Assert that the store lists ``expected_files``, retrying on mismatch.

    The listing is compared to the expectation as a set, so the order of the
    returned file names does not matter.

    Args:
        pkg_store: Store whose ``list_files`` method is queried.
        channel_name: Channel to list.
        expected_files: File names the channel is expected to contain.
        n_retries: Maximum number of listing attempts; at least one attempt
            is always made.

    Raises:
        AssertionError: If no attempt produced the expected set of files.
    """
    expected = set(expected_files)
    files: Collection[str] = []

    # Always query at least once so a non-positive ``n_retries`` cannot turn
    # the check into a silent no-op.
    for _ in range(max(1, n_retries)):
        files = pkg_store.list_files(channel_name)
        if set(files) == expected:
            # Stop at the first matching listing instead of re-querying.
            return

    raise AssertionError(
        f"files in channel {channel_name!r}: {sorted(files)!r}, "
        f"expected: {sorted(expected)!r}"
    )


def test_store_add_list_files(any_store, channel, channel_name):
    """Check that added and deleted files show up in the listing and metadata."""
    store = any_store

    for filename in ("test.txt", "test_2.txt"):
        store.add_file("content", channel_name, filename)
    assert_files(store, channel_name, ["test.txt", "test_2.txt"])

    store.delete_file(channel_name, "test.txt")
    assert_files(store, channel_name, ["test_2.txt"])

    file_meta = store.get_filemetadata(channel_name, "test_2.txt")
    assert file_meta[0] > 0
    assert type(file_meta[1]) is float
    assert type(file_meta[2]) is str


def test_move_file(any_store, channel, channel_name):
def assert_files(expected_files, n_retries=3):
n_retries = 3

files = []
for i in range(n_retries):
files = pkg_store.list_files(channel_name)
try:
assert files == expected_files
except AssertionError:
continue
break
assert files == expected_files
@pytest.mark.asyncio
async def test_add_package_async(any_store, channel, channel_name):
    """Upload the sample package through ``add_package_async`` and list it."""
    pkg_store = any_store

    # Use the fixture's own file name as the destination so the stored
    # extension matches the bzip2 archive instead of mislabelling it ``.tar.gz``.
    filename = "test-package-0.1-0.tar.bz2"
    data = (Path(__file__).parent / "data" / filename).read_bytes()

    await pkg_store.add_package_async(BytesIO(data), channel_name, filename)

    assert_files(pkg_store, channel_name, [filename])


def test_add_package(any_store, channel, channel_name):
    """Upload the sample package through ``add_package`` and list it."""
    pkg_store = any_store

    # Use the fixture's own file name as the destination so the stored
    # extension matches the bzip2 archive instead of mislabelling it ``.tar.gz``.
    filename = "test-package-0.1-0.tar.bz2"
    data = (Path(__file__).parent / "data" / filename).read_bytes()

    pkg_store.add_package(BytesIO(data), channel_name, filename)

    assert_files(pkg_store, channel_name, [filename])


def test_move_file(any_store, channel, channel_name):
    """Check that a moved file is listed under its new name."""
    store = any_store
    source_name, target_name = "test.txt", "test_2.txt"

    store.add_file("content", channel_name, source_name)
    store.move_file(channel_name, source_name, target_name)

    assert_files(store, channel_name, [target_name])


@pytest.mark.parametrize("redirect_enabled", [False, True])
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ install_requires =
ujson
uvicorn
zstandard
aioshutil

[options.entry_points]
console_scripts =
Expand Down