From 16b62fb11f426fab92ab6954894fd1a446622420 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Sep 2020 18:58:24 +0100 Subject: [PATCH 1/4] Clean up types for PaginationConfig --- synapse/handlers/initial_sync.py | 11 +++--- synapse/handlers/pagination.py | 40 +++++++++++---------- synapse/handlers/presence.py | 3 -- synapse/handlers/receipts.py | 15 -------- synapse/notifier.py | 5 +-- synapse/streams/config.py | 61 +++++++++++--------------------- 6 files changed, 50 insertions(+), 85 deletions(-) diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index d5ddc583ad69..ddb8f0712bae 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -116,14 +116,13 @@ async def _snapshot_all_rooms( now_token = self.hs.get_event_sources().get_current_token() presence_stream = self.hs.get_event_sources().sources["presence"] - pagination_config = PaginationConfig(from_token=now_token) - presence, _ = await presence_stream.get_pagination_rows( - user, pagination_config.get_source_config("presence"), None + presence, _ = await presence_stream.get_new_events( + user, from_key=None, include_offline=False ) - receipt_stream = self.hs.get_event_sources().sources["receipt"] - receipt, _ = await receipt_stream.get_pagination_rows( - user, pagination_config.get_source_config("receipt"), None + joined_rooms = [r.room_id for r in room_list if r.membership == Membership.JOIN] + receipt = await self.store.get_linearized_receipts_for_rooms( + joined_rooms, to_key=int(now_token.receipt_key), ) tags_by_room = await self.store.get_tags_for_user(user_id) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 63d7edff87a1..fc3fac2e2521 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -335,20 +335,18 @@ async def get_messages( user_id = requester.user.to_string() if pagin_config.from_token: - room_token = pagin_config.from_token.room_key + from_token = pagin_config.from_token else: - pagin_config.from_token = ( - self.hs.get_event_sources().get_current_token_for_pagination() - ) - room_token = pagin_config.from_token.room_key + from_token = self.hs.get_event_sources().get_current_token_for_pagination() - room_token = RoomStreamToken.parse(room_token) + if pagin_config.limit is None: + # This shouldn't happen as we've set a default limit before this + # gets called. + raise Exception("limit not set") - pagin_config.from_token = pagin_config.from_token.copy_and_replace( - "room_key", str(room_token) - ) + room_token = RoomStreamToken.parse(from_token.room_key) - source_config = pagin_config.get_source_config("room") + from_token = from_token.copy_and_replace("room_key", str(room_token)) with await self.pagination_lock.read(room_id): ( @@ -358,7 +356,7 @@ async def get_messages( room_id, user_id, allow_departed_users=True ) - if source_config.direction == "b": + if pagin_config.direction == "b": # if we're going backwards, we might need to backfill. This # requires that we have a topo token. if room_token.topological: @@ -381,7 +379,9 @@ async def get_messages( member_event_id ) if RoomStreamToken.parse(leave_token).topological < max_topo: - source_config.from_key = str(leave_token) + from_token = from_token.copy_and_replace( + "room_key", leave_token + ) await self.hs.get_handlers().federation_handler.maybe_backfill( room_id, max_topo @@ -389,14 +389,16 @@ async def get_messages( events, next_key = await self.store.paginate_room_events( room_id=room_id, - from_key=source_config.from_key, - to_key=source_config.to_key, - direction=source_config.direction, - limit=source_config.limit, + from_key=from_token.room_key, + to_key=pagin_config.to_token.room_key + if pagin_config.to_token + else None, + direction=pagin_config.direction, + limit=pagin_config.limit, event_filter=event_filter, ) - next_token = pagin_config.from_token.copy_and_replace("room_key", next_key) + next_token = from_token.copy_and_replace("room_key", next_key) if events: if event_filter: @@ -409,7 +411,7 @@ async def get_messages( if not events: return { "chunk": [], - "start": pagin_config.from_token.to_string(), + "start": from_token.to_string(), "end": next_token.to_string(), } @@ -438,7 +440,7 @@ async def get_messages( events, time_now, as_client_event=as_client_event ) ), - "start": pagin_config.from_token.to_string(), + "start": from_token.to_string(), "end": next_token.to_string(), } diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 184606815094..a0b7bd4131ce 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1108,9 +1108,6 @@ async def get_new_events( def get_current_key(self): return self.store.get_current_presence_token() - async def get_pagination_rows(self, user, pagination_config, key): - return await self.get_new_events(user, from_key=None, include_offline=False) - @cached(num_args=2, cache_context=True) async def _get_interested_in(self, user, explicit_room_id, cache_context): """Returns the set of users that the given user should see presence diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index f922d8a54545..bf8bd8a34d38 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -142,18 +142,3 @@ async def get_new_events(self, from_key, room_ids, **kwargs): def get_current_key(self, direction="f"): return self.store.get_max_receipt_stream_id() - - async def get_pagination_rows(self, user, config, key): - to_key = int(config.from_key) - - if config.to_key: - from_key = int(config.to_key) - else: - from_key = None - - room_ids = await self.store.get_rooms_for_user(user.to_string()) - events = await self.store.get_linearized_receipts_for_rooms( - room_ids, from_key=from_key, to_key=to_key - ) - - return (events, to_key) diff --git a/synapse/notifier.py b/synapse/notifier.py index dfb096e589ad..8ec6320b7615 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -432,8 +432,9 @@ async def get_events_for( If explicit_room_id is set, that room will be polled for events only if it is world readable or the user has joined the room. """ - from_token = pagination_config.from_token - if not from_token: + if pagination_config.from_token: + from_token = pagination_config.from_token + else: from_token = self.event_sources.get_current_token() limit = pagination_config.limit diff --git a/synapse/streams/config.py b/synapse/streams/config.py index ca7c16ff65c1..ab8faf3e18b8 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -14,9 +14,13 @@ # limitations under the License. import logging +from typing import Optional + +import attr from synapse.api.errors import SynapseError from synapse.http.servlet import parse_integer, parse_string +from synapse.http.site import SynapseRequest from synapse.types import StreamToken logger = logging.getLogger(__name__) @@ -25,38 +29,22 @@ MAX_LIMIT = 1000 -class SourcePaginationConfig(object): - - """A configuration object which stores pagination parameters for a - specific event source.""" - - def __init__(self, from_key=None, to_key=None, direction="f", limit=None): - self.from_key = from_key - self.to_key = to_key - self.direction = "f" if direction == "f" else "b" - self.limit = min(int(limit), MAX_LIMIT) if limit is not None else None - - def __repr__(self): - return "StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)" % ( - self.from_key, - self.to_key, - self.direction, - self.limit, - ) - - +@attr.s class PaginationConfig(object): - """A configuration object which stores pagination parameters.""" - def __init__(self, from_token=None, to_token=None, direction="f", limit=None): - self.from_token = from_token - self.to_token = to_token - self.direction = "f" if direction == "f" else "b" - self.limit = min(int(limit), MAX_LIMIT) if limit is not None else None + from_token = attr.ib(type=Optional[StreamToken]) + to_token = attr.ib(type=Optional[StreamToken]) + direction = attr.ib(type=str) + limit = attr.ib(type=Optional[int]) @classmethod - def from_request(cls, request, raise_invalid_params=True, default_limit=None): + def from_request( + cls, + request: SynapseRequest, + raise_invalid_params: bool = True, + default_limit: Optional[int] = None, + ) -> "PaginationConfig": direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"]) from_tok = parse_string(request, "from") @@ -78,8 +66,11 @@ def from_request(cls, request, raise_invalid_params=True, default_limit=None): limit = parse_integer(request, "limit", default=default_limit) - if limit and limit < 0: - raise SynapseError(400, "Limit must be 0 or above") + if limit: + if limit < 0: + raise SynapseError(400, "Limit must be 0 or above") + + limit = min(int(limit), MAX_LIMIT) try: return PaginationConfig(from_tok, to_tok, direction, limit) @@ -87,20 +78,10 @@ def from_request(cls, request, raise_invalid_params=True, default_limit=None): logger.exception("Failed to create pagination config") raise SynapseError(400, "Invalid request.") - def __repr__(self): + def __repr__(self) -> str: return ("PaginationConfig(from_tok=%r, to_tok=%r, direction=%r, limit=%r)") % ( self.from_token, self.to_token, self.direction, self.limit, ) - - def get_source_config(self, source_name): - keyname = "%s_key" % source_name - - return SourcePaginationConfig( - from_key=getattr(self.from_token, keyname), - to_key=getattr(self.to_token, keyname) if self.to_token else None, - direction=self.direction, - limit=self.limit, - ) From 1ca19cec68d281b9fcd62894ad4706d689a37397 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Sep 2020 22:05:41 +0100 Subject: [PATCH 2/4] Newsfile --- changelog.d/8250.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/8250.misc diff --git a/changelog.d/8250.misc b/changelog.d/8250.misc new file mode 100644 index 000000000000..b6896a9300d5 --- /dev/null +++ b/changelog.d/8250.misc @@ -0,0 +1 @@ +Clean up type hints for `PaginationConfig`. From bc4f225608e2da560200c657b7e8b39ba34a2928 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 3 Sep 2020 23:26:32 +0100 Subject: [PATCH 3/4] Move ternary out of function invocation --- synapse/handlers/pagination.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index fc3fac2e2521..bdb28961c317 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -387,12 +387,14 @@ async def get_messages( room_id, max_topo ) + to_room_key = None + if pagin_config.to_token: + to_room_key = pagin_config.to_token.room_key + events, next_key = await self.store.paginate_room_events( room_id=room_id, from_key=from_token.room_key, - to_key=pagin_config.to_token.room_key - if pagin_config.to_token - else None, + to_key=to_room_key, direction=pagin_config.direction, limit=pagin_config.limit, event_filter=event_filter, From 2b97d1186e4893b5a1968222f1532181ab225284 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 Sep 2020 14:20:57 +0100 Subject: [PATCH 4/4] Fixup --- synapse/handlers/pagination.py | 2 -- synapse/streams/config.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index bdb28961c317..3ca344476c50 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -346,8 +346,6 @@ async def get_messages( room_token = RoomStreamToken.parse(from_token.room_key) - from_token = from_token.copy_and_replace("room_key", str(room_token)) - with await self.pagination_lock.read(room_id): ( membership, diff --git a/synapse/streams/config.py b/synapse/streams/config.py index ab8faf3e18b8..4af500b7397d 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -29,7 +29,7 @@ MAX_LIMIT = 1000 -@attr.s +@attr.s(slots=True) class PaginationConfig(object): """A configuration object which stores pagination parameters."""