Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preserve changes to contexvars made in threadpools #1258

Closed
wants to merge 30 commits into from

Conversation

adriangb
Copy link
Member

@adriangb adriangb commented Aug 4, 2021

I think this addresses fastapi/fastapi#953.

Thank you @smagafurov for the link in fastapi/fastapi#953 (comment)

What does this do for users?

This fixes a confusing behavior where their endpoint works differently depending on if it is sync or async.
And in FastAPI (which for better or worse re-uses run_in_threadpool), it impacts dependencies as well.

This is the current behavior:

from typing import List
from contextvars import ContextVar

from starlette.applications import Starlette
from starlette.routing import Route
from starlette.requests import Request
from starlette.responses import Response
from starlette.middleware import Middleware
from starlette.testclient import TestClient
from starlette.types import ASGIApp, Scope, Receive, Send, Message

user: ContextVar[str] = ContextVar("user")

log: List[str] = []


class LogUserMiddleware:
    def __init__(self, app: ASGIApp) -> None:
        self.app = app
    
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        async def send_wrapper(message: Message) -> None:
            if message["type"] == "http.response.start":
                log.append(user.get())
            await send(message)
        await self.app(scope, receive, send_wrapper)


def endpoint_sync(request: Request) -> Response:
    user.set(request.query_params["user"])
    return Response()


async def endpoint_async(request: Request) -> Response:
    return endpoint_sync(request)


app = Starlette(
    routes=[
        Route("/sync", endpoint_sync),
        Route("/async", endpoint_async),
    ],
    middleware=[Middleware(LogUserMiddleware)]
)


client = TestClient(app, raise_server_exceptions=False)


resp = client.get("/async", params={"user": "adrian"})
assert resp.status_code == 200, resp.content
assert log == ["adrian"]

resp = client.get("/sync", params={"user": "adrian"})
assert resp.status_code == 500, resp.content

I wouldn't expect a user to write this themselves, but some library they are using very well could use contextvars to set something that then gets picked up by a middleware (for example, some sort of tracing library). Since this is an implementation detail, and the failure mode is pretty tricky, it would be difficult for a user to figure out what happened.

@adriangb adriangb closed this Dec 7, 2021
@adriangb adriangb reopened this Dec 7, 2021
@adriangb
Copy link
Member Author

adriangb commented Dec 7, 2021

Accidental close. Sorry.

I merged in master, made the extra method private and cleaned up tests / coverage. Tests and linting are passing locally now.

@Kludex
Copy link
Member

Kludex commented Dec 16, 2021

This should have been fixed by anyio 3.4.0: https://anyio.readthedocs.io/en/stable/versionhistory.html

Can you confirm? @adriangb

@adriangb adriangb changed the title Restore threadpool Preserve changes to contexvars made in threadpools Dec 16, 2021
@adriangb
Copy link
Member Author

adriangb commented Dec 16, 2021

It seems like they implemented context propagation, but not capturing of changes in the context. You can verified this by taking this branch, replacing starlette/concurrency.py with the version from master and seeing that the new tests fail.

In other words, we can probably remove the contextvar stuff here:

child = functools.partial(func, *args, **kwargs)
context = contextvars.copy_context()
func = context.run
args = (child,)
. But that should probably be a separate PR.

@adriangb adriangb added feature New feature or request refactor Refactor code labels Feb 2, 2022
Comment on lines +46 to +83
@pytest.mark.anyio
async def test_restore_context_from_thread_previously_set():
"""Value outside of threadpool is overwitten with value set in threadpool"""
ctxvar: contextvars.ContextVar[str] = contextvars.ContextVar("ctxvar")
ctxvar.set("spam")

def sync_task():
ctxvar.set("ham")

await run_in_threadpool(sync_task)
assert ctxvar.get() == "ham"


@pytest.mark.anyio
async def test_restore_context_from_thread_previously_unset():
"""Value outside of threadpool is set to value in threadpool"""
ctxvar: contextvars.ContextVar[str] = contextvars.ContextVar("ctxvar")

def sync_task():
ctxvar.set("ham")

await run_in_threadpool(sync_task)
assert ctxvar.get() == "ham"


@pytest.mark.anyio
async def test_restore_context_from_thread_new_cvar():
"""Value outside of threadpool is set for a cvar created in the threadpool"""
ctxvars: List[contextvars.ContextVar[str]] = []

