Skip to content

Commit

Permalink
[core] add provisional azure.core.rest (#19502)
Browse files Browse the repository at this point in the history
  • Loading branch information
iscai-msft authored Jun 29, 2021
1 parent 3654445 commit 662dedf
Show file tree
Hide file tree
Showing 43 changed files with 4,094 additions and 43 deletions.
10 changes: 9 additions & 1 deletion sdk/core/azure-core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
# Release History

## 1.15.1 (Unreleased)
## 1.16.0 (Unreleased)

### New Features

- Add new ***provisional*** methods `send_request` onto the `azure.core.PipelineClient` and `azure.core.AsyncPipelineClient`. This method takes in
requests and sends them through our pipelines.
- Add new ***provisional*** module `azure.core.rest`. `azure.core.rest` is our new public simple HTTP library in `azure.core` that users will use to create requests, and consume responses.
- Add new ***provisional*** errors `StreamConsumedError`, `StreamClosedError`, and `ResponseNotReadError` to `azure.core.exceptions`. These errors
are thrown if you mishandle streamed responses from the provisional `azure.core.rest` module

### Bug Fixes

Expand Down
15 changes: 15 additions & 0 deletions sdk/core/azure-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,21 @@ class TooManyRedirectsError(HttpResponseError):

*kwargs* are keyword arguments to include with the exception.

#### **Provisional** StreamConsumedError
A **provisional** error thrown if you try to access the stream of the **provisional**
responses `azure.core.rest.HttpResponse` or `azure.core.rest.AsyncHttpResponse` once
the response stream has been consumed.

#### **Provisional** StreamClosedError
A **provisional** error thrown if you try to access the stream of the **provisional**
responses `azure.core.rest.HttpResponse` or `azure.core.rest.AsyncHttpResponse` once
the response stream has been closed.

#### **Provisional** ResponseNotReadError
A **provisional** error thrown if you try to access the `content` of the **provisional**
responses `azure.core.rest.HttpResponse` or `azure.core.rest.AsyncHttpResponse` before
reading in the response's bytes first.

### Configurations

When calling the methods, some properties can be configured by passing in as kwargs arguments.
Expand Down
52 changes: 52 additions & 0 deletions sdk/core/azure-core/azure/core/_pipeline_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
RetryPolicy,
)
from .pipeline.transport import RequestsTransport
from .pipeline._tools import to_rest_response as _to_rest_response

try:
from typing import TYPE_CHECKING
Expand All @@ -58,10 +59,23 @@
Callable,
Iterator,
cast,
TypeVar
) # pylint: disable=unused-import
HTTPResponseType = TypeVar("HTTPResponseType")
HTTPRequestType = TypeVar("HTTPRequestType")

_LOGGER = logging.getLogger(__name__)

def _prepare_request(request):
# returns the request ready to run through pipelines
# and a bool telling whether we ended up converting it
rest_request = False
try:
request_to_run = request._to_pipeline_transport_request() # pylint: disable=protected-access
rest_request = True
except AttributeError:
request_to_run = request
return rest_request, request_to_run

class PipelineClient(PipelineClientBase):
"""Service client core methods.
Expand Down Expand Up @@ -170,3 +184,41 @@ def _build_pipeline(self, config, **kwargs): # pylint: disable=no-self-use
transport = RequestsTransport(**kwargs)

return Pipeline(transport, policies)


def send_request(self, request, **kwargs):
# type: (HTTPRequestType, Any) -> HTTPResponseType
"""**Provisional** method that runs the network request through the client's chained policies.
This method is marked as **provisional**, meaning it may be changed in a future release.
>>> from azure.core.rest import HttpRequest
>>> request = HttpRequest('GET', 'http://www.example.com')
<HttpRequest [GET], url: 'http://www.example.com'>
>>> response = client.send_request(request)
<HttpResponse: 200 OK>
:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.rest.HttpResponse
# """
rest_request, request_to_run = _prepare_request(request)
return_pipeline_response = kwargs.pop("_return_pipeline_response", False)
pipeline_response = self._pipeline.run(request_to_run, **kwargs) # pylint: disable=protected-access
response = pipeline_response.http_response
if rest_request:
response = _to_rest_response(response)
try:
if not kwargs.get("stream", False):
response.read()
response.close()
except Exception as exc:
response.close()
raise exc
if return_pipeline_response:
pipeline_response.http_response = response
pipeline_response.http_request = request
return pipeline_response
return response
62 changes: 60 additions & 2 deletions sdk/core/azure-core/azure/core/_pipeline_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import logging
from collections.abc import Iterable
from typing import Any, Awaitable
from .configuration import Configuration
from .pipeline import AsyncPipeline
from .pipeline.transport._base import PipelineClientBase
Expand All @@ -36,16 +37,20 @@
RequestIdPolicy,
AsyncRetryPolicy,
)
from ._pipeline_client import _prepare_request
from .pipeline._tools_async import to_rest_response as _to_rest_response

