Skip to content

Commit

Permalink
Beta: add AIOHTTP http client (#1254)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardm-stripe authored Feb 27, 2024
1 parent b077eea commit eaa54a3
Show file tree
Hide file tree
Showing 5 changed files with 603 additions and 23 deletions.
1 change: 1 addition & 0 deletions stripe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def set_app_info(
RequestsClient as RequestsClient,
UrlFetchClient as UrlFetchClient,
HTTPXClient as HTTPXClient,
AIOHTTPClient as AIOHTTPClient,
new_default_http_client as new_default_http_client,
)

Expand Down
120 changes: 118 additions & 2 deletions stripe/_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
import random
import threading
import json
import asyncio
import ssl
from http.client import HTTPResponse

from aiohttp import TCPConnector

# Used for global variables
import stripe # noqa: IMP101
from stripe import _util
Expand Down Expand Up @@ -61,6 +65,13 @@
httpx = None
anyio = None

try:
import aiohttp
from aiohttp import ClientTimeout as AIOHTTPTimeout
from aiohttp import StreamReader as AIOHTTPStreamReader
except ImportError:
aiohttp = None

try:
import requests
from requests import Session as RequestsSession
Expand Down Expand Up @@ -121,6 +132,8 @@ def new_default_http_client(*args: Any, **kwargs: Any) -> "HTTPClient":
def new_http_client_async_fallback(*args: Any, **kwargs: Any) -> "HTTPClient":
if httpx:
impl = HTTPXClient
elif aiohttp:
impl = AIOHTTPClient
else:
impl = NoImportFoundAsyncClient

Expand Down Expand Up @@ -1261,7 +1274,6 @@ def request(
url: str,
headers: Mapping[str, str],
post_data=None,
timeout: float = 80.0,
) -> Tuple[bytes, int, Mapping[str, str]]:
args, kwargs = self._get_request_args_kwargs(
method, url, headers, post_data
Expand All @@ -1282,7 +1294,6 @@ async def request_async(
url: str,
headers: Mapping[str, str],
post_data=None,
timeout: float = 80.0,
) -> Tuple[bytes, int, Mapping[str, str]]:
args, kwargs = self._get_request_args_kwargs(
method, url, headers, post_data
Expand Down Expand Up @@ -1353,6 +1364,111 @@ async def close_async(self):
await self._client_async.aclose()


class AIOHTTPClient(HTTPClient):
name = "aiohttp"

def __init__(
self, timeout: Optional[Union[float, "AIOHTTPTimeout"]] = 80, **kwargs
):
super(AIOHTTPClient, self).__init__(**kwargs)

if aiohttp is None:
raise ImportError(
"Unexpected: tried to initialize AIOHTTPClient but the aiohttp module is not present."
)

kwargs = {}
if self._verify_ssl_certs:
ssl_context = ssl.create_default_context(
capath=stripe.ca_bundle_path
)
kwargs["connector"] = TCPConnector(ssl=ssl_context)

self._session = aiohttp.ClientSession(**kwargs)
self._timeout = timeout

def sleep_async(self, secs):
return asyncio.sleep(secs)

def request(self) -> Tuple[bytes, int, Mapping[str, str]]:
raise NotImplementedError(
"AIOHTTPClient does not support synchronous requests."
)

def _get_request_args_kwargs(
self, method: str, url: str, headers: Mapping[str, str], post_data
):
args = (method, url)
kwargs = {}
if self._proxy:
if self._proxy["http"] != self._proxy["https"]:
raise ValueError(
"AIOHTTPClient does not support different proxies for HTTP and HTTPS."
)
kwargs["proxy"] = self._proxy["https"]
if self._timeout:
kwargs["timeout"] = self._timeout

kwargs["headers"] = headers
kwargs["data"] = post_data
return args, kwargs

async def request_async(
self,
method: str,
url: str,
headers: Mapping[str, str],
post_data=None,
) -> Tuple[bytes, int, Mapping[str, str]]:
(
content,
status_code,
response_headers,
) = await self.request_stream_async(
method, url, headers, post_data=post_data
)

return (await content.read()), status_code, response_headers

def _handle_request_error(self, e) -> NoReturn:
msg = (
"Unexpected error communicating with Stripe. If this "
"problem persists, let us know at support@stripe.com."
)
err = "A %s was raised" % (type(e).__name__,)
should_retry = True

msg = textwrap.fill(msg) + "\n\n(Network error: %s)" % (err,)
raise APIConnectionError(msg, should_retry=should_retry)

def request_stream(self) -> Tuple[Iterable[bytes], int, Mapping[str, str]]:
raise NotImplementedError(
"AIOHTTPClient does not support synchronous requests."
)

async def request_stream_async(
self, method: str, url: str, headers: Mapping[str, str], post_data=None
) -> Tuple["AIOHTTPStreamReader", int, Mapping[str, str]]:
args, kwargs = self._get_request_args_kwargs(
method, url, headers, post_data
)
try:
response = await self._session.request(*args, **kwargs)
except Exception as e:
self._handle_request_error(e)

content = response.content
status_code = response.status
response_headers = response.headers
return content, status_code, response_headers

def close(self):
pass

async def close_async(self):
await self._session.close()


class NoImportFoundAsyncClient(HTTPClient):
def __init__(self, **kwargs):
super(NoImportFoundAsyncClient, self).__init__(**kwargs)
Expand Down
2 changes: 2 additions & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

# This is the last version of httpx compatible with Python 3.6
httpx == 0.22.0
aiohttp == 3.8.6; python_version <= "3.7"
aiohttp == 3.9.0; python_version > "3.7"
anyio[trio] == 3.6.2

pytest-cov >= 2.8.1, < 2.11.0
Expand Down
Loading

0 comments on commit eaa54a3

Please sign in to comment.