diff --git a/kinto_http/client.py b/kinto_http/client.py index 0c93cf1..10d1cd9 100644 --- a/kinto_http/client.py +++ b/kinto_http/client.py @@ -1,4 +1,6 @@ import asyncio +import functools +import inspect import logging import uuid from collections import OrderedDict @@ -75,7 +77,7 @@ def clone(self, **kwargs): @contextmanager def batch(self, **kwargs): if self._server_settings is None: - resp, _ = self.session.request("GET", self.get_endpoint("root")) + resp, _ = self.session.request("GET", self._get_endpoint("root")) self._server_settings = resp["settings"] batch_max_requests = self._server_settings["batch_max_requests"] @@ -92,6 +94,9 @@ def batch(self, **kwargs): batch_session.reset() def get_endpoint(self, name, *, bucket=None, group=None, collection=None, id=None) -> str: + return self._get_endpoint(name, bucket=bucket, group=group, collection=collection, id=id) + + def _get_endpoint(self, name, *, bucket=None, group=None, collection=None, id=None) -> str: """Return the endpoint with named parameters.""" kwargs = { "bucket": bucket or self.bucket_name, @@ -228,7 +233,7 @@ def _delete_if_exists(self, resource, **kwargs): @retry_timeout def server_info(self) -> Dict: - endpoint = self.get_endpoint("root") + endpoint = self._get_endpoint("root") resp, _ = self.session.request("get", endpoint) return resp @@ -246,7 +251,7 @@ def create_bucket( "bucket", id=id, data=data, permissions=permissions, safe=safe ) headers = DO_NOT_OVERWRITE if safe else None - endpoint = self.get_endpoint("bucket", bucket=id) + endpoint = self._get_endpoint("bucket", bucket=id) logger.info("Create bucket %r" % id or self.bucket_name) @@ -262,7 +267,7 @@ def update_bucket( if id is None and data: id = data.get("id", None) - endpoint = self.get_endpoint("bucket", bucket=id) + endpoint = self._get_endpoint("bucket", bucket=id) headers = self._get_cache_headers(safe, data, if_match) logger.info("Update bucket %r" % id or self.bucket_name) @@ -301,7 +306,7 @@ def patch_bucket( original = original or data (id, if_match) = self._extract_original_info(original, id, if_match) - endpoint = self.get_endpoint("bucket", bucket=id) + endpoint = self._get_endpoint("bucket", bucket=id) logger.info("Patch bucket %r" % (id or self.bucket_name,)) return self._patch_method( @@ -309,12 +314,12 @@ def patch_bucket( ) def get_buckets(self, **kwargs) -> List[Dict]: - endpoint = self.get_endpoint("buckets") + endpoint = self._get_endpoint("buckets") return self._paginated(endpoint, **kwargs) @retry_timeout def get_bucket(self, *, id=None, **kwargs) -> Dict: - endpoint = self.get_endpoint("bucket", bucket=id) + endpoint = self._get_endpoint("bucket", bucket=id) logger.info("Get bucket %r" % id or self.bucket_name) @@ -337,7 +342,7 @@ def get_bucket(self, *, id=None, **kwargs) -> Dict: def delete_bucket(self, *, id=None, safe=True, if_match=None, if_exists=False) -> Dict: if if_exists: return self._delete_if_exists("bucket", id=id, safe=safe, if_match=if_match) - endpoint = self.get_endpoint("bucket", bucket=id) + endpoint = self._get_endpoint("bucket", bucket=id) headers = self._get_cache_headers(safe, if_match=if_match) logger.info("Delete bucket %r" % id or self.bucket_name) @@ -347,7 +352,7 @@ def delete_bucket(self, *, id=None, safe=True, if_match=None, if_exists=False) - @retry_timeout def delete_buckets(self, *, safe=True, if_match=None) -> Dict: - endpoint = self.get_endpoint("buckets") + endpoint = self._get_endpoint("buckets") headers = self._get_cache_headers(safe, if_match=if_match) logger.info("Delete buckets") @@ -358,7 +363,7 @@ def delete_buckets(self, *, safe=True, if_match=None) -> Dict: # Groups def get_groups(self, *, bucket=None, **kwargs) -> List[Dict]: - endpoint = self.get_endpoint("groups", bucket=bucket) + endpoint = self._get_endpoint("groups", bucket=bucket) return self._paginated(endpoint, **kwargs) @retry_timeout @@ -376,7 +381,7 @@ def create_group( "group", id=id, bucket=bucket, data=data, permissions=permissions, safe=safe ) headers = DO_NOT_OVERWRITE if safe else None - endpoint = self.get_endpoint("group", bucket=bucket, group=id) + endpoint = self._get_endpoint("group", bucket=bucket, group=id) logger.info("Create group %r in bucket %r" % (id, bucket or self.bucket_name)) @@ -406,7 +411,7 @@ def update_group( if id is None: raise KeyError("Please provide a group id") - endpoint = self.get_endpoint("group", bucket=bucket, group=id) + endpoint = self._get_endpoint("group", bucket=bucket, group=id) headers = self._get_cache_headers(safe, data, if_match) logger.info("Update group %r in bucket %r" % (id, bucket or self.bucket_name)) @@ -446,7 +451,7 @@ def patch_group( original = original or data (id, if_match) = self._extract_original_info(original, id, if_match) - endpoint = self.get_endpoint("group", bucket=bucket, group=id) + endpoint = self._get_endpoint("group", bucket=bucket, group=id) logger.info("Patch group %r in bucket %r" % (id, bucket or self.bucket_name)) return self._patch_method( @@ -455,7 +460,7 @@ def patch_group( @retry_timeout def get_group(self, *, id, bucket=None) -> Dict: - endpoint = self.get_endpoint("group", bucket=bucket, group=id) + endpoint = self._get_endpoint("group", bucket=bucket, group=id) logger.info("Get group %r in bucket %r" % (id, bucket or self.bucket_name)) @@ -468,7 +473,7 @@ def delete_group(self, *, id, bucket=None, safe=True, if_match=None, if_exists=F return self._delete_if_exists( "group", id=id, bucket=bucket, safe=safe, if_match=if_match ) - endpoint = self.get_endpoint("group", bucket=bucket, group=id) + endpoint = self._get_endpoint("group", bucket=bucket, group=id) headers = self._get_cache_headers(safe, if_match=if_match) logger.info("Delete group %r in bucket %r" % (id, bucket or self.bucket_name)) @@ -478,7 +483,7 @@ def delete_group(self, *, id, bucket=None, safe=True, if_match=None, if_exists=F @retry_timeout def delete_groups(self, *, bucket=None, safe=True, if_match=None) -> Dict: - endpoint = self.get_endpoint("groups", bucket=bucket) + endpoint = self._get_endpoint("groups", bucket=bucket) headers = self._get_cache_headers(safe, if_match=if_match) logger.info("Delete groups in bucket %r" % bucket or self.bucket_name) @@ -489,7 +494,7 @@ def delete_groups(self, *, bucket=None, safe=True, if_match=None) -> Dict: # Collections def get_collections(self, *, bucket=None, **kwargs) -> List[Dict]: - endpoint = self.get_endpoint("collections", bucket=bucket) + endpoint = self._get_endpoint("collections", bucket=bucket) return self._paginated(endpoint, **kwargs) @retry_timeout @@ -505,7 +510,7 @@ def create_collection( ) headers = DO_NOT_OVERWRITE if safe else None - endpoint = self.get_endpoint("collection", bucket=bucket, collection=id) + endpoint = self._get_endpoint("collection", bucket=bucket, collection=id) logger.info( "Create collection %r in bucket %r" @@ -535,7 +540,7 @@ def update_collection( if id is None and data: id = data.get("id", None) - endpoint = self.get_endpoint("collection", bucket=bucket, collection=id) + endpoint = self._get_endpoint("collection", bucket=bucket, collection=id) headers = self._get_cache_headers(safe, data, if_match) logger.info( @@ -578,7 +583,7 @@ def patch_collection( original = original or data (id, if_match) = self._extract_original_info(original, id, if_match) - endpoint = self.get_endpoint("collection", bucket=bucket, collection=id) + endpoint = self._get_endpoint("collection", bucket=bucket, collection=id) logger.info( "Patch collection %r in bucket %r" % (id or self.collection_name, bucket or self.bucket_name) @@ -590,7 +595,7 @@ def patch_collection( @retry_timeout def get_collection(self, *, id=None, bucket=None, **kwargs) -> Dict: - endpoint = self.get_endpoint("collection", bucket=bucket, collection=id) + endpoint = self._get_endpoint("collection", bucket=bucket, collection=id) logger.info( "Get collection %r in bucket %r" @@ -614,7 +619,7 @@ def delete_collection( return self._delete_if_exists( "collection", id=id, bucket=bucket, safe=safe, if_match=if_match ) - endpoint = self.get_endpoint("collection", bucket=bucket, collection=id) + endpoint = self._get_endpoint("collection", bucket=bucket, collection=id) headers = self._get_cache_headers(safe, if_match=if_match) logger.info( @@ -627,7 +632,7 @@ def delete_collection( @retry_timeout def delete_collections(self, *, bucket=None, safe=True, if_match=None) -> Dict: - endpoint = self.get_endpoint("collections", bucket=bucket) + endpoint = self._get_endpoint("collections", bucket=bucket) headers = self._get_cache_headers(safe, if_match=if_match) logger.info("Delete collections in bucket %r" % bucket or self.bucket_name) @@ -638,7 +643,7 @@ def delete_collections(self, *, bucket=None, safe=True, if_match=None) -> Dict: # Records def get_records_timestamp(self, *, collection=None, bucket=None) -> str: - endpoint = self.get_endpoint("records", bucket=bucket, collection=collection) + endpoint = self._get_endpoint("records", bucket=bucket, collection=collection) if endpoint not in self._records_timestamp: record_resp, headers = self.session.request("head", endpoint) @@ -651,11 +656,11 @@ def get_records_timestamp(self, *, collection=None, bucket=None) -> str: @retry_timeout def get_records(self, *, collection=None, bucket=None, **kwargs) -> List[Dict]: """Returns all the records""" - endpoint = self.get_endpoint("records", bucket=bucket, collection=collection) + endpoint = self._get_endpoint("records", bucket=bucket, collection=collection) return self._paginated(endpoint, **kwargs) def get_paginated_records(self, *, collection=None, bucket=None, **kwargs) -> List[Dict]: - endpoint = self.get_endpoint("records", bucket=bucket, collection=collection) + endpoint = self._get_endpoint("records", bucket=bucket, collection=collection) return self._paginated_generator(endpoint, **kwargs) @@ -677,7 +682,7 @@ def _paginated_generator(self, endpoint, *, if_none_match=None, **kwargs): @retry_timeout def get_record(self, *, id, collection=None, bucket=None, **kwargs) -> Dict: - endpoint = self.get_endpoint("record", id=id, bucket=bucket, collection=collection) + endpoint = self._get_endpoint("record", id=id, bucket=bucket, collection=collection) logger.info( "Get record with id %r from collection %r in bucket %r" @@ -714,7 +719,7 @@ def create_record( # Make sure that no record already exists with this id. headers = DO_NOT_OVERWRITE if safe else None - endpoint = self.get_endpoint("record", id=id, bucket=bucket, collection=collection) + endpoint = self._get_endpoint("record", id=id, bucket=bucket, collection=collection) logger.info( "Create record with id %r in collection %r in bucket %r" @@ -752,7 +757,7 @@ def update_record( id = id or data.get("id") if id is None: raise KeyError("Unable to update a record, need an id.") - endpoint = self.get_endpoint("record", id=id, bucket=bucket, collection=collection) + endpoint = self._get_endpoint("record", id=id, bucket=bucket, collection=collection) headers = self._get_cache_headers(safe, data, if_match) logger.info( @@ -799,7 +804,7 @@ def patch_record( if id is None: raise KeyError("Unable to patch record, need an id.") - endpoint = self.get_endpoint("record", id=id, bucket=bucket, collection=collection) + endpoint = self._get_endpoint("record", id=id, bucket=bucket, collection=collection) logger.info( "Patch record with id %r in collection %r in bucket %r" @@ -818,7 +823,7 @@ def delete_record( return self._delete_if_exists( "record", id=id, collection=collection, bucket=bucket, safe=safe, if_match=if_match ) - endpoint = self.get_endpoint("record", id=id, bucket=bucket, collection=collection) + endpoint = self._get_endpoint("record", id=id, bucket=bucket, collection=collection) headers = self._get_cache_headers(safe, if_match=if_match) logger.info( @@ -831,7 +836,7 @@ def delete_record( @retry_timeout def delete_records(self, *, collection=None, bucket=None, safe=True, if_match=None) -> Dict: - endpoint = self.get_endpoint("records", bucket=bucket, collection=collection) + endpoint = self._get_endpoint("records", bucket=bucket, collection=collection) headers = self._get_cache_headers(safe, if_match=if_match) logger.info( @@ -844,13 +849,13 @@ def delete_records(self, *, collection=None, bucket=None, safe=True, if_match=No @retry_timeout def get_history(self, *, bucket=None, **kwargs) -> List[Dict]: - endpoint = self.get_endpoint("history", bucket=bucket) + endpoint = self._get_endpoint("history", bucket=bucket) logger.info("Get history from bucket %r" % bucket or self.bucket_name) return self._paginated(endpoint, **kwargs) @retry_timeout def purge_history(self, *, bucket=None, safe=True, if_match=None) -> List[Dict]: - endpoint = self.get_endpoint("history", bucket=bucket) + endpoint = self._get_endpoint("history", bucket=bucket) headers = self._get_cache_headers(safe, if_match=if_match) logger.info("Purge History of bucket %r" % bucket or self.bucket_name) resp, _ = self.session.request("delete", endpoint, headers=headers) @@ -858,492 +863,87 @@ def purge_history(self, *, bucket=None, safe=True, if_match=None) -> List[Dict]: def __repr__(self) -> str: if self.collection_name: - endpoint = self.get_endpoint( + endpoint = self._get_endpoint( "collection", bucket=self.bucket_name, collection=self.collection_name ) elif self.bucket_name: - endpoint = self.get_endpoint("bucket", bucket=self.bucket_name) + endpoint = self._get_endpoint("bucket", bucket=self.bucket_name) else: - endpoint = self.get_endpoint("root") + endpoint = self._get_endpoint("root") absolute_endpoint = utils.urljoin(self.session.server_url, endpoint) - return "" % absolute_endpoint - - -class AsyncClient(object): - def __init__( - self, - *, - server_url=None, - session=None, - auth=None, - bucket="default", - collection=None, - retry=0, - retry_after=None, - timeout=None, - ignore_batch_4xx=False, - headers=None, - ): - self.endpoints = Endpoints() - - session_kwargs = dict( - server_url=server_url, - auth=auth, - session=session, - retry=retry, - retry_after=retry_after, - timeout=timeout, - headers=headers, - ) - self.session = create_session(**session_kwargs) - self.bucket_name = bucket - self.collection_name = collection - self._server_settings = None - self._records_timestamp = {} - self._ignore_batch_4xx = ignore_batch_4xx - self._client = Client( - server_url=server_url, - session=session, - auth=auth, - bucket=bucket, - collection=collection, - retry=retry, - retry_after=retry_after, - timeout=timeout, - ignore_batch_4xx=ignore_batch_4xx, - headers=headers, - ) + return f"" - def clone(self, **kwargs): - if "server_url" in kwargs or "auth" in kwargs: - kwargs.setdefault("server_url", self.session.server_url) - kwargs.setdefault("auth", self.session.auth) - else: - kwargs.setdefault("session", self.session) - kwargs.setdefault("bucket", self.bucket_name) - kwargs.setdefault("collection", self.collection_name) - kwargs.setdefault("retry", self.session.nb_retry) - kwargs.setdefault("retry_after", self.session.retry_after) - return self.__class__(**kwargs) - async def server_info(self) -> Dict: +def async_wrap(func): + @functools.wraps(func) + async def wrapper(*args, **kwargs): loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, lambda: self._client.server_info()) + func_partial = functools.partial(func, *args, **kwargs) + return await loop.run_in_executor(None, func_partial) - async def get_bucket(self, *, id=None, **kwargs) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, lambda: self._client.get_bucket(id=id, **kwargs)) + return wrapper - async def get_buckets(self, **kwargs) -> List[Dict]: - loop = asyncio.get_event_loop() - return await loop.run_in_executor(None, lambda: self._client.get_buckets(**kwargs)) - async def create_bucket( - self, *, id=None, data=None, permissions=None, safe=True, if_not_exists=False - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.create_bucket( - id=id, data=data, permissions=permissions, safe=safe, if_not_exists=if_not_exists - ), - ) +def async_client(cls): + for name, method in inspect.getmembers(cls, inspect.isfunction): + if not (name.startswith("_") or name == "clone"): + setattr(cls, name, async_wrap(method)) + return cls - async def update_bucket( - self, *, id=None, data=None, permissions=None, safe=True, if_match=None - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.update_bucket( - id=id, data=data, permissions=permissions, safe=safe, if_match=if_match - ), - ) - async def patch_bucket( - self, - *, - id=None, - changes=None, - data=None, - original=None, - permissions=None, - safe=True, - if_match=None, - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.patch_bucket( - id=id, - changes=changes, - data=data, - original=original, - permissions=permissions, - safe=safe, - if_match=if_match, - ), - ) +@async_client +class AsyncClient(Client): + """Wraps most public methods of the Client in an event_loop.run_in_executor + call to make them async-compatible - async def delete_bucket(self, *, id=None, safe=True, if_match=None, if_exists=False) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.delete_bucket( - id=id, safe=safe, if_match=if_match, if_exists=if_exists - ), - ) - - async def delete_buckets(self, *, safe=True, if_match=None) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, lambda: self._client.delete_buckets(safe=safe, if_match=if_match) - ) - - async def get_group(self, *, id, bucket=None) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, lambda: self._client.get_group(id=id, bucket=bucket) - ) - - async def get_groups(self, *, bucket=None, **kwargs) -> List[Dict]: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, lambda: self._client.get_groups(bucket=bucket, **kwargs) - ) - - async def create_group( - self, *, id=None, bucket=None, data=None, permissions=None, safe=True, if_not_exists=False - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.create_group( - id=id, - bucket=bucket, - data=data, - permissions=permissions, - safe=safe, - if_not_exists=if_not_exists, - ), - ) + Note: One limitation of this approach is that all methods that aren't + wrapped by the `async_client` need to be awaited. This means that public + methods can't call other public methods from within the class. We had to + patch `_create_if_not_exists` and `_delete_if_exists` below that reason. + """ - async def update_group( - self, *, id=None, bucket=None, data=None, permissions=None, safe=True, if_match=None - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.update_group( - id=id, - bucket=bucket, - data=data, - permissions=permissions, - safe=safe, - if_match=if_match, - ), - ) - - async def patch_group( - self, - *, - id=None, - bucket=None, - changes=None, - data=None, - original=None, - permissions=None, - safe=True, - if_match=None, - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.patch_group( - id=id, - bucket=bucket, - changes=changes, - data=data, - original=original, - permissions=permissions, - safe=safe, - if_match=if_match, - ), - ) - - async def delete_group( - self, *, id, bucket=None, safe=True, if_match=None, if_exists=False - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.delete_group( - id=id, bucket=bucket, safe=safe, if_match=if_match, if_exists=if_exists - ), - ) - - async def delete_groups(self, *, bucket=None, safe=True, if_match=None) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, lambda: self._client.delete_groups(bucket=bucket, safe=safe, if_match=if_match) - ) - - async def get_collection(self, *, id=None, bucket=None, **kwargs) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, lambda: self._client.get_collection(id=id, bucket=bucket, **kwargs) - ) - - async def get_collections(self, *, bucket=None, **kwargs) -> List[Dict]: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, lambda: self._client.get_collections(bucket=bucket, **kwargs) - ) - - async def create_collection( - self, *, id=None, bucket=None, data=None, permissions=None, safe=True, if_not_exists=False - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.create_collection( - id=id, - bucket=bucket, - data=data, - permissions=permissions, - safe=safe, - if_not_exists=if_not_exists, - ), - ) - - async def update_collection( - self, *, id=None, bucket=None, data=None, permissions=None, safe=True, if_match=None - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.update_collection( - id=id, - bucket=bucket, - data=data, - permissions=permissions, - safe=safe, - if_match=if_match, - ), - ) - - async def patch_collection( - self, - *, - id=None, - bucket=None, - changes=None, - data=None, - original=None, - permissions=None, - safe=True, - if_match=None, - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.patch_collection( - id=id, - bucket=bucket, - changes=changes, - data=data, - original=original, - permissions=permissions, - safe=safe, - if_match=if_match, - ), - ) - - async def delete_collection( - self, *, id=None, bucket=None, safe=True, if_match=None, if_exists=False - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.delete_collection( - id=id, bucket=bucket, safe=safe, if_match=if_match, if_exists=if_exists - ), - ) - - async def delete_collections(self, *, bucket=None, safe=True, if_match=None) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.delete_collections(bucket=bucket, safe=safe, if_match=if_match), - ) - - async def get_record(self, *, id, collection=None, bucket=None, **kwargs) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.get_record(id=id, collection=collection, bucket=bucket, **kwargs), - ) - - async def get_records(self, *, collection=None, bucket=None, **kwargs) -> List[Dict]: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, lambda: self._client.get_records(collection=collection, bucket=bucket, **kwargs) - ) - - async def get_paginated_records(self, *, collection=None, bucket=None, **kwargs) -> List[Dict]: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.get_paginated_records( - collection=collection, bucket=bucket, **kwargs - ), - ) - - async def get_records_timestamp(self, *, collection=None, bucket=None) -> str: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, lambda: self._client.get_records_timestamp(collection=collection, bucket=bucket) - ) - - async def create_record( - self, - *, - id=None, - bucket=None, - collection=None, - data=None, - permissions=None, - safe=True, - if_not_exists=False, - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.create_record( - id=id, - bucket=bucket, - collection=collection, - data=data, - permissions=permissions, - safe=safe, - if_not_exists=if_not_exists, - ), - ) - - async def update_record( - self, - *, - id=None, - collection=None, - bucket=None, - data=None, - permissions=None, - safe=True, - if_match=None, - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.update_record( - id=id, - collection=collection, - bucket=bucket, - data=data, - permissions=permissions, - safe=safe, - if_match=if_match, - ), - ) - - async def patch_record( - self, - *, - id=None, - collection=None, - bucket=None, - changes=None, - data=None, - original=None, - permissions=None, - safe=True, - if_match=None, - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.patch_record( - id=id, - collection=collection, - bucket=bucket, - changes=changes, - data=data, - original=original, - permissions=permissions, - safe=safe, - if_match=if_match, - ), - ) - - async def delete_record( - self, *, id, collection=None, bucket=None, safe=True, if_match=None, if_exists=False - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.delete_record( - id=id, - collection=collection, - bucket=bucket, - safe=safe, - if_match=if_match, - if_exists=if_exists, - ), - ) - - async def delete_records( - self, *, collection=None, bucket=None, safe=True, if_match=None - ) -> Dict: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.delete_records( - collection=collection, bucket=bucket, safe=safe, if_match=if_match - ), - ) - - async def get_history(self, *, bucket=None, **kwargs) -> List[Dict]: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, lambda: self._client.get_history(bucket=bucket, **kwargs) - ) + # have to redefine this because of the use of getattr. We want to make sure + # that we get the synchronous version of the create_ or get_ method + def _create_if_not_exists(self, resource, **kwargs): + try: + create_method = getattr(super(), "create_%s" % resource) + return create_method(**kwargs) + except KintoException as e: + if not hasattr(e, "response") or e.response.status_code != 412: + raise e + # The exception contains the existing record in details.existing + # but it's not enough as we also need to return the permissions. + get_kwargs = {"id": kwargs["id"]} + if resource in ("group", "collection", "record"): + get_kwargs["bucket"] = kwargs["bucket"] - async def purge_history(self, *, bucket=None, safe=True, if_match=None) -> List[Dict]: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, lambda: self._client.purge_history(bucket=bucket, safe=safe, if_match=if_match) - ) + if resource == "record": + get_kwargs["collection"] = kwargs["collection"] + _id = kwargs.get("id") or kwargs["data"]["id"] + get_kwargs["id"] = _id - async def get_endpoint( - self, name, *, bucket=None, group=None, collection=None, id=None - ) -> str: - loop = asyncio.get_event_loop() - return await loop.run_in_executor( - None, - lambda: self._client.get_endpoint( - name, bucket=bucket, group=group, collection=collection, id=id - ), - ) + get_method = getattr(super(), "get_%s" % resource) + return get_method(**get_kwargs) - def __repr__(self) -> str: - if self.collection_name: - endpoint = self._client.get_endpoint( - "collection", bucket=self.bucket_name, collection=self.collection_name + # have to redefine this because of the use of getattr. We want to make sure + # that we get the synchronous version of the delete_ method + def _delete_if_exists(self, resource, **kwargs): + try: + delete_method = getattr(super(), "delete_%s" % resource) + return delete_method(**kwargs) + except KintoException as e: + # Should not raise in case of a 404. + should_raise = not ( + hasattr(e, "response") and e.response is not None and e.response.status_code == 404 ) - elif self.bucket_name: - endpoint = self._client.get_endpoint("bucket", bucket=self.bucket_name) - else: - endpoint = self._client.get_endpoint("root") - absolute_endpoint = utils.urljoin(self.session.server_url, endpoint) - return f"" + # Should not raise in case of a 403 on a bucket. + if should_raise and resource.startswith("bucket"): + should_raise = not ( + hasattr(e, "response") + and e.response is not None + and e.response.status_code == 403 + ) + if should_raise: + raise e