
Commit

Merge commit 'd2ac767de' into anoa/dinsic_release_1_21_x
* commit 'd2ac767de':
  Convert ReadWriteLock to async/await. (#8202)
  Fix incorrect return signature
  Fix `wait_for_stream_position` for multiple waiters. (#8196)
anoadragon453 committed Oct 20, 2020
2 parents 5313899 + d2ac767 commit d49dd2f
Showing 8 changed files with 46 additions and 39 deletions.
1 change: 1 addition & 0 deletions changelog.d/8196.misc
@@ -0,0 +1 @@
Fix `wait_for_stream_position` to allow multiple waiters on same stream ID.
1 change: 1 addition & 0 deletions changelog.d/8202.misc
@@ -0,0 +1 @@
Convert various parts of the codebase to async/await.
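The changelog entry above describes the conversion pattern applied throughout this diff. As a hedged sketch (the function and attribute names below are invented for illustration, not taken from this commit), the move from Twisted's @defer.inlineCallbacks generators to native coroutines looks roughly like this:

# Illustrative sketch of the inlineCallbacks -> async/await conversion; the
# names here are made up for the example, not real Synapse APIs.
from twisted.internet import defer

# Before: a generator-based "coroutine" driven by inlineCallbacks.
@defer.inlineCallbacks
def get_display_name_old(store, user_id):
    profile = yield store.get_profile(user_id)  # yield a Deferred
    return profile["display_name"]

# After: a native coroutine. Deferreds are awaitable in Twisted, and callers
# that still need a Deferred wrap the coroutine with defer.ensureDeferred().
async def get_display_name_new(store, user_id):
    profile = await store.get_profile(user_id)
    return profile["display_name"]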
49 changes: 26 additions & 23 deletions synapse/handlers/pagination.py
@@ -14,15 +14,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Dict, Optional

from twisted.python.failure import Failure

from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken
from synapse.streams.config import PaginationConfig
from synapse.types import Requester, RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@@ -247,15 +250,16 @@ def start_purge_history(self, room_id, token, delete_local_events=False):
)
return purge_id

async def _purge_history(self, purge_id, room_id, token, delete_local_events):
async def _purge_history(
self, purge_id: str, room_id: str, token: str, delete_local_events: bool
) -> None:
"""Carry out a history purge on a room.
Args:
purge_id (str): The id for this purge
room_id (str): The room to purge from
token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as
remote ones
purge_id: The id for this purge
room_id: The room to purge from
token: topological token to delete events before
delete_local_events: True to delete local events as well as remote ones
"""
self._purges_in_progress_by_room.add(room_id)
try:
@@ -291,9 +295,9 @@ def get_purge_status(self, purge_id):
"""
return self._purges_by_id.get(purge_id)

async def purge_room(self, room_id):
async def purge_room(self, room_id: str) -> None:
"""Purge the given room from the database"""
with (await self.pagination_lock.write(room_id)):
with await self.pagination_lock.write(room_id):
# check we know about the room
await self.store.get_room_version_id(room_id)

@@ -307,23 +311,22 @@ async def purge_room(self, room_id):

async def get_messages(
self,
requester,
room_id=None,
pagin_config=None,
as_client_event=True,
event_filter=None,
):
requester: Requester,
room_id: Optional[str] = None,
pagin_config: Optional[PaginationConfig] = None,
as_client_event: bool = True,
event_filter: Optional[Filter] = None,
) -> Dict[str, Any]:
"""Get messages in a room.
Args:
requester (Requester): The user requesting messages.
room_id (str): The room they want messages from.
pagin_config (synapse.api.streams.PaginationConfig): The pagination
config rules to apply, if any.
as_client_event (bool): True to get events in client-server format.
event_filter (Filter): Filter to apply to results or None
requester: The user requesting messages.
room_id: The room they want messages from.
pagin_config: The pagination config rules to apply, if any.
as_client_event: True to get events in client-server format.
event_filter: Filter to apply to results or None
Returns:
dict: Pagination API results
Pagination API results
"""
user_id = requester.user.to_string()

@@ -343,7 +346,7 @@ async def get_messages(

source_config = pagin_config.get_source_config("room")

with (await self.pagination_lock.read(room_id)):
with await self.pagination_lock.read(room_id):
(
membership,
member_event_id,
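The pagination handler above is the main consumer of the lock that is converted to async/await later in this diff: purges take the write lock for a room while message pagination takes the read lock, so concurrent reads are allowed but nothing paginates a room mid-purge. A minimal sketch of that split (an illustrative class, not the real handler):

# Minimal sketch of the read/write split used by the pagination handler above.
from synapse.util.async_helpers import ReadWriteLock


class PaginationLockSketch:
    def __init__(self):
        self.pagination_lock = ReadWriteLock()

    async def purge_room(self, room_id: str) -> None:
        # Writer: waits for in-flight readers/writers, then holds the room
        # exclusively while events are deleted.
        with await self.pagination_lock.write(room_id):
            ...  # delete events for room_id

    async def get_messages(self, room_id: str) -> dict:
        # Readers: many requests may paginate the same room concurrently, but
        # all of them queue behind an in-progress purge.
        with await self.pagination_lock.read(room_id):
            ...  # fetch events
            return {}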
4 changes: 3 additions & 1 deletion synapse/python_dependencies.py
@@ -66,7 +66,9 @@
"msgpack>=0.5.2",
"phonenumbers>=8.2.0",
"prometheus_client>=0.0.18,<0.9.0",
# we use attr.validators.deep_iterable, which arrived in 19.1.0
# we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
# Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
# is out in November.)
"attrs>=19.1.0",
"netaddr>=0.7.18",
"Jinja2>=2.9",
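For context on the pinned attrs version in the hunk above: attr.validators.deep_iterable, added in attrs 19.1.0, validates both a container and each of its members. A hedged example of the kind of usage the comment refers to (the RoomIds class is invented for illustration):

# Sketch of attr.validators.deep_iterable, the attrs 19.1.0 feature the
# dependency comment above refers to.
import attr


@attr.s
class RoomIds:
    ids = attr.ib(
        validator=attr.validators.deep_iterable(
            member_validator=attr.validators.instance_of(str),
            iterable_validator=attr.validators.instance_of(list),
        )
    )


RoomIds(ids=["!abc:example.com"])        # passes validation
# RoomIds(ids=["!abc:example.com", 42])  # raises TypeError: 42 is not a str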
6 changes: 2 additions & 4 deletions synapse/replication/tcp/client.py
@@ -14,7 +14,6 @@
# limitations under the License.
"""A replication client for use by synapse workers.
"""
import heapq
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple

@@ -219,9 +218,8 @@ async def wait_for_stream_position(

waiting_list = self._streams_to_waiters.setdefault(stream_name, [])

# We insert into the list using heapq as it is more efficient than
# pushing then resorting each time.
heapq.heappush(waiting_list, (position, deferred))
waiting_list.append((position, deferred))
waiting_list.sort(key=lambda t: t[0])

# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
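A note on why the heapq call above was dropped, consistent with the #8196 changelog entry about multiple waiters on the same stream ID: heappush compares the (position, deferred) tuples, and when two waiters share a position the comparison falls through to the Deferreds, which are not orderable. Appending and then sorting on the position alone never compares the deferreds. A hedged sketch of the failure mode:

# Sketch of the failure the change above avoids: with equal positions, the
# tuple comparison inside heappush has to compare the Deferreds themselves.
import heapq

from twisted.internet import defer

waiting_list = []
heapq.heappush(waiting_list, (10, defer.Deferred()))    # fine: nothing to compare
# heapq.heappush(waiting_list, (10, defer.Deferred()))  # TypeError: '<' not
#                                                       # supported between Deferreds

# The replacement keeps the list ordered without touching the second element.
waiting_list.append((10, defer.Deferred()))
waiting_list.sort(key=lambda t: t[0])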
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/registration.py
@@ -99,7 +99,7 @@ async def get_user_by_access_token(self, token: str) -> Optional[dict]:
)

@cached()
async def get_expiration_ts_for_user(self, user_id: str) -> Optional[None]:
async def get_expiration_ts_for_user(self, user_id: str) -> Optional[int]:
"""Get the expiration timestamp for the account bearing a given user ID.
Args:
16 changes: 8 additions & 8 deletions synapse/util/async_helpers.py
@@ -20,6 +20,7 @@
from typing import Dict, Sequence, Set, Union

import attr
from typing_extensions import ContextManager

from twisted.internet import defer
from twisted.internet.defer import CancelledError
@@ -338,11 +339,11 @@ def eb(e):


class ReadWriteLock(object):
"""A deferred style read write lock.
"""An async read write lock.
Example:
with (yield read_write_lock.read("test_key")):
with await read_write_lock.read("test_key"):
# do some work
"""

@@ -365,8 +366,7 @@ def __init__(self):
# Latest writer queued
self.key_to_current_writer = {} # type: Dict[str, defer.Deferred]

@defer.inlineCallbacks
def read(self, key):
async def read(self, key: str) -> ContextManager:
new_defer = defer.Deferred()

curr_readers = self.key_to_current_readers.setdefault(key, set())
@@ -376,7 +376,8 @@ def read(self, key):

# We wait for the latest writer to finish writing. We can safely ignore
# any existing readers... as they're readers.
yield make_deferred_yieldable(curr_writer)
if curr_writer:
await make_deferred_yieldable(curr_writer)

@contextmanager
def _ctx_manager():
@@ -388,8 +389,7 @@ def _ctx_manager():

return _ctx_manager()

@defer.inlineCallbacks
def write(self, key):
async def write(self, key: str) -> ContextManager:
new_defer = defer.Deferred()

curr_readers = self.key_to_current_readers.get(key, set())
Expand All @@ -405,7 +405,7 @@ def write(self, key):
curr_readers.clear()
self.key_to_current_writer[key] = new_defer

yield make_deferred_yieldable(defer.gatherResults(to_wait_on))
await make_deferred_yieldable(defer.gatherResults(to_wait_on))

@contextmanager
def _ctx_manager():
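With read() and write() now coroutines that resolve to context managers, async callers take the lock as the updated docstring shows. A short usage sketch under that assumption (illustrative function only):

# Usage sketch for the now-async ReadWriteLock.
from synapse.util.async_helpers import ReadWriteLock


async def do_locked_work(rwlock: ReadWriteLock, key: str) -> None:
    # Shared access: other readers of `key` may run concurrently.
    with await rwlock.read(key):
        ...  # read-only work

    # Exclusive access: waits for existing readers and writers on `key`.
    with await rwlock.write(key):
        ...  # mutating work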
6 changes: 4 additions & 2 deletions tests/util/test_rwlock.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

from synapse.util.async_helpers import ReadWriteLock

@@ -43,6 +44,7 @@ def test_rwlock(self):
rwlock.read(key), # 5
rwlock.write(key), # 6
]
ds = [defer.ensureDeferred(d) for d in ds]

self._assert_called_before_not_after(ds, 2)

@@ -73,12 +75,12 @@ def test_rwlock(self):
with ds[6].result:
pass

d = rwlock.write(key)
d = defer.ensureDeferred(rwlock.write(key))
self.assertTrue(d.called)
with d.result:
pass

d = rwlock.read(key)
d = defer.ensureDeferred(rwlock.read(key))
self.assertTrue(d.called)
with d.result:
pass
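The test change above bridges the new coroutine API back into the synchronous test: defer.ensureDeferred turns each coroutine into a Deferred whose .called and .result can be inspected directly, which is what the assertions rely on. A compressed sketch of that bridge (mirroring the updated test, not adding to it):

# Sketch of the ensureDeferred bridge used in the updated test above.
from twisted.internet import defer

from synapse.util.async_helpers import ReadWriteLock

rwlock = ReadWriteLock()
d = defer.ensureDeferred(rwlock.write("key"))  # coroutine -> Deferred
assert d.called          # uncontended, so it resolves synchronously
with d.result:           # the context manager the coroutine returned
    pass                 # lock released on exit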
