Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beta: add AIOHTTP http client #1254

Merged
merged 7 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions stripe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ def set_app_info(
RequestsClient as RequestsClient,
UrlFetchClient as UrlFetchClient,
HTTPXClient as HTTPXClient,
AIOHTTPClient as AIOHTTPClient,
new_default_http_client as new_default_http_client,
)

Expand Down
122 changes: 120 additions & 2 deletions stripe/_http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
import random
import threading
import json
import asyncio
import ssl
from http.client import HTTPResponse

from aiohttp import TCPConnector

# Used for global variables
import stripe # noqa: IMP101
from stripe import _util
Expand Down Expand Up @@ -61,6 +65,13 @@
httpx = None
anyio = None

try:
import aiohttp
from aiohttp import ClientTimeout as AIOHTTPTimeout
from aiohttp import StreamReader as AIOHTTPStreamReader
except ImportError:
aiohttp = None

try:
import requests
from requests import Session as RequestsSession
Expand Down Expand Up @@ -121,6 +132,8 @@ def new_default_http_client(*args: Any, **kwargs: Any) -> "HTTPClient":
def new_http_client_async_fallback(*args: Any, **kwargs: Any) -> "HTTPClient":
if httpx:
impl = HTTPXClient
elif aiohttp:
impl = AIOHTTPClient
else:
impl = NoImportFoundAsyncClient

Expand Down Expand Up @@ -1261,7 +1274,6 @@ def request(
url: str,
headers: Mapping[str, str],
post_data=None,
timeout: float = 80.0,
) -> Tuple[bytes, int, Mapping[str, str]]:
args, kwargs = self._get_request_args_kwargs(
method, url, headers, post_data
Expand All @@ -1282,7 +1294,6 @@ async def request_async(
url: str,
headers: Mapping[str, str],
post_data=None,
timeout: float = 80.0,
) -> Tuple[bytes, int, Mapping[str, str]]:
args, kwargs = self._get_request_args_kwargs(
method, url, headers, post_data
Expand Down Expand Up @@ -1353,6 +1364,113 @@ async def close_async(self):
await self._client_async.aclose()


class AIOHTTPClient(HTTPClient):
name = "aiohttp"

def __init__(
self, timeout: Optional[Union[float, "AIOHTTPTimeout"]] = 80, **kwargs
):
super(AIOHTTPClient, self).__init__(**kwargs)

if aiohttp is None:
raise ImportError(
"Unexpected: tried to initialize AIOHTTPClient but the aiohttp module is not present."
)

self.httpx = httpx
richardm-stripe marked this conversation as resolved.
Show resolved Hide resolved

kwargs = {}
if self._verify_ssl_certs:
ssl_context = ssl.create_default_context(
capath=stripe.ca_bundle_path
)
kwargs["connector"] = TCPConnector(ssl=ssl_context)

self._session = aiohttp.ClientSession(**kwargs)
self._timeout = timeout

def sleep_async(self, secs):
return asyncio.sleep(secs)

def request(self) -> Tuple[bytes, int, Mapping[str, str]]:
richardm-stripe marked this conversation as resolved.
Show resolved Hide resolved
raise NotImplementedError(
"AIOHTTPClient does not support synchronous requests."
)

def _get_request_args_kwargs(
self, method: str, url: str, headers: Mapping[str, str], post_data
):
args = (method, url)
kwargs = {}
if self._proxy:
if self._proxy["http"] != self._proxy["https"]:
pakrym-stripe marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(
"AIOHTTPClient does not support different proxies for HTTP and HTTPS."
)
kwargs["proxy"] = self._proxy["https"]
if self._timeout:
kwargs["timeout"] = self._timeout

kwargs["headers"] = headers
kwargs["data"] = post_data
return args, kwargs

async def request_async(
self,
method: str,
url: str,
headers: Mapping[str, str],
post_data=None,
) -> Tuple[bytes, int, Mapping[str, str]]:
(
content,
status_code,
response_headers,
) = await self.request_stream_async(
method, url, headers, post_data=post_data
)

return (await content.read()), status_code, response_headers

def _handle_request_error(self, e) -> NoReturn:
msg = (
"Unexpected error communicating with Stripe. If this "
"problem persists, let us know at support@stripe.com."
)
err = "A %s was raised" % (type(e).__name__,)
should_retry = True

msg = textwrap.fill(msg) + "\n\n(Network error: %s)" % (err,)
raise APIConnectionError(msg, should_retry=should_retry)

def request_stream(self) -> Tuple[Iterable[bytes], int, Mapping[str, str]]:
raise NotImplementedError(
"AIOHTTPClient does not support synchronous requests."
)

async def request_stream_async(
self, method: str, url: str, headers: Mapping[str, str], post_data=None
) -> Tuple["AIOHTTPStreamReader", int, Mapping[str, str]]:
args, kwargs = self._get_request_args_kwargs(
method, url, headers, post_data
)
try:
response = await self._session.request(*args, **kwargs)
except Exception as e:
self._handle_request_error(e)

content = response.content
status_code = response.status
response_headers = response.headers
return content, status_code, response_headers

def close(self):
pass

async def close_async(self):
await self._session.close()


class NoImportFoundAsyncClient(HTTPClient):
def __init__(self, **kwargs):
super(NoImportFoundAsyncClient, self).__init__(**kwargs)
Expand Down
2 changes: 2 additions & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

# This is the last version of httpx compatible with Python 3.6
httpx == 0.22.0
aiohttp == 3.8.6; python_version <= "3.7"
aiohttp == 3.9.0; python_version > "3.7"
anyio[trio] == 3.6.2

pytest-cov >= 2.8.1, < 2.11.0
Expand Down
Loading
Loading