Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
simorenoh committed Aug 13, 2021
1 parent d393021 commit 61ba8d1
Show file tree
Hide file tree
Showing 11 changed files with 5,378 additions and 1 deletion.
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if options.get("consistencyLevel"):
consistency_level = options["consistencyLevel"]
headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
elif default_client_consistency_level is not None:
elif default_client_consistency_level is not None: # Why not just check for `default_client_consistency_level`
consistency_level = default_client_consistency_level
headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level

Expand Down
Empty file.
186 changes: 186 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Asynchronous request in the Azure Cosmos database service.
"""

import json
import time

from six.moves.urllib.parse import urlparse
import six
from azure.core.exceptions import DecodeError # type: ignore

from .. import exceptions
from .. import http_constants
from .. import _retry_utility
from .._synchronized_request import _request_body_from_data


async def _Request(global_endpoint_manager, request_params, connection_policy, pipeline_client, request, **kwargs):
"""Makes one http request using the requests module.
:param _GlobalEndpointManager global_endpoint_manager:
:param dict request_params:
contains the resourceType, operationType, endpointOverride,
useWriteEndpoint, useAlternateWriteEndpoint information
:param documents.ConnectionPolicy connection_policy:
:param azure.core.PipelineClient pipeline_client:
Pipeline client to process the request
:param azure.core.HttpRequest request:
The request object to send through the pipeline
:return: tuple of (result, headers)
:rtype: tuple of (dict, dict)
"""
# pylint: disable=protected-access

connection_timeout = connection_policy.RequestTimeout
connection_timeout = kwargs.pop("connection_timeout", connection_timeout / 1000.0)

# Every request tries to perform a refresh
client_timeout = kwargs.get('timeout')
start_time = time.time()
global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
if client_timeout is not None:
kwargs['timeout'] = client_timeout - (time.time() - start_time)
if kwargs['timeout'] <= 0:
raise exceptions.CosmosClientTimeoutError()

if request_params.endpoint_override:
base_url = request_params.endpoint_override
else:
base_url = global_endpoint_manager.resolve_service_endpoint(request_params)
if base_url != pipeline_client._base_url:
request.url = request.url.replace(pipeline_client._base_url, base_url)

parse_result = urlparse(request.url)

# The requests library now expects header values to be strings only starting 2.11,
# and will raise an error on validation if they are not, so casting all header values to strings.
request.headers.update({header: str(value) for header, value in request.headers.items()})

# We are disabling the SSL verification for local emulator(localhost/127.0.0.1) or if the user
# has explicitly specified to disable SSL verification.
is_ssl_enabled = (
parse_result.hostname != "localhost"
and parse_result.hostname != "127.0.0.1"
and not connection_policy.DisableSSLVerification
)

if connection_policy.SSLConfiguration or "connection_cert" in kwargs:
ca_certs = connection_policy.SSLConfiguration.SSLCaCerts
cert_files = (connection_policy.SSLConfiguration.SSLCertFile, connection_policy.SSLConfiguration.SSLKeyFile)
response = await _PipelineRunFunction(
pipeline_client,
request,
connection_timeout=connection_timeout,
connection_verify=kwargs.pop("connection_verify", ca_certs),
connection_cert=kwargs.pop("connection_cert", cert_files),
**kwargs
)
else:
response = await _PipelineRunFunction(
pipeline_client,
request,
connection_timeout=connection_timeout,
# If SSL is disabled, verify = false
connection_verify=kwargs.pop("connection_verify", is_ssl_enabled),
**kwargs
)

response = response.http_response
headers = dict(response.headers)

data = response.body()
if data and not six.PY2:
# python 3 compatible: convert data from byte to unicode string
data = data.decode("utf-8")

if response.status_code == 404:
raise exceptions.CosmosResourceNotFoundError(message=data, response=response)
if response.status_code == 409:
raise exceptions.CosmosResourceExistsError(message=data, response=response)
if response.status_code == 412:
raise exceptions.CosmosAccessConditionFailedError(message=data, response=response)
if response.status_code >= 400:
raise exceptions.CosmosHttpResponseError(message=data, response=response)

result = None
if data:
try:
result = json.loads(data)
except Exception as e:
raise DecodeError(
message="Failed to decode JSON data: {}".format(e),
response=response,
error=e)

return result, headers


async def _PipelineRunFunction(pipeline_client, request, **kwargs):
# pylint: disable=protected-access

return await pipeline_client._pipeline.run(request, **kwargs)

async def AsynchronousRequest(
client,
request_params,
global_endpoint_manager,
connection_policy,
pipeline_client,
request,
request_data,
**kwargs
):
"""Performs one asynchronous http request according to the parameters.
:param object client: Document client instance
:param dict request_params:
:param _GlobalEndpointManager global_endpoint_manager:
:param documents.ConnectionPolicy connection_policy:
:param azure.core.PipelineClient pipeline_client: PipelineClient to process the request.
:param str method:
:param str path:
:param (str, unicode, file-like stream object, dict, list or None) request_data:
:param dict query_params:
:param dict headers:
:return: tuple of (result, headers)
:rtype: tuple of (dict dict)
"""
request.data = _request_body_from_data(request_data)
if request.data and isinstance(request.data, six.string_types):
request.headers[http_constants.HttpHeaders.ContentLength] = len(request.data)
elif request.data is None:
request.headers[http_constants.HttpHeaders.ContentLength] = 0

# Pass _Request function with it's parameters to retry_utility's Execute method that wraps the call with retries
return await _retry_utility.Execute(
client,
global_endpoint_manager,
_Request,
request_params,
connection_policy,
pipeline_client,
request,
**kwargs
)
Loading

0 comments on commit 61ba8d1

Please sign in to comment.