try:
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, TypeVar
except ImportError:
TYPE_CHECKING = False

HTTPRequestType = TypeVar("HTTPRequestType")
AsyncHTTPResponseType = TypeVar("AsyncHTTPResponseType")

if TYPE_CHECKING:
from typing import (
List,
Any,
Dict,
Union,
IO,
Expand Down Expand Up @@ -168,3 +173,56 @@ def _build_pipeline(self, config, **kwargs): # pylint: disable=no-self-use
transport = AioHttpTransport(**kwargs)

return AsyncPipeline(transport, policies)

async def _make_pipeline_call(self, request, **kwargs):
rest_request, request_to_run = _prepare_request(request)
return_pipeline_response = kwargs.pop("_return_pipeline_response", False)
pipeline_response = await self._pipeline.run(
request_to_run, **kwargs # pylint: disable=protected-access
)
response = pipeline_response.http_response
if rest_request:
rest_response = _to_rest_response(response)
if not kwargs.get("stream"):
try:
# in this case, the pipeline transport response already called .load_body(), so
# the body is loaded. instead of doing response.read(), going to set the body
# to the internal content
rest_response._content = response.body() # pylint: disable=protected-access
await rest_response.close()
except Exception as exc:
await rest_response.close()
raise exc
response = rest_response
if return_pipeline_response:
pipeline_response.http_response = response
pipeline_response.http_request = request
return pipeline_response
return response

def send_request(
self,
request: HTTPRequestType,
*,
stream: bool = False,
**kwargs: Any
) -> Awaitable[AsyncHTTPResponseType]:
"""**Provisional** method that runs the network request through the client's chained policies.
This method is marked as **provisional**, meaning it may be changed in a future release.
>>> from azure.core.rest import HttpRequest
>>> request = HttpRequest('GET', 'http://www.example.com')
<HttpRequest [GET], url: 'http://www.example.com'>
>>> response = await client.send_request(request)
<AsyncHttpResponse: 200 OK>
:param request: The network request you want to make. Required.
:type request: ~azure.core.rest.HttpRequest
:keyword bool stream: Whether the response payload will be streamed. Defaults to False.
:return: The response of your network call. Does not do error handling on your response.
:rtype: ~azure.core.rest.AsyncHttpResponse
"""
from .rest._rest_py3 import _AsyncContextManager
wrapped = self._make_pipeline_call(request, stream=stream, **kwargs)
return _AsyncContextManager(wrapped=wrapped)
2 changes: 1 addition & 1 deletion sdk/core/azure-core/azure/core/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# regenerated.
# --------------------------------------------------------------------------

VERSION = "1.15.1"
VERSION = "1.16.0"
47 changes: 47 additions & 0 deletions sdk/core/azure-core/azure/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,3 +433,50 @@ def __str__(self):
if self._error_format:
return str(self._error_format)
return super(ODataV4Error, self).__str__()

class StreamConsumedError(AzureError):
"""**Provisional** error thrown if you try to access the stream of a response once consumed.
This error is marked as **provisional**, meaning it may be changed in a future release. It is
thrown if you try to read / stream an ~azure.core.rest.HttpResponse or
~azure.core.rest.AsyncHttpResponse once the response's stream has been consumed.
"""
def __init__(self, response):
message = (
"You are attempting to read or stream the content from request {}. "\
"You have likely already consumed this stream, so it can not be accessed anymore.".format(
response.request
)
)
super(StreamConsumedError, self).__init__(message)

class StreamClosedError(AzureError):
"""**Provisional** error thrown if you try to access the stream of a response once closed.
This error is marked as **provisional**, meaning it may be changed in a future release. It is
thrown if you try to read / stream an ~azure.core.rest.HttpResponse or
~azure.core.rest.AsyncHttpResponse once the response's stream has been closed.
"""
def __init__(self, response):
message = (
"The content for response from request {} can no longer be read or streamed, since the "\
"response has already been closed.".format(response.request)
)
super(StreamClosedError, self).__init__(message)

class ResponseNotReadError(AzureError):
"""**Provisional** error thrown if you try to access a response's content without reading first.
This error is marked as **provisional**, meaning it may be changed in a future release. It is
thrown if you try to access an ~azure.core.rest.HttpResponse or
~azure.core.rest.AsyncHttpResponse's content without first reading the response's bytes in first.
"""

def __init__(self, response):
message = (
"You have not read in the bytes for the response from request {}. "\
"Call .read() on the response first.".format(
response.request
)
)
super(ResponseNotReadError, self).__init__(message)
37 changes: 37 additions & 0 deletions sdk/core/azure-core/azure/core/pipeline/_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,40 @@ def await_result(func, *args, **kwargs):
"Policy {} returned awaitable object in non-async pipeline.".format(func)
)
return result

def to_rest_request(pipeline_transport_request):
from ..rest import HttpRequest as RestHttpRequest
return RestHttpRequest(
method=pipeline_transport_request.method,
url=pipeline_transport_request.url,
headers=pipeline_transport_request.headers,
files=pipeline_transport_request.files,
data=pipeline_transport_request.data
)

def to_rest_response(pipeline_transport_response):
from .transport._requests_basic import RequestsTransportResponse
from ..rest._requests_basic import RestRequestsTransportResponse
from ..rest import HttpResponse
if isinstance(pipeline_transport_response, RequestsTransportResponse):
response_type = RestRequestsTransportResponse
else:
response_type = HttpResponse
response = response_type(
request=to_rest_request(pipeline_transport_response.request),
internal_response=pipeline_transport_response.internal_response,
)
response._connection_data_block_size = pipeline_transport_response.block_size # pylint: disable=protected-access
return response

def get_block_size(response):
try:
return response._connection_data_block_size # pylint: disable=protected-access
except AttributeError:
return response.block_size

def get_internal_response(response):
try:
return response._internal_response # pylint: disable=protected-access
except AttributeError:
return response.internal_response
35 changes: 35 additions & 0 deletions sdk/core/azure-core/azure/core/pipeline/_tools_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# IN THE SOFTWARE.
#
# --------------------------------------------------------------------------
from ._tools import to_rest_request

async def await_result(func, *args, **kwargs):
"""If func returns an awaitable, await it."""
Expand All @@ -31,3 +32,37 @@ async def await_result(func, *args, **kwargs):
# type ignore on await: https://github.com/python/mypy/issues/7587
return await result # type: ignore
return result

def _get_response_type(pipeline_transport_response):
try:
from .transport import AioHttpTransportResponse
from ..rest._aiohttp import RestAioHttpTransportResponse
if isinstance(pipeline_transport_response, AioHttpTransportResponse):
return RestAioHttpTransportResponse
except ImportError:
pass
try:
from .transport import AsyncioRequestsTransportResponse
from ..rest._requests_asyncio import RestAsyncioRequestsTransportResponse
if isinstance(pipeline_transport_response, AsyncioRequestsTransportResponse):
return RestAsyncioRequestsTransportResponse
except ImportError:
pass
try:
from .transport import TrioRequestsTransportResponse
from ..rest._requests_trio import RestTrioRequestsTransportResponse
if isinstance(pipeline_transport_response, TrioRequestsTransportResponse):
return RestTrioRequestsTransportResponse
except ImportError:
pass
from ..rest import AsyncHttpResponse
return AsyncHttpResponse

def to_rest_response(pipeline_transport_response):
response_type = _get_response_type(pipeline_transport_response)
response = response_type(
request=to_rest_request(pipeline_transport_response.request),
internal_response=pipeline_transport_response.internal_response,
)
response._connection_data_block_size = pipeline_transport_response.block_size # pylint: disable=protected-access
return response
15 changes: 9 additions & 6 deletions sdk/core/azure-core/azure/core/pipeline/transport/_aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
AsyncHttpTransport,
AsyncHttpResponse,
_ResponseStopIteration)
from .._tools import get_block_size as _get_block_size, get_internal_response as _get_internal_response

# Matching requests, because why not?
CONTENT_CHUNK_SIZE = 10 * 1024
Expand Down Expand Up @@ -215,22 +216,24 @@ def __init__(self, pipeline: Pipeline, response: AsyncHttpResponse, *, decompres
self.pipeline = pipeline
self.request = response.request
self.response = response
self.block_size = response.block_size
self.block_size = _get_block_size(response)
self._decompress = decompress
self.content_length = int(response.internal_response.headers.get('Content-Length', 0))
internal_response = _get_internal_response(response)
self.content_length = int(internal_response.headers.get('Content-Length', 0))
self._decompressor = None

def __len__(self):
return self.content_length

async def __anext__(self):
internal_response = _get_internal_response(self.response)
try:
chunk = await self.response.internal_response.content.read(self.block_size)
chunk = await internal_response.content.read(self.block_size)
if not chunk:
raise _ResponseStopIteration()
if not self._decompress:
return chunk
enc = self.response.internal_response.headers.get('Content-Encoding')
enc = internal_response.headers.get('Content-Encoding')
if not enc:
return chunk
enc = enc.lower()
Expand All @@ -242,13 +245,13 @@ async def __anext__(self):
chunk = self._decompressor.decompress(chunk)
return chunk
except _ResponseStopIteration:
self.response.internal_response.close()
internal_response.close()
raise StopAsyncIteration()
except StreamConsumedError:
raise
except Exception as err:
_LOGGER.warning("Unable to stream download: %s", err)
self.response.internal_response.close()
internal_response.close()
raise

class AioHttpTransportResponse(AsyncHttpResponse):
Expand Down
Loading

0 comments on commit 662dedf

Please sign in to comment.