
PoolTimeout when num tasks in asyncio.gather() exceeds client max_connections #1171

Closed
zeldrinn opened this issue Aug 11, 2020 · 22 comments · Fixed by encode/httpcore#880
Labels: bug (Something isn't working) · concurrency (Issues related to concurrency and usage of async libraries) · pooling (Issues and PRs relative to connection pooling)

Comments

@zeldrinn commented Aug 11, 2020

Checklist

  • Reproducible on 0.13.3
  • This issue seems similar but it's closed and was supposedly fixed

Describe the bug

If the number of tasks executed via asyncio.gather(...) is greater than max_connections, I get a PoolTimeout. It seems this may be happening because tasks that have completed aren't releasing their connections.

I'm new to asyncio, so it's possible I'm doing something wrong, but I haven't been able to find any documentation or issues that cover this case definitively.

To reproduce

import asyncio
import httpx

async def main() -> None:
    url = "https://www.example.com"
    max_connections = 2
    timeout = httpx.Timeout(5.0, pool=2.0)
    limits = httpx.Limits(max_connections=max_connections)
    # Note: newer httpx versions take `limits=` instead of `pool_limits=`.
    client = httpx.AsyncClient(timeout=timeout, pool_limits=limits)

    async with client:
        tasks = []
        for _ in range(max_connections + 1):
            tasks.append(client.get(url))
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

Expected behavior

I would expect all tasks to complete, rather than getting a PoolTimeout on the nth task, where n = max_connections + 1.

Actual behavior

Getting a PoolTimeout on the nth task, where n = max_connections + 1.

Debugging material

Traceback (most recent call last):
  File "test_async.py", line 21, in <module>
    loop.run_until_complete(main())
  File "/Users/redacted/.pyenv/versions/3.6.9/lib/python3.6/asyncio/base_events.py", line 484, in run_until_complete
    return future.result()
  File "test_async.py", line 16, in main
    await asyncio.gather(*tasks)
  File "/Users/redacted/.pyenv/versions/3.6.9/lib/python3.6/site-packages/httpx/_client.py", line 1416, in get
    timeout=timeout,
  File "/Users/redacted/.pyenv/versions/3.6.9/lib/python3.6/site-packages/httpx/_client.py", line 1242, in request
    request, auth=auth, allow_redirects=allow_redirects, timeout=timeout,
  File "/Users/redacted/.pyenv/versions/3.6.9/lib/python3.6/site-packages/httpx/_client.py", line 1273, in send
    request, auth=auth, timeout=timeout, allow_redirects=allow_redirects,
  File "/Users/redacted/.pyenv/versions/3.6.9/lib/python3.6/site-packages/httpx/_client.py", line 1302, in _send_handling_redirects
    request, auth=auth, timeout=timeout, history=history
  File "/Users/redacted/.pyenv/versions/3.6.9/lib/python3.6/site-packages/httpx/_client.py", line 1338, in _send_handling_auth
    response = await self._send_single_request(request, timeout)
  File "/Users/redacted/.pyenv/versions/3.6.9/lib/python3.6/site-packages/httpx/_client.py", line 1374, in _send_single_request
    timeout=timeout.as_dict(),
  File "/Users/redacted/.pyenv/versions/3.6.9/lib/python3.6/contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "/Users/redacted/.pyenv/versions/3.6.9/lib/python3.6/site-packages/httpx/_exceptions.py", line 359, in map_exceptions
    raise mapped_exc(message, **kwargs) from None  # type: ignore
httpx._exceptions.PoolTimeout

Environment

  • OS: macOS 10.14.6
  • Python version: 3.6.9
  • HTTPX version: 0.13.3
  • Async environment: asyncio
  • HTTP proxy: no
  • Custom certificates: no

Additional context

I commented on this issue, but since it's closed I figured it would be better to create a new one.

@tomchristie (Member) commented

Yup, there's definitely an issue here to be dealt with.

To get a bit more info, I tried this...

import asyncio
import httpx


async def get_url(client, url):
    print("GET", url)
    print(await client._transport.get_connection_info())
    print(await client.get(url))
    print(await client._transport.get_connection_info())


async def main() -> None:
    url = "https://www.example.com"
    max_connections = 2
    timeout = httpx.Timeout(5.0, pool=5.0)
    limits = httpx.Limits(max_connections=max_connections, max_keepalive_connections=0)
    client = httpx.AsyncClient(timeout=timeout, limits=limits)

    async with client:
        tasks = []
        for _ in range(max_connections + 1):
            tasks.append(get_url(client, url))
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

Which results in...

GET https://www.example.com
{}
GET https://www.example.com
{}
GET https://www.example.com
{}
<Response [200 OK]>
{'https://www.example.com': ['HTTP/1.1, IDLE', 'HTTP/1.1, ACTIVE']}
<Response [200 OK]>
{'https://www.example.com': ['HTTP/1.1, IDLE', 'HTTP/1.1, IDLE']}

We can see the connections returning from ACTIVE to IDLE, but the keep-alive connections are not being used by the pending request.

The issue here is that the pending request is blocked on the connection semaphore, waiting to start a new connection, and that semaphore is not released when a keep-alive connection becomes available.

This will need a bit of careful thinking, but it clearly needs resolving, yup. Thanks for raising this.
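
In rough terms, the failure mode looks like this (a minimal sketch of the pattern described above, not httpx's actual pool internals):

import asyncio

# Minimal sketch of the failure mode, NOT httpx's actual internals:
# new connections are gated on a semaphore, but returning a connection
# to the keep-alive set neither releases the semaphore nor wakes any
# task already parked in acquire().

class NaivePool:
    def __init__(self, max_connections: int) -> None:
        self._semaphore = asyncio.Semaphore(max_connections)
        self._keepalive: list[object] = []

    async def acquire(self) -> object:
        if self._keepalive:
            return self._keepalive.pop()  # reuse an idle connection
        # If all slots were taken when we arrived, we block here forever,
        # even if a keep-alive connection becomes available later.
        await self._semaphore.acquire()
        return object()  # stand-in for "open a new connection"

    def release(self, conn: object) -> None:
        # BUG: the connection goes idle, but waiters parked in acquire()
        # above are never notified that it can be reused.
        self._keepalive.append(conn)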

@jcugat (Member) commented Aug 11, 2020

I was going to mention the same. I also tested reading the whole response body (which should release the connection) and closing the response manually, but the issue persists either way.

@zeldrinn (Author) commented

Ah, good call checking the connection state! Is there existing logic that intends for pending requests to make use of existing idle connections, and it's just not working as expected? Or does the code as written only intend for pending requests to create new connections? Curious where that logic is, if you can link me, @tomchristie.

@florimondmanca added the bug, concurrency, and pooling labels and removed the concurrency/asyncio label · Aug 12, 2020
@tomchristie mentioned this issue Sep 22, 2020
@finswimmer commented

Hello everyone,

I just want to make sure this is what I'm looking for. The server I want to send requests to has a limited number of allowed connections. Currently I limit the number of async tasks by using a Semaphore, but the pool_limits parameter for AsyncClient looks like it's intended for my use case. Am I right here? If so, any idea when this issue will be resolved?

Thanks a lot!

fin swimmer

@tomchristie (Member) commented

I'm planning on getting stuck into this one pretty soon, yup.
It's a bit of an involved one, but I know what we need to do to resolve it.

@adriangb (Member) commented Mar 5, 2021

Until this is resolved, is there any reasonable way to work around it? Maybe we use our own asyncio.Semaphore, like this:

import asyncio
import httpx

async def main() -> None:
    url = "https://www.example.com"
    max_connections = 2
    timeout = httpx.Timeout(5.0, pool=2.0)
    limits = httpx.Limits(max_connections=max_connections)
    client = httpx.AsyncClient(timeout=timeout, pool_limits=limits)
    # Our own semaphore caps concurrency at the pool size, so no request
    # ever has to queue inside the pool itself.
    semaphore = asyncio.Semaphore(max_connections)

    async def aw_task(aw):
        async with semaphore:
            return await aw

    async with client:
        tasks = []
        for _ in range(max_connections + 1):
            tasks.append(aw_task(client.get(url)))
        await asyncio.gather(*tasks)

@eric-spitler commented

I can't be certain (still debugging things), but I believe this to be the cause of issues I'm seeing as well.

In my case it's not using gather, but there is some keep-alive action happening in httpx as well as in our Cloud Foundry routing. The signature I see is that the client begins the request, but the target server never sees it. Eventually the client request times out. We did notice that the target server's nginx router supplied by CF logs a 499 status, but those seem to be logged at other times as well, so that may be a red herring.

It appears that the connection attempt is happening but never actually connecting. The symptoms are also sporadic: in my case 1 in 10 fails, and there is no pattern to what type of request fails.

After a few weeks of testing, downgrading from httpx-0.18.1 to httpx-0.17.1 solved the issue: all requests succeed without error. I have not yet tried 0.18.2, but based on the release notes and this open issue, it doesn't sound like the behavior will change.

@tomchristie (Member) commented

See #1741

@eric-spitler commented

Is this still a problem in 0.19 or 1.0.0? I tried running @tomchristie's code sample but couldn't replicate the behavior on 0.18.x or 0.19.

We've held off updating beyond 0.17.1 due to this, but would really like to get back onto the latest.

@luispsantos commented

I can also confirm that I had this problem in 0.19 and downgrading to 0.17.1 solved the issue.

@tomchristie (Member) commented

Have confirmed that the given example now works in httpx 0.21 (fixed due to the substantial reworking in the latest httpcore).

michael-k added a commit to michael-k/awacs that referenced this issue Dec 16, 2021
markpeek pushed a commit to cloudtools/awacs that referenced this issue Dec 16, 2021
@pssolanki111 commented Jan 22, 2022

Is there a default max_connections limit in httpx? I'm getting a PoolTimeout, and I don't specify any keep-alive connections or max connections; I only specify the connect and read timeouts. @tomchristie

@tomchristie (Member) commented

@pssolanki111 Sure is, yup... https://www.python-httpx.org/advanced/#pool-limit-configuration
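
For reference, explicit configuration looks roughly like this; the values below are approximately the defaults from the linked docs, so confirm them for your version:

import httpx

# Roughly the defaults httpx applies when no limits are passed
# (per the pool-limit docs linked above; check your version):
limits = httpx.Limits(
    max_connections=100,           # total concurrent connections
    max_keepalive_connections=20,  # idle connections kept alive in the pool
    keepalive_expiry=5.0,          # seconds before an idle connection expires
)
client = httpx.AsyncClient(limits=limits)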

@mikheyevav commented Nov 5, 2023

I faced this error in 0.25.1. Falling back to 0.25.0 fixed the problem. @tomchristie

@ddelange commented

Can confirm the regression ^

@zanieb (Contributor) commented Nov 10, 2023

Perhaps related to encode/httpcore#823 in httpcore>=1.0.1.

Can you share your httpcore versions, and perhaps try different versions of httpcore?
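
(For example, a one-liner to report both versions:)

python -c "import httpx, httpcore; print('httpx', httpx.__version__, 'httpcore', httpcore.__version__)"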

@zanieb reopened this Nov 10, 2023
@ddelange commented

This was the fix:

- httpcore==1.0.1
- httpx==0.25.1
+ httpcore==0.18.0
+ httpx==0.25.0

@NewUserHa commented Nov 21, 2023

Tested: version 0.25.1 does not have the OP's issue (version 0.23 does).

But it no longer has client._transport.get_connection_info() either, and there's no corresponding changelog entry in httpcore.

@jonathanslenders commented Dec 11, 2023

Same issue here. I'm able to reproduce it with httpcore==0.18.0 + httpx==0.25.0, as well as with the latest.
Setting max_connections=5 (instead of the default 100) makes it very reproducible.

edit: It could be that it happens after some previous requests were cancelled while they were ongoing, but I'm not entirely sure.

edit2: Some more information:

  • I'm using async with client.stream() as the only way to interact with the connection pool.
  • The code calling async with client.stream() is protected by a semaphore of ours (roughly the pattern sketched below), meaning I'm sure the number of concurrently active requests is less than the max_connections limit. So some slots in the connection pool are not released after the client.stream() call returns (or gets cancelled; it could be a result of cancellation).
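
A minimal sketch of that pattern (assuming a shared client and a semaphore sized below the pool limit; not the actual application code):

  import asyncio
  import httpx

  # Our own cap on concurrency, strictly below max_connections:
  semaphore = asyncio.Semaphore(4)

  async def fetch(client: httpx.AsyncClient, url: str) -> bytes:
      async with semaphore:  # never more than 4 requests in flight
          async with client.stream("GET", url) as response:
              return await response.aread()  # drain the body before releasing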

@jonathanslenders commented Dec 11, 2023

I have a reproducer:

Run this HTTP server script (a simple HTTP server that takes a long time to respond):

  import asyncio
  from hypercorn.asyncio import serve
  from hypercorn.config import Config
  from starlette.applications import Starlette
  from starlette.responses import JSONResponse
  from starlette.routing import Route

  async def homepage(request):
      await asyncio.sleep(10)
      return JSONResponse({})

  app = Starlette(
      routes=[
          Route("/", homepage),
      ],
  )

  config = Config.from_mapping({})
  config.bind = ["127.0.0.1:8001"]
  asyncio.run(serve(app, config))

Then run this client code:

  from anyio import create_task_group
  import asyncio
  import httpx

  async def main() -> None:
      async with httpx.AsyncClient(
          limits=httpx.Limits(max_connections=2),
          verify=False,
      ) as client:

          async def do_one_request() -> None:
              await client.get("http://localhost:8001/")

          # First, create many requests, then cancel while they are in progress.
          async with create_task_group() as tg:
              for i in range(5):
                  tg.start_soon(do_one_request)
              await asyncio.sleep(0.5)
              tg.cancel_scope.cancel()

          # Starting another request will now fail with a `PoolTimeout`.
          await do_one_request()

  asyncio.run(main())

Looks like the slots in the connection pool are not released during cancellation.

This happens for me both on httpx 0.25.0 + httpcore 0.18.0 and on httpx 0.25.2 + httpcore 1.0.2.
Shielding the get() or stream() call from cancellation is a workaround that works for us.
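
For instance, something along these lines (a sketch of the shielding workaround using anyio, not the poster's exact code):

  import anyio
  import httpx

  async def do_one_request(client: httpx.AsyncClient) -> None:
      # Shield the request from outer cancellation so it always runs to
      # completion and its pool slot is returned.
      with anyio.CancelScope(shield=True):
          await client.get("http://localhost:8001/")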

@konstantin-baidin-y42 commented

It's probably in the httpcore library: I can reproduce the same error using only httpcore, with the same slow-responding server from @jonathanslenders's comment above.

import asyncio
import httpcore


async def main() -> None:
    async with httpcore.AsyncConnectionPool(max_connections=3) as pool:
        async def do_one_request():
            # Note: the server above binds 127.0.0.1:8001; adjust the port here to match.
            return await pool.request("GET", "http://localhost:5522/", extensions={"timeout": {"pool": 1}})

        # First, create many requests, then cancel while they are in progress.
        tasks = []
        for i in range(5):
            tasks.append(asyncio.create_task(do_one_request()))
            await asyncio.sleep(0.0001)
            tasks[-1].cancel()

        print("Wait reasonable amount of time")
        await asyncio.sleep(5)
        print("Starting another request will now fail with a `PoolTimeout`")
        await do_one_request()


asyncio.run(main())

@tomchristie (Member) commented

I have verified that encode/httpcore#880 resolves this issue.

Using server example at: #1171 (comment)
And client example at: #1171 (comment)
