Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel asyncio tasks on worker close #6098

Merged
merged 7 commits into from
Apr 13, 2022
Merged

Conversation

crusaderky
Copy link
Collaborator

@crusaderky crusaderky commented Apr 8, 2022

Upon Worker.close(), clean up the nanny asyncio tasks that were spawned to run Worker.execute().

Caveats

  • Sync functions will keep running until they either terminate naturally or the process is killed. This PR does not change this behaviour.
  • This PR removes one source of leaks of asyncio tasks. There are many others (chiefly around networking).

@github-actions
Copy link
Contributor

github-actions bot commented Apr 9, 2022

Unit Test Results

       16 files  ±  0         16 suites  ±0   7h 21m 10s ⏱️ - 18m 1s
  2 733 tests +  3    2 653 ✔️ +  5       80 💤  - 1  0  - 1 
21 749 runs  +24  20 675 ✔️ +30  1 074 💤  - 5  0  - 1 

Results for commit 1ce2ef0. ± Comparison against base commit bd3f47e.

♻️ This comment has been updated with latest results.

@mrocklin
Copy link
Member

@graingert do you have time to review this?


async def f(ev):
await ev.set()
await asyncio.Future() # Block indefinitely
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await asyncio.Future() # Block indefinitely
await asyncio.Event().wait()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's the same, but sure

@mrocklin
Copy link
Member

Thank you for the review @graingert . Aside from those two comments are you ok with this PR? (should I merge if @crusaderky resolves those issues?)

for task in self._async_instructions:
task.cancel()
while self._async_instructions:
await asyncio.sleep(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably will need an asyncio.gather(*self._async_instructions, return_exceptions=True) and something to handle close itself being cancelled

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if close itself is cancelled, it should not wait for other tasks being cancelled to return (which will happen at the next event loop cycle anyway)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tasks can handle the cancel eg:

async def example():
    async with database():
        await asyncio.Event().wait()

The worker will need to wait for the async context manager to close

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First, this isn't a plugin system. We don't have any tasks there that catch CancelledError.
Second,

            while self._async_instructions:
                await asyncio.sleep(0)

is doing exactly the same thing as the asyncio.gather you suggested.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a user can submit an async function via c.submit(f, ev, key="f1") which is likely to suppress cancellation. using while self._async_instructions: await asyncio.sleep(0) burns the cpu while waiting for cancellation to be handled

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See latest commit

@mrocklin
Copy link
Member

@graingert any further concerns here, or should I merge?

@mrocklin mrocklin merged commit 7d55039 into dask:main Apr 13, 2022
@crusaderky crusaderky deleted the WMSM/track_tasks branch April 13, 2022 15:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Migrate ensure_computing transitions to new WorkerState event mechanism
3 participants