def sync_task():
ctxvar: contextvars.ContextVar[str] = contextvars.ContextVar("ctxvar")
ctxvar.set("ham")
ctxvars.append(ctxvar)

await run_in_threadpool(sync_task)
assert len(ctxvars) == 1
assert next(iter(ctxvars)).get() == "ham"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This are the failing tests, which get to the root of the problem in the PR post without jumping through hoops to exercise the issue

@Kludex
Copy link
Member

Kludex commented Apr 24, 2022

Are there any cons of doing this? Are we going to break anything with this? Is this really necessary? Is it possible to do what the PR proposes without adding it to Starlette itself, but a Starlette user can do it on their own?

@Kludex Kludex modified the milestone: Version 0.20.0 Apr 24, 2022
@Kludex Kludex mentioned this pull request May 22, 2022
5 tasks
@Kludex
Copy link
Member

Kludex commented May 23, 2022

I've asked about this on anyio's Gitter channel: https://gitter.im/python-trio/AnyIO?at=628b22247df86c141e9b566a

As Gitter history is deleted after some time, I'll paste the conversation for the record:

Conversation screenshots image image

@Kludex Kludex modified the milestones: Version 0.20.1, Version 0.21.0 May 23, 2022
@adriangb
Copy link
Member Author

@agronholm opened a discussion on discuss.python.org: https://discuss.python.org/t/back-propagation-of-contextvar-changes-from-worker-threads/15928

@adriangb
Copy link
Member Author

Looks like in 3.11 we might be able to use this for async tasks as well: python/cpython#31837

@agronholm
Copy link
Contributor

Looks like in 3.11 we might be able to use this for async tasks as well: python/cpython#31837

Can you clarify? I'm not sure how this addition will help Starlette.

@adriangb
Copy link
Member Author

adriangb commented May 26, 2022

We have a similar issue with BaseHTTPMiddleware: https://github.com/encode/starlette/blob/master/tests/middleware/test_base.py#L192-L223

I also think that in that case it's better to push people away from using it instead of trying to fix it, but we may be able to also try to fix it to make the situation better for users that can't / won't drop BaseHTTPMiddleware.

So in theory in 3.11 I think we should be able to do:

ctx = copy_context()
with anyio.create_task_group() as tg:
    tg.start_soon(..., context=ctx)   # assuming you add this to anyio
_restore_context(ctx)  # this hack

@hs-sean-grant
Copy link

Not sure if this is appropriate to post here, but I am interested in this PR for exactly the use case @adriangb mentions in the OP. I am trying to implement AWS X-Ray tracing SDK into a FastAPI application but am blocked because the SDK stores the traces in the thread's context, which is lost when the controller endpoint thread is ran in the threadpool.

@adriangb
Copy link
Member Author

Yup that is appropriate! User feedback is always appreciated. Could you try this branch and see if it solves things for you?

@adriangb
Copy link
Member Author

Quite relevant: django/asgiref#267

So there are some edge cases with this implementation. In Django this is a problem because there are two different threads, in Starlette we just have one, so that usage doesn't make sense in Starlette and hence I don't think we'd run into this issue. Besides, it is currently also broken, this makes it no better or worse.

@Kludex Kludex requested review from a team and graingert August 13, 2022 14:39
@Kludex
Copy link
Member

Kludex commented Sep 3, 2022

@graingert would you mind sharing why we shouldn't go with this? Just to have the record here. 😟 🙏

@Kludex Kludex modified the milestones: Version 0.21.0, Version 0.22.0 Sep 3, 2022
@jhominal
Copy link
Member

jhominal commented Sep 3, 2022

I do not know what @graingert would say, but I would also opine that, on the balance of it, restoring contextvars is not a game that we should play, as the only guarantee given by contextvars is that it will flow downstream - there is no guarantee about either the scope of the context (is it the process? A HTTP request? Something else?), or that any variable set will flow upstream to the calling function. Implementing upstream contextvars propagation in generic functions is, in my opinion, likely to bite us in the a** later.

In other words, I currently think that “setting a context variable so that it can be picked up by upstream code” is a misuse of the contextvars API.

I believe that for the issues highlighted here, alternative solutions can be implemented, among which:

  • setting a mutable object in a context variable near the level where the variable will be read;
  • using explicit context.run in places where someone wants __enter__ and __exit__ to share a context;
  • If we think we should do it (with the appropriate thinking about race conditions), starlette could provide some kind of explicitly request-scoped context;

