
Add cancellation support to ReadWriteLock #12120

Merged (16 commits) on Mar 14, 2022
Changes shown below are from 5 of the 16 commits.
1 change: 1 addition & 0 deletions changelog.d/12120.misc
@@ -0,0 +1 @@
Add support for cancellation to `ReadWriteLock`.
8 changes: 4 additions & 4 deletions synapse/handlers/pagination.py
@@ -349,7 +349,7 @@ async def _purge_history(
"""
self._purges_in_progress_by_room.add(room_id)
try:
with await self.pagination_lock.write(room_id):
async with self.pagination_lock.write(room_id):
await self.storage.purge_events.purge_history(
room_id, token, delete_local_events
)
@@ -405,7 +405,7 @@ async def purge_room(self, room_id: str, force: bool = False) -> None:
room_id: room to be purged
force: set true to skip checking for joined users.
"""
with await self.pagination_lock.write(room_id):
async with self.pagination_lock.write(room_id):
# first check that we have no users in this room
if not force:
joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -447,7 +447,7 @@ async def get_messages(

room_token = from_token.room_key

with await self.pagination_lock.read(room_id):
async with self.pagination_lock.read(room_id):
(
membership,
member_event_id,
@@ -612,7 +612,7 @@ async def _shutdown_and_purge_room(

self._purges_in_progress_by_room.add(room_id)
try:
with await self.pagination_lock.write(room_id):
async with self.pagination_lock.write(room_id):
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
self._delete_by_id[
delete_id
67 changes: 43 additions & 24 deletions synapse/util/async_helpers.py
@@ -18,9 +18,10 @@
import inspect
import itertools
import logging
from contextlib import contextmanager
from contextlib import asynccontextmanager, contextmanager
from typing import (
Any,
AsyncIterator,
Awaitable,
Callable,
Collection,
@@ -40,7 +41,7 @@
)

import attr
from typing_extensions import ContextManager
from typing_extensions import AsyncContextManager

from twisted.internet import defer
from twisted.internet.defer import CancelledError
@@ -483,7 +484,7 @@ class ReadWriteLock:

Example:

with await read_write_lock.read("test_key"):
async with read_write_lock.read("test_key"):
# do some work
"""

@@ -506,22 +507,24 @@ def __init__(self) -> None:
# Latest writer queued
self.key_to_current_writer: Dict[str, defer.Deferred] = {}

async def read(self, key: str) -> ContextManager:
def read(self, key: str) -> AsyncContextManager:
new_defer: "defer.Deferred[None]" = defer.Deferred()

curr_readers = self.key_to_current_readers.setdefault(key, set())
curr_writer = self.key_to_current_writer.get(key, None)

curr_readers.add(new_defer)

# We wait for the latest writer to finish writing. We can safely ignore
# any existing readers... as they're readers.
if curr_writer:
await make_deferred_yieldable(curr_writer)

@contextmanager
def _ctx_manager() -> Iterator[None]:
@asynccontextmanager
async def _ctx_manager() -> AsyncIterator[None]:
try:
# We wait for the latest writer to finish writing. We can safely ignore
# any existing readers... as they're readers.
# May raise a `CancelledError` if the `Deferred` wrapping us is
# cancelled. The `Deferred` we are waiting on must not be cancelled,
# since we do not own it.
if curr_writer:
await make_deferred_yieldable(stop_cancellation(curr_writer))
Contributor Author:

I'm not sure of the benefit of using async context managers.
It mainly benefits the cleanup on the read path, where the cleanup for the not-yet-holding-the-lock and holding-the-lock cases happens to be the same. On the write path we have to handle the two cases differently, by deferring the cleanup until we actually acquire the lock.
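
(For reference, since the hunk above uses it without defining it: `stop_cancellation` is an existing Synapse helper that wraps a `Deferred` in a new one, so that cancelling the wrapper fails the wrapper with a `CancelledError` while leaving the wrapped `Deferred` untouched. A rough sketch of the idea, not necessarily the exact helper in `synapse/util/async_helpers.py`:)

```python
from typing import TypeVar

from twisted.internet import defer

T = TypeVar("T")


def stop_cancellation_sketch(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
    """Wrap `deferred` so that cancelling the wrapper does not cancel `deferred`."""
    # The wrapper has no canceller, so `wrapper.cancel()` merely errbacks the
    # wrapper with a `CancelledError`; `deferred` itself keeps running.
    wrapper: "defer.Deferred[T]" = defer.Deferred()
    deferred.chainDeferred(wrapper)
    return wrapper
```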

yield
finally:
with PreserveLoggingContext():
@@ -530,7 +533,7 @@ def _ctx_manager() -> Iterator[None]:

return _ctx_manager()

async def write(self, key: str) -> ContextManager:
def write(self, key: str) -> AsyncContextManager:
new_defer: "defer.Deferred[None]" = defer.Deferred()

curr_readers = self.key_to_current_readers.get(key, set())
@@ -541,25 +544,41 @@ async def write(self, key: str) -> ContextManager:
if curr_writer:
to_wait_on.append(curr_writer)

# We can clear the list of current readers since the new writer waits
# We can clear the list of current readers since `new_defer` waits
# for them to finish.
curr_readers.clear()
self.key_to_current_writer[key] = new_defer

await make_deferred_yieldable(defer.gatherResults(to_wait_on))

@contextmanager
def _ctx_manager() -> Iterator[None]:
@asynccontextmanager
async def _ctx_manager() -> AsyncIterator[None]:
to_wait_on_defer = defer.gatherResults(to_wait_on)
try:
# Wait for all current readers and the latest writer to finish.
# May raise a `CancelledError` if the `Deferred` wrapping us is
# cancelled. The `Deferred`s we are waiting on must not be cancelled,
# since we do not own them.
await make_deferred_yieldable(stop_cancellation(to_wait_on_defer))
yield
finally:
with PreserveLoggingContext():
new_defer.callback(None)
# `self.key_to_current_writer[key]` may be missing if there was another
# writer waiting for us and it completed entirely within the
# `new_defer.callback()` call above.
if self.key_to_current_writer.get(key) == new_defer:
self.key_to_current_writer.pop(key)

def release() -> None:
with PreserveLoggingContext():
new_defer.callback(None)
# `self.key_to_current_writer[key]` may be missing if there was another
# writer waiting for us and it completed entirely within the
# `new_defer.callback()` call above.
if self.key_to_current_writer.get(key) == new_defer:
self.key_to_current_writer.pop(key)

if to_wait_on_defer.called:
Member:

could we, instead of all this, do:

             to_wait_on_defer = defer.gatherResults(to_wait_on)
             to_wait_on_defer.addBoth(lambda _: release())
             await make_deferred_yieldable(to_wait_on_defer)
             yield

... possibly with more stop_cancellation? Or does that not work?

Contributor Author:

release() must only be called after the yield, so I'm guessing you meant:

try:
    await make_deferred_yieldable(stop_cancellation(to_wait_on_defer))
    yield
finally:
    to_wait_on_defer.addBoth(lambda _: release())

which I was originally hesitant to do because of re-entrancy in the happy path:

  1. When we exit the await, it will be because we are inside a call to to_wait_on_defer.callback(...).
  2. During the yield / body of the context manager, we may or may not do a blocking await. If we block, then the to_wait_on_defer.callback() call gets exited, otherwise we remain within the to_wait_on_defer.callback() call.
  3. We reach the finally block.
    If we're no longer inside the callback() call, release() will be called instantly with the current logging context.
    Otherwise if we're still inside the callback() call, release() won't be called until we next block or terminate and will have the sentinel context.

The sometimes-extra delay before release() is tricky to reason about, but I think the proposed code would be alright.
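
(A standalone Twisted illustration of the re-entrancy in step 1, ignoring Synapse's logcontext machinery: code resumed from an `await` on a `Deferred` runs synchronously inside the `callback()` call that fired the `Deferred`, unless it blocks on something else first.)

```python
from twisted.internet import defer


async def holder(lock_released: "defer.Deferred[None]") -> None:
    await lock_released
    # In the happy path this line runs *inside* the `callback()` call below.
    print("resumed while still inside lock_released.callback(...)")


lock_released: "defer.Deferred[None]" = defer.Deferred()
defer.ensureDeferred(holder(lock_released))

print("about to fire the deferred")
lock_released.callback(None)  # prints "resumed ..." before this call returns
print("callback() has returned")
```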


As an alternative, I'm very tempted by @erikjohnston's suggestion of delaying CancelledErrors:

try:
    await make_deferred_yieldable(delay_cancellation(to_wait_on_defer))
    yield
finally:
    release()  # but inline again

Where delay_cancellation would hold on to the CancelledError and only raise it once to_wait_on_defer resolves.

We'd get better maintainability at the cost of having the crud associated with cancelled requests lingering a little while longer, but maybe that's a worthwhile tradeoff.
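
(A rough sketch of what such a delay_cancellation helper could look like, written here purely for illustration; #12180 may implement it differently. It relies on Twisted's `Deferred.pause()`/`unpause()` to hold back the queued `CancelledError` until the wrapped `Deferred` resolves.)

```python
from typing import TypeVar

from twisted.internet import defer
from twisted.internet.defer import CancelledError
from twisted.python.failure import Failure

T = TypeVar("T")


def delay_cancellation_sketch(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
    """Wrap `deferred` so that cancelling the wrapper does not cancel `deferred`,
    and the resulting `CancelledError` is only raised once `deferred` resolves."""

    def handle_cancel(wrapper: "defer.Deferred[T]") -> None:
        # Fail the wrapper with a `CancelledError`, but `pause()` it first so the
        # error sits queued until the wrapped deferred has itself resolved.
        wrapper.pause()
        wrapper.errback(Failure(CancelledError()))
        deferred.addBoth(lambda _: wrapper.unpause())

    def propagate(result: object) -> None:
        # Pass the result (or failure) on to the wrapper, unless the wrapper has
        # already been failed by a cancellation, in which case the late result
        # is simply dropped.
        if not wrapper.called:
            if isinstance(result, Failure):
                wrapper.errback(result)
            else:
                wrapper.callback(result)
        return None

    wrapper: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
    deferred.addBoth(propagate)
    return wrapper
```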

Member:

> release() must only be called after the yield,

oh, duh. Yes, of course.

> The sometimes-extra delay before release() is tricky to reason about, but I think the proposed code would be alright.

ugh, right, yes, let's avoid that.

And yes, delay_cancellation looks like it might be worth exploring.

Contributor Author (@squahtx, Mar 8, 2022):

I had a go at using a delay in cadfe0a.
We now depend on #12180 for delay_cancellation of course.

release()
else:
# We don't have the lock yet, probably because we were cancelled
# while waiting for it. We can't call `release()` yet, since
# `new_defer` must only resolve once all previous readers and
# writers have finished.
# NB: `release()` won't have a logcontext in this path.
to_wait_on_defer.addCallback(lambda _: release())
Member:

presumably we need to do this even if to_wait_on_defer ends up erroring, so this needs to be

Suggested change:
- to_wait_on_defer.addCallback(lambda _: release())
+ to_wait_on_defer.addBoth(lambda _: release())

Contributor Author:

I'll make the change for the sake of robustness, but all of the Deferreds that represent lock release, and thus also to_wait_on_defer, should never error.
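
(A tiny standalone demo of the difference being discussed, outside Synapse: a callback added with `addCallback` is skipped when the `Deferred` fails, whereas one added with `addBoth` runs on success and failure alike.)

```python
from twisted.internet import defer
from twisted.python.failure import Failure


def release(result: object) -> object:
    print("release() called with", result)
    return result  # pass the result (or failure) through unchanged


d: "defer.Deferred[None]" = defer.Deferred()
d.addCallback(release)  # skipped if `d` fails
d.addBoth(release)      # runs whether `d` succeeds or fails
d.errback(Failure(RuntimeError("lock-release Deferreds should never do this")))
d.addErrback(lambda failure: None)  # consume the failure so Twisted does not log it
```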


return _ctx_manager()
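
(To summarise the API change for callers, a minimal usage sketch mirroring the pagination handler changes above: `read()` and `write()` are now plain methods returning async context managers, so the old `with await lock.read(key)` pattern becomes `async with lock.read(key)`.)

```python
from synapse.util.async_helpers import ReadWriteLock

pagination_lock = ReadWriteLock()


async def get_messages(room_id: str) -> None:
    # Shared access: any number of readers may hold the lock for a key at once.
    async with pagination_lock.read(room_id):
        ...  # read-only work


async def purge_history(room_id: str) -> None:
    # Exclusive access: waits for current readers and the previous writer first.
    async with pagination_lock.write(room_id):
        ...  # destructive work
```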
