From 9b1fb00e7a25ba96fd278c1ee1b049a72fabeb35 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Tue, 22 Jun 2021 16:12:12 -0400 Subject: [PATCH 1/5] creating async policy, adding lock around sync token portion --- .../_azure_appconfiguration_client.py | 1 - .../azure/appconfiguration/_sync_token.py | 29 +++--- .../aio/_azure_configuration_client_async.py | 5 +- .../appconfiguration/aio/_sync_token_async.py | 94 +++++++++++++++++++ 4 files changed, 113 insertions(+), 16 deletions(-) create mode 100644 sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py diff --git a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_azure_appconfiguration_client.py b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_azure_appconfiguration_client.py index 0790fa617a50..542e4c6d39e7 100644 --- a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_azure_appconfiguration_client.py +++ b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_azure_appconfiguration_client.py @@ -86,7 +86,6 @@ def __init__(self, base_url, credential, **kwargs): pipeline = kwargs.get("pipeline") if pipeline is None: - self._sync_token_policy = SyncTokenPolicy() aad_mode = not isinstance(credential, AppConfigConnectionStringCredential) pipeline = self._create_appconfig_pipeline( credential=credential, aad_mode=aad_mode, base_url=base_url, **kwargs diff --git a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_sync_token.py b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_sync_token.py index 23b9c419eeed..33e356e7c45f 100644 --- a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_sync_token.py +++ b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_sync_token.py @@ -24,6 +24,7 @@ # # -------------------------------------------------------------------------- from typing import Any, Dict +from threading import Lock from azure.core.pipeline import PipelineRequest, PipelineResponse from azure.core.pipeline.policies import SansIOHTTPPolicy @@ -63,6 +64,7 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument # type: (**Any) -> None self._sync_token_header = "Sync-Token" self._sync_tokens = {} # type: Dict[str, Any] + self._lock = Lock() def on_request(self, request): # type: ignore # pylint: disable=arguments-differ # type: (PipelineRequest) -> None @@ -70,11 +72,12 @@ def on_request(self, request): # type: ignore # pylint: disable=arguments-diffe :param request: The PipelineRequest object. :type request: ~azure.core.pipeline.PipelineRequest """ - sync_token_header = ",".join(str(x) for x in self._sync_tokens.values()) - if sync_token_header: - request.http_request.headers.update( - {self._sync_token_header: sync_token_header} - ) + with self._lock: + sync_token_header = ",".join(str(x) for x in self._sync_tokens.values()) + if sync_token_header: + request.http_request.headers.update( + {self._sync_token_header: sync_token_header} + ) def on_response(self, request, response): # type: ignore # pylint: disable=arguments-differ # type: (PipelineRequest, PipelineResponse) -> None @@ -90,16 +93,18 @@ def on_response(self, request, response): # type: ignore # pylint: disable=argu sync_token_strings = sync_token_header.split(",") if not sync_token_strings: return - for sync_token_string in sync_token_strings: - sync_token = SyncToken.from_sync_token_string(sync_token_string) - self._update_sync_token(sync_token) + with self._lock: + for sync_token_string in sync_token_strings: + sync_token = SyncToken.from_sync_token_string(sync_token_string) + self._update_sync_token(sync_token) def add_token(self, full_raw_tokens): # type: (str) -> None - raw_tokens = full_raw_tokens.split(",") - for raw_token in raw_tokens: - sync_token = SyncToken.from_sync_token_string(raw_token) - self._update_sync_token(sync_token) + with self._lock: + raw_tokens = full_raw_tokens.split(",") + for raw_token in raw_tokens: + sync_token = SyncToken.from_sync_token_string(raw_token) + self._update_sync_token(sync_token) def _update_sync_token(self, sync_token): # type: (SyncToken) -> None diff --git a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_azure_configuration_client_async.py b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_azure_configuration_client_async.py index a90de362a385..7b9a3c4f76ff 100644 --- a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_azure_configuration_client_async.py +++ b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_azure_configuration_client_async.py @@ -38,8 +38,8 @@ from .._azure_appconfiguration_credential import AppConfigConnectionStringCredential from .._generated.models import KeyValue from .._models import ConfigurationSetting -from .._sync_token import SyncTokenPolicy from .._user_agent import USER_AGENT +from ._sync_token_async import AsyncSyncTokenPolicy try: from typing import TYPE_CHECKING @@ -87,10 +87,9 @@ def __init__(self, base_url, credential, **kwargs): ) pipeline = kwargs.get("pipeline") - self._sync_token_policy = SyncTokenPolicy() + self._sync_token_policy = AsyncSyncTokenPolicy() if pipeline is None: - self._sync_token_policy = SyncTokenPolicy() aad_mode = not isinstance(credential, AppConfigConnectionStringCredential) pipeline = self._create_appconfig_pipeline( credential=credential, aad_mode=aad_mode, base_url=base_url, **kwargs diff --git a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py new file mode 100644 index 000000000000..9e7c11bcd270 --- /dev/null +++ b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py @@ -0,0 +1,94 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from typing import Any, Dict +from asyncio import Lock +from .._sync_token import SyncToken +from azure.core.pipeline import PipelineRequest, PipelineResponse +from azure.core.pipeline.policies import SansIOHTTPPolicy + + +class AsyncSyncTokenPolicy(SansIOHTTPPolicy): + """A simple policy that enable the given callback + with the response. + :keyword callback raw_response_hook: Callback function. Will be invoked on response. + """ + + def __init__(self, **kwargs): # pylint: disable=unused-argument + # type: (**Any) -> None + self._sync_token_header = "Sync-Token" + self._sync_tokens = {} # type: Dict[str, Any] + self._lock = Lock() + + async def on_request(self, request): # type: ignore # pylint: disable=arguments-differ + # type: (PipelineRequest) -> None + """This is executed before sending the request to the next policy. + :param request: The PipelineRequest object. + :type request: ~azure.core.pipeline.PipelineRequest + """ + async with self._lock: + sync_token_header = ",".join(str(x) for x in self._sync_tokens.values()) + if sync_token_header: + request.http_request.headers.update( + {self._sync_token_header: sync_token_header} + ) + + async def on_response(self, request, response): # type: ignore # pylint: disable=arguments-differ + # type: (PipelineRequest, PipelineResponse) -> None + """This is executed after the request comes back from the policy. + :param request: The PipelineRequest object. + :type request: ~azure.core.pipeline.PipelineRequest + :param response: The PipelineResponse object. + :type response: ~azure.core.pipeline.PipelineResponse + """ + sync_token_header = response.http_response.headers.get(self._sync_token_header) + if not sync_token_header: + return + sync_token_strings = sync_token_header.split(",") + if not sync_token_strings: + return + async with self._lock: + for sync_token_string in sync_token_strings: + sync_token = SyncToken.from_sync_token_string(sync_token_string) + self._update_sync_token(sync_token) + + async def add_token(self, full_raw_tokens): + # type: (str) -> None + async with self._lock: + raw_tokens = full_raw_tokens.split(",") + for raw_token in raw_tokens: + sync_token = SyncToken.from_sync_token_string(raw_token) + self._update_sync_token(sync_token) + + def _update_sync_token(self, sync_token): + # type: (SyncToken) -> None + if not sync_token: + return + existing_token = self._sync_tokens.get(sync_token.token_id, None) + if not existing_token: + self._sync_tokens[sync_token.token_id] = sync_token + return + if existing_token.sequence_number < sync_token.sequence_number: + self._sync_tokens[sync_token.token_id] = sync_token From bb5624ddbd71f733197563a1d349a04bed8c111d Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Tue, 22 Jun 2021 16:40:07 -0400 Subject: [PATCH 2/5] pylint --- .../azure/appconfiguration/aio/_sync_token_async.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py index 9e7c11bcd270..2a35dc0f1e66 100644 --- a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py +++ b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py @@ -24,11 +24,12 @@ # # -------------------------------------------------------------------------- from typing import Any, Dict -from asyncio import Lock -from .._sync_token import SyncToken +from asyncio import locks from azure.core.pipeline import PipelineRequest, PipelineResponse from azure.core.pipeline.policies import SansIOHTTPPolicy +from .._sync_token import SyncToken + class AsyncSyncTokenPolicy(SansIOHTTPPolicy): """A simple policy that enable the given callback @@ -42,7 +43,7 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument self._sync_tokens = {} # type: Dict[str, Any] self._lock = Lock() - async def on_request(self, request): # type: ignore # pylint: disable=arguments-differ + async def on_request(self, request): # type: ignore # pylint: disable=arguments-differ, invalid-overrident-method # type: (PipelineRequest) -> None """This is executed before sending the request to the next policy. :param request: The PipelineRequest object. @@ -55,7 +56,7 @@ async def on_request(self, request): # type: ignore # pylint: disable=arguments {self._sync_token_header: sync_token_header} ) - async def on_response(self, request, response): # type: ignore # pylint: disable=arguments-differ + async def on_response(self, request, response): # type: ignore # pylint: disable=arguments-differ, invalid-overrident-method # type: (PipelineRequest, PipelineResponse) -> None """This is executed after the request comes back from the policy. :param request: The PipelineRequest object. From 1e795c35100863434d3849e537a743466986a2fc Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Tue, 22 Jun 2021 17:26:31 -0400 Subject: [PATCH 3/5] fixed import --- .../azure/appconfiguration/aio/_sync_token_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py index 2a35dc0f1e66..22cf86d1710b 100644 --- a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py +++ b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py @@ -24,7 +24,7 @@ # # -------------------------------------------------------------------------- from typing import Any, Dict -from asyncio import locks +from asyncio import Lock from azure.core.pipeline import PipelineRequest, PipelineResponse from azure.core.pipeline.policies import SansIOHTTPPolicy From 3a486cf858eee5fbdb1801ea0cbbb42eb612d681 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Tue, 22 Jun 2021 18:14:52 -0400 Subject: [PATCH 4/5] reducing to two locks --- .../azure/appconfiguration/_sync_token.py | 29 +++++++++-------- .../appconfiguration/aio/_sync_token_async.py | 31 +++++++++---------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_sync_token.py b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_sync_token.py index 33e356e7c45f..f439b57b33c1 100644 --- a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_sync_token.py +++ b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/_sync_token.py @@ -93,26 +93,25 @@ def on_response(self, request, response): # type: ignore # pylint: disable=argu sync_token_strings = sync_token_header.split(",") if not sync_token_strings: return - with self._lock: - for sync_token_string in sync_token_strings: - sync_token = SyncToken.from_sync_token_string(sync_token_string) - self._update_sync_token(sync_token) + for sync_token_string in sync_token_strings: + sync_token = SyncToken.from_sync_token_string(sync_token_string) + self._update_sync_token(sync_token) def add_token(self, full_raw_tokens): # type: (str) -> None - with self._lock: - raw_tokens = full_raw_tokens.split(",") - for raw_token in raw_tokens: - sync_token = SyncToken.from_sync_token_string(raw_token) - self._update_sync_token(sync_token) + raw_tokens = full_raw_tokens.split(",") + for raw_token in raw_tokens: + sync_token = SyncToken.from_sync_token_string(raw_token) + self._update_sync_token(sync_token) def _update_sync_token(self, sync_token): # type: (SyncToken) -> None if not sync_token: return - existing_token = self._sync_tokens.get(sync_token.token_id, None) - if not existing_token: - self._sync_tokens[sync_token.token_id] = sync_token - return - if existing_token.sequence_number < sync_token.sequence_number: - self._sync_tokens[sync_token.token_id] = sync_token + with self._lock: + existing_token = self._sync_tokens.get(sync_token.token_id, None) + if not existing_token: + self._sync_tokens[sync_token.token_id] = sync_token + return + if existing_token.sequence_number < sync_token.sequence_number: + self._sync_tokens[sync_token.token_id] = sync_token diff --git a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py index 22cf86d1710b..7e714d90cf7c 100644 --- a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py +++ b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py @@ -70,26 +70,25 @@ async def on_response(self, request, response): # type: ignore # pylint: disabl sync_token_strings = sync_token_header.split(",") if not sync_token_strings: return - async with self._lock: - for sync_token_string in sync_token_strings: - sync_token = SyncToken.from_sync_token_string(sync_token_string) - self._update_sync_token(sync_token) + for sync_token_string in sync_token_strings: + sync_token = SyncToken.from_sync_token_string(sync_token_string) + await self._update_sync_token(sync_token) async def add_token(self, full_raw_tokens): # type: (str) -> None - async with self._lock: - raw_tokens = full_raw_tokens.split(",") - for raw_token in raw_tokens: - sync_token = SyncToken.from_sync_token_string(raw_token) - self._update_sync_token(sync_token) + raw_tokens = full_raw_tokens.split(",") + for raw_token in raw_tokens: + sync_token = SyncToken.from_sync_token_string(raw_token) + await self._update_sync_token(sync_token) - def _update_sync_token(self, sync_token): + async def _update_sync_token(self, sync_token): # type: (SyncToken) -> None if not sync_token: return - existing_token = self._sync_tokens.get(sync_token.token_id, None) - if not existing_token: - self._sync_tokens[sync_token.token_id] = sync_token - return - if existing_token.sequence_number < sync_token.sequence_number: - self._sync_tokens[sync_token.token_id] = sync_token + async with self._lock: + existing_token = self._sync_tokens.get(sync_token.token_id, None) + if not existing_token: + self._sync_tokens[sync_token.token_id] = sync_token + return + if existing_token.sequence_number < sync_token.sequence_number: + self._sync_tokens[sync_token.token_id] = sync_token From 45057c077a526a7cedb96bc86d217aebe6404c75 Mon Sep 17 00:00:00 2001 From: seankane-msft Date: Wed, 23 Jun 2021 09:28:06 -0400 Subject: [PATCH 5/5] pylint fix --- .../azure/appconfiguration/aio/_sync_token_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py index 7e714d90cf7c..03bce644984b 100644 --- a/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py +++ b/sdk/appconfiguration/azure-appconfiguration/azure/appconfiguration/aio/_sync_token_async.py @@ -43,7 +43,7 @@ def __init__(self, **kwargs): # pylint: disable=unused-argument self._sync_tokens = {} # type: Dict[str, Any] self._lock = Lock() - async def on_request(self, request): # type: ignore # pylint: disable=arguments-differ, invalid-overrident-method + async def on_request(self, request): # type: ignore # pylint: disable=arguments-differ, invalid-overridden-method # type: (PipelineRequest) -> None """This is executed before sending the request to the next policy. :param request: The PipelineRequest object. @@ -56,7 +56,7 @@ async def on_request(self, request): # type: ignore # pylint: disable=arguments {self._sync_token_header: sync_token_header} ) - async def on_response(self, request, response): # type: ignore # pylint: disable=arguments-differ, invalid-overrident-method + async def on_response(self, request, response): # type: ignore # pylint: disable=arguments-differ, invalid-overridden-method # type: (PipelineRequest, PipelineResponse) -> None """This is executed after the request comes back from the policy. :param request: The PipelineRequest object.