
Add WorkerThreadPool for running synchronous work in threads #7875

Merged: 6 commits into main on Dec 15, 2022

Conversation

@zanieb (Contributor) commented Dec 13, 2022

Follow-up to #7865

Adds a worker pool class for submitting synchronous functions to run in background workers. This will eventually replace our usage of anyio's worker threads, giving us greater control over concurrent work and improved debugging.

Workers will be extended in the near future to support sending asynchronous calls back from workers to the event loop thread and sending asynchronous calls to workers running event loops.

This implementation is inspired largely by the CPython ThreadPoolExecutor implementation. Handling for several weird edge cases (mostly related to garbage collection) is replicated here. This implementation will diverge further as we capitalize on asynchronous support.

Example

import asyncio
from prefect._internal.concurrency.workers import WorkerThreadPool

def identity(x):
    return x

async def main():
    pool = WorkerThreadPool()
    future = await pool.submit(identity, 1)
    print(await future.aresult())

asyncio.run(main())

Checklist

  • This pull request references any related issue by including "closes <link to issue>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • This pull request includes tests or only affects documentation.
  • This pull request includes a label categorizing the change e.g. fix, feature, enhancement

@zanieb added the development label (Tech debt, refactors, CI, tests, and other related work) on Dec 13, 2022
netlify bot commented Dec 13, 2022

Deploy Preview for prefect-orion ready!

🔨 Latest commit d21e4cb
🔍 Latest deploy log https://app.netlify.com/sites/prefect-orion/deploys/6399fc8c5f96ca00094b6351
😎 Deploy Preview https://deploy-preview-7875--prefect-orion.netlify.app

except BaseException as exc:
    self.future.set_exception(exc)
    # Prevent reference cycle in `exc`
    self = None

Check notice (Code scanning / CodeQL): Unused local variable. Variable self is not used.
Comment on lines +130 to +132
pool = WorkerThreadPool(max_workers=1)
futures = [await pool.submit(worker.join) for worker in self._workers]
await asyncio.gather(*[future.aresult() for future in futures])
Contributor Author

This is really meta, but it's way easier than spinning up a single thread manually just for this purpose.

Collaborator

I was confused for a second but then remembered -- yeah, the ThreadPoolExecutor interface (and thus ours) is way easier to deal with.


Returns a future which can be used to retrieve the result of the function.
"""
async with self._lock:
Contributor Author

Technically, because this function never calls await, it does not need a lock here. I'm tempted to remove the lock and make this a synchronous method, but it feels safer to wait and see what we need in the future.

Collaborator

Hmmm. My instinct would be to remove the lock until we need it, but I'll defer to you here because I'm not sure what keeping it protects us against -- you may have a better idea of that.

Contributor Author

I've removed the lock in a follow-up pull request; it turns out I need submit to be synchronous.

I'd actually never considered that an async function that doesn't await anything doesn't need a lock; this was pointed out to me during review of an httpx PR :)

@peytonrunyan (Contributor) Dec 20, 2022

Double-checking my understanding here: this is because we aren't going to hand execution to another coroutine without an await, right? So as long as this object is only accessed by coroutines running within the same thread, and not by other threads, we don't have to worry about race conditions?
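
A small illustration of the point under discussion (hypothetical code, not from this PR): the event loop only switches tasks at await points, so a coroutine body containing no await runs atomically with respect to other coroutines on the same loop and needs no asyncio.Lock.

import asyncio

class Registry:
    def __init__(self):
        self._items: list[int] = []

    async def add(self, item: int) -> int:
        # No await between these statements: no other coroutine can run here
        self._items.append(item)
        return len(self._items)

async def main():
    registry = Registry()
    counts = await asyncio.gather(*(registry.add(i) for i in range(100)))
    assert sorted(counts) == list(range(1, 101))  # no lost updates

asyncio.run(main())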

@@ -0,0 +1,172 @@
import asyncio
Contributor Author

I am considering renaming this module to threads.py to clear the path for process-based workers, but I think I will defer that to the future as well.

Collaborator

The _internal path allows us the freedom to do that without worrying about compatibility, right?

Contributor Author

Yep!

@abrookins (Collaborator) left a comment

LGTM! Left a few comments, YMMV. 👍

assert len(pool._workers) == pool._max_workers


async def test_submit_reuses_idle_thread():
Collaborator

I might preemptively decorate this with the flaky decorator to get retries.

Contributor Author

In theory it should never flake :) We could also just sleep for a full second there instead of doing a busy wait, but this is a bit faster: we're just letting Python context switch so the worker can call the release method.
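
For reference, a rough sketch (an assumption about the test's shape, not the PR's actual code) of the bounded busy wait being described: repeatedly yield the GIL so the worker thread can call release() on the pool's idle semaphore, rather than sleeping a full second.

import time

def wait_for_idle_worker(pool, timeout: float = 1.0) -> bool:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # A non-blocking acquire succeeds once a worker has called release()
        if pool._idle.acquire(blocking=False):
            pool._idle.release()  # put the permit back; we only peeked
            return True
        time.sleep(0)  # yield so Python can context switch to the worker
    return False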

pool = WorkerThreadPool()
future = await pool.submit(time.sleep, 1)
await pool.shutdown()
assert await future.aresult() is None
Collaborator

I have a mild concern that aresult() returning None in the successful case leaves room to mask an unknown failure case that would erroneously lead to the same value. I don't feel strongly about it though.

Contributor Author

👍 I could add a function that sleeps then returns a value.

assert await future.aresult() is None


async def test_shutdown_exception_during_join():
Collaborator

Where does the join happen? I'm a little confused about what this test checks.

Contributor Author

Ah, pool.shutdown joins all the threads. When called, it sends the shutdown signal to workers, then joins the threads. Here the pattern is a bit like:

--> Test: Shutdown pool
--> Pool: Sends signal to workers
--> Pool: Awaits on worker join which context switches
--> Test: Raises exception
--> Pool: Shuts down cleanly still

I wrote this while dealing with some weird issues with pool shutdown when an exception was raised. It's unclear to me how to clarify it / whether it's worth keeping.
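
A rough sketch (assumed shape, not the PR's actual test) of the sequence in the diagram: the test task raises during the context switch while shutdown is joining workers, and shutdown is still expected to complete cleanly.

import asyncio
import time

from prefect._internal.concurrency.workers import WorkerThreadPool

async def raise_after_context_switch():
    await asyncio.sleep(0)  # let pool.shutdown() signal workers and start joining
    raise RuntimeError("raised while the pool is joining workers")

async def main():
    pool = WorkerThreadPool()
    await pool.submit(time.sleep, 0.1)
    results = await asyncio.gather(
        pool.shutdown(), raise_after_context_switch(), return_exceptions=True
    )
    assert results[0] is None  # the pool still shut down cleanly
    assert isinstance(results[1], RuntimeError)

asyncio.run(main())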


work_item = self._queue.get()
if work_item is None:
    # Shutdown command received; forward to other workers and exit
    self._queue.put_nowait(None)
Collaborator

I'm wondering about a slightly different approach here, though I haven't thought it through. What if WorkerThreadPool handed a shutdown_event (Event) to WorkerThread on init? The worker checks every iteration of its run loop to see if shutdown_event is set, and if it is, run() returns. I'm slightly biased toward this approach -- assuming it actually makes sense -- to avoid attaching a semantic value to None. Up to you though!

Contributor Author

Hm, the issue is that self._queue.get() is blocking, so the worker will not do anything until it receives something in the queue. We could check an event at the end of each work item, but we would still need to push something into the queue to wake up all the workers. Both AnyIO's worker threads and the CPython thread pool executor use this model; I trust what they're up to for now :)

Collaborator

Ah, you're right -- I went looking to see how ThreadPoolExecutor handled this and missed its use of None to signal. If it's good enough for them I suppose it's good enough for us! 😂

Contributor

A bit late, but I finally feel up to speed with all of this. Double-checking my understanding from https://github.com/python/cpython/blob/1332fdabbab75bc9e4bced064dc4daab2d7acb47/Lib/asyncio/queues.py#L149

queue.get() will only ever return None after an exception, so we can use it as a signal here?

Contributor

Also double-checking: by blocking here, this keeps us from burning through CPU in this while True loop, yeah?

Contributor Author

Yes, it stops us from burning CPU.

It returns None when we put None in the queue :D; exceptions are raised rather than returned. We place None in the queue to signal shutdown and to wake up the worker, since it is otherwise blocked.
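
A condensed sketch of the pattern being discussed (shape assumed, mirroring the quoted snippet and ThreadPoolExecutor): get() blocks without burning CPU, and a single None both wakes a blocked worker and tells it to exit, forwarding the sentinel so sibling workers wake too.

import queue
import threading

def worker_loop(work_queue: queue.Queue) -> None:
    while True:
        work_item = work_queue.get()  # blocks without spinning the CPU
        if work_item is None:
            # Shutdown sentinel: forward it so sibling workers wake, then exit
            work_queue.put_nowait(None)
            return
        work_item()  # run the submitted unit of work

# Usage sketch:
q: queue.Queue = queue.Queue()
workers = [threading.Thread(target=worker_loop, args=(q,)) for _ in range(2)]
for w in workers:
    w.start()
q.put(lambda: print("hello from a worker"))
q.put(None)  # one sentinel shuts down all workers via forwarding
for w in workers:
    w.join()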



@@ -46,4 +56,23 @@ def wrapper() -> None:
        raise

    __loop.call_soon_threadsafe(wrapper)
-   return future.result()
+   return future
Collaborator

Can't say I understand what's going on here, but I'm ok with that.

@zanieb (Contributor Author) Dec 15, 2022

These utilities are kind of dumb, but basically AbstractEventLoop.call_soon_threadsafe returns a Handle, which is the most useless object around town: you can cancel it and that's it. In cases where we actually want to know what our function returned or wait for its result, we need to return something else. To accomplish this, we wrap the function that we submit to call_soon_threadsafe and use a threading Future to capture the return value.

Contributor Author

I've added some additional documentation for these functions in the next PR.
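
A condensed sketch of the utility described above (shape assumed from the diff; call_soon_in_loop is a hypothetical name): schedule a function on an event loop from another thread and hand back a concurrent.futures.Future instead of the Handle.

import asyncio
import concurrent.futures

def call_soon_in_loop(loop: asyncio.AbstractEventLoop, fn, *args) -> concurrent.futures.Future:
    future: concurrent.futures.Future = concurrent.futures.Future()

    def wrapper() -> None:
        # Runs on the event loop thread; capture the return value or the
        # exception in the thread-safe future so callers can wait on it
        try:
            future.set_result(fn(*args))
        except BaseException as exc:
            future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(wrapper)
    return future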

except BaseException as exc:
    self.future.set_exception(exc)
    # Prevent reference cycle in `exc`
    self = None
Contributor

I'm confused about how this works

Contributor Author

Thanks for calling this out! This is a common pattern in CPython, but I also have no idea how it works. Let me try to find some resources.

Contributor Author

python/cpython#80111 may be helpful?

Contributor Author

I think the exception captures the frame's locals, so the future references the exception, which references the work item, which references the future, and we have a cycle. If we set self to None, the exception no longer references the work item.
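
A condensed sketch of the cycle (work item shape assumed): the exception's traceback keeps this frame's locals alive, including self, so we would get future -> exc -> frame -> self -> future.

import concurrent.futures

class WorkItem:
    def __init__(self, future: concurrent.futures.Future, fn, args):
        self.future, self.fn, self.args = future, fn, args

    def run(self) -> None:
        try:
            self.future.set_result(self.fn(*self.args))
        except BaseException as exc:
            self.future.set_exception(exc)
            # exc.__traceback__ references this frame; dropping `self` from
            # the frame's locals breaks the cycle without the GC's help
            self = None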

self._queue.put_nowait(work_item)

# Ensure there are workers available to run the work
self._adjust_worker_count()
Contributor

Would it ever be possible for work to be submitted and to error before the worker count is adjusted?

Contributor Author

I don't think the threads will context switch until after submission completes, because control isn't yielded, but even if they did, I don't think there would be significant effects.

@zanieb (Contributor Author) commented Dec 15, 2022

Thanks for the reviews!

@zanieb merged commit 36973f9 into main on Dec 15, 2022
@zanieb deleted the engine/worker branch on December 15, 2022 15:51
@serinamarie (Contributor)

If we pass a timeout to aresult() for how long we will wait for a task to complete, would it make sense to have a test around a TimeoutError, or would that just never happen?

@abrookins (Collaborator)

Good question @serinamarie!

@zanieb (Contributor Author) commented Dec 15, 2022

In aresult we pass timeout=0 to the underlying implementation because we know the future is done (from waiting for the event); if it's not done for some reason, our assumption is wrong and we should crash. If we didn't set the timeout to 0 and waited instead of crashing, we'd sneakily block the event loop.

We could do something like reach into the internals of the Future and set the done_event without setting a result, and we'd get the TimeoutError, but generally I try to avoid reaching into implementation details while writing tests.
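
A condensed sketch (assumed shape, not the PR's exact code) of the behavior described: aresult() awaits a done event and then calls the blocking result() with timeout=0, so a future that somehow isn't done raises TimeoutError loudly instead of sneakily blocking the loop.

import asyncio
import concurrent.futures

class AsyncFuture(concurrent.futures.Future):
    def __init__(self, loop: asyncio.AbstractEventLoop):
        super().__init__()
        self._done_event = asyncio.Event()
        # Done callbacks can fire on a worker thread; hop back to the loop
        self.add_done_callback(
            lambda _: loop.call_soon_threadsafe(self._done_event.set)
        )

    async def aresult(self):
        await self._done_event.wait()
        return self.result(timeout=0)  # must already be done; crash otherwise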

not self._idle.acquire(blocking=False)
and len(self._workers) < self._max_workers
):
self._add_worker()
@peytonrunyan (Contributor) Dec 20, 2022

Double-checking my understanding here. We've got the semaphore set to 0. So we go to submit work, acquire() gives us back False, we check that we have room for additional workers, we spin up a new worker, it does its work, then calls release(), incrementing our semaphore, which lets us know we have idle workers in the pool, yeah?
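
A condensed sketch expanding the quoted snippet (class shape assumed) of the scheme confirmed here: the idle semaphore starts at 0, a failed non-blocking acquire means every worker is busy so a new one is spawned up to the cap, and workers release() when they finish an item.

import threading

class PoolSketch:
    def __init__(self, max_workers: int = 4):
        self._idle = threading.Semaphore(0)  # count of idle workers
        self._workers: list = []
        self._max_workers = max_workers

    def _adjust_worker_count(self) -> None:
        # acquire(blocking=False) returns False when the count is 0,
        # i.e. no worker is idle right now
        if (
            not self._idle.acquire(blocking=False)
            and len(self._workers) < self._max_workers
        ):
            self._add_worker()

    def _add_worker(self) -> None:
        ...  # spawn a thread that runs work items and calls self._idle.release()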

@zanieb mentioned this pull request on Feb 3, 2023