@adriangb
Copy link
Member Author

adriangb commented Sep 3, 2022

as the only guarantee given by contextvars is that it will flow downstream

I think the issue is that for a FastAPI user this is downstream:

def dep1() -> None:
    ctx.set("foo")

def dep2(d1: None = Depends()) -> None:
    ctx.get()

This should be modeled as dep2(dep1()) -> dep2 is downstream of dep1 and should see changes to context variables. But they don't because executing dep1 in threads means that any context variables set are erased. I know that we all probably understand why that is and think that the current behavior makes sense, but for FastAPI users who don't understand these things (and perhaps aren't even interacting with context variables directly) the behavior will seem wrong.

Implementing upstream contextvars propagation in generic functions is, in my opinion, likely to bite us in the a** later

FWIW I don't disagree with this. I have a gut feeling that there will be some issues with implementing this, even if it's been in asgiref for years and hasn't caused any issues.

If we think we should do it (with the appropriate thinking about race conditions), starlette could provide some kind of explicitly request-scoped context

This won't solve things for users that are just using a library that uses context vars, but I think providing an explicit context would be great. Flask does it. This would allow FastAPI to manipulate the context as it sees fit to make it work with the dependency injection system. Starlette could use this context to make propagating data from lifespans to requests less akward/not use globals. I already proposed this in asgiref: django/asgiref#322

@jhominal
Copy link
Member

jhominal commented Sep 3, 2022

I think the issue is that for a FastAPI user this is downstream:

def dep1() -> None:
    ctx.set("foo")

def dep2(d1: None = Depends()) -> None:
    ctx.get()

This should be modeled as dep2(dep1()) -> dep2 is downstream of dep1 and should see changes to context variables. But they don't because executing dep1 in threads means that any context variables set are erased. I know that we all probably understand why that is and think that the current behavior makes sense, but for FastAPI users who don't understand these things (and perhaps aren't even interacting with context variables directly) the behavior will seem wrong.

The definition of what I mean by "downstream" for contextvars is quite restrictive - that is, g is downstream of f if the upstream call stack for g has f (either the real call stack in the case of synchronous calls, or the "logical call stack" in the case of asynchronous calls).

In this sense, dep2 is not downstream of dep1 - the dep1 function has long finished executing when dep2 is called.

However, I would rather avoid in-depth discussion of how to solve FastAPI's dependency issue in this repository - FastAPI's dependency system, and, in particular, any runtime/execution guarantees that FastAPI wishes to provide to its users, need to be part of FastAPI's feature set.

In this case, I sincerely believe that it is possible for FastAPI to use the contextvars.Context.run function to provide the guarantees that FastAPI wishes to provide to its users.

In order to enable FastAPI to support these cases more easily, I would propose that run_in_threadpool gets a new kwarg argument of type contextvars.Context, that will use the context.run method to ensure that the synchronous function being run is run in the provided context instance.

This won't solve things for users that are just using a library that uses context vars, but I think providing an explicit context would be great. Flask does it. This would allow FastAPI to manipulate the context as it sees fit to make it work with the dependency injection system. Starlette could use this context to make propagating data from lifespans to requests less akward/not use globals. I already proposed this in asgiref: django/asgiref#322

I already knew that. :)

@adriangb
Copy link
Member Author

adriangb commented Sep 5, 2022

I would rather avoid in-depth discussion of how to solve FastAPI's dependency issue in this repository - FastAPI's dependency system, and, in particular, any runtime/execution guarantees that FastAPI wishes to provide to its users, need to be part of FastAPI's feature set

I do agree on this 😄, I think FastAPI should not be using these functions from Starlette (I think I even remember Tom saying we should have made these private from the get-go).

Implementing upstream contextvars propagation in generic functions is, in my opinion, likely to bite us in the a** later

This is something I keep thinking about. Although this has been in asgiref for a long time and as far as I can remember from all of these discussions no one has pointed out a concrete issue with the idea or implementation, I can't help but agree with you that it just feels like there's going to be some edge cases if we go down this route.

To be honest I'm not enthused with this solution, it's not a problem I personally deal with and there doesn't seem to be much support amongst our team for it. I'm going to close the PR for now and if things change in the future we can re-evaluate.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature New feature or request refactor Refactor code
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants