Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Clean up types for PaginationConfig #8250

Merged
merged 5 commits into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8250.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clean up type hints for `PaginationConfig`.
11 changes: 5 additions & 6 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,13 @@ async def _snapshot_all_rooms(
now_token = self.hs.get_event_sources().get_current_token()

presence_stream = self.hs.get_event_sources().sources["presence"]
pagination_config = PaginationConfig(from_token=now_token)
presence, _ = await presence_stream.get_pagination_rows(
user, pagination_config.get_source_config("presence"), None
presence, _ = await presence_stream.get_new_events(
user, from_key=None, include_offline=False
)

receipt_stream = self.hs.get_event_sources().sources["receipt"]
receipt, _ = await receipt_stream.get_pagination_rows(
user, pagination_config.get_source_config("receipt"), None
joined_rooms = [r.room_id for r in room_list if r.membership == Membership.JOIN]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is replacing:

room_ids = await self.store.get_rooms_for_user(user.to_string())

Which I'm fairly certain is the same.

receipt = await self.store.get_linearized_receipts_for_rooms(
joined_rooms, to_key=int(now_token.receipt_key),
)
Comment on lines +124 to 126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having a bit of trouble following how the keys were modified as part of this code, previously we have:

pagination_config = PaginationConfig(from_token=now_token)
config = pagination_config.get_source_config("receipt")
to_key = int(config.from_key)

if config.to_key:
	from_key = int(config.to_key)
else:
	from_key = None

Previously:

  • get_source_config was getting now_token.receipt_key (and saving it as from_key since that's how pagination_config was instantiated).
  • Then to_key was set as int(config.from_key).
  • Since config.to_key was never set (since pagination_config.to_key is unset) then from_key should be None.

So this looks equivalent.


tags_by_room = await self.store.get_tags_for_user(user_id)
Expand Down
42 changes: 22 additions & 20 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,20 +335,16 @@ async def get_messages(
user_id = requester.user.to_string()

if pagin_config.from_token:
room_token = pagin_config.from_token.room_key
from_token = pagin_config.from_token
else:
pagin_config.from_token = (
self.hs.get_event_sources().get_current_token_for_pagination()
)
room_token = pagin_config.from_token.room_key

room_token = RoomStreamToken.parse(room_token)
from_token = self.hs.get_event_sources().get_current_token_for_pagination()

pagin_config.from_token = pagin_config.from_token.copy_and_replace(
"room_key", str(room_token)
)
if pagin_config.limit is None:
# This shouldn't happen as we've set a default limit before this
# gets called.
raise Exception("limit not set")
Comment on lines +342 to +345
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we've been usually use assert for checks like this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning here was that nothing stops someone from calling this from somewhere else, so it feels more plausible that this could happen


source_config = pagin_config.get_source_config("room")
room_token = RoomStreamToken.parse(from_token.room_key)

with await self.pagination_lock.read(room_id):
(
Expand All @@ -358,7 +354,7 @@ async def get_messages(
room_id, user_id, allow_departed_users=True
)

if source_config.direction == "b":
if pagin_config.direction == "b":
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
if room_token.topological:
Expand All @@ -381,22 +377,28 @@ async def get_messages(
member_event_id
)
if RoomStreamToken.parse(leave_token).topological < max_topo:
source_config.from_key = str(leave_token)
from_token = from_token.copy_and_replace(
"room_key", leave_token
)

await self.hs.get_handlers().federation_handler.maybe_backfill(
room_id, max_topo
)

to_room_key = None
if pagin_config.to_token:
to_room_key = pagin_config.to_token.room_key
Comment on lines +388 to +390
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure where this matches up with the old version of this code? It seems previously we just always used what was in source_config?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was special logic in get_source_config to handle the fact that pagin_config.to_token could be None, which we need to hoist up here. Bear in mind that source_config.to_key is the room token rather than the stream token (i.e. source_config.to_key = pagin_config.to_token.room_key if pagin_config.to_token else None)


events, next_key = await self.store.paginate_room_events(
room_id=room_id,
from_key=source_config.from_key,
to_key=source_config.to_key,
direction=source_config.direction,
limit=source_config.limit,
from_key=from_token.room_key,
to_key=to_room_key,
direction=pagin_config.direction,
limit=pagin_config.limit,
event_filter=event_filter,
)

next_token = pagin_config.from_token.copy_and_replace("room_key", next_key)
next_token = from_token.copy_and_replace("room_key", next_key)

if events:
if event_filter:
Expand All @@ -409,7 +411,7 @@ async def get_messages(
if not events:
return {
"chunk": [],
"start": pagin_config.from_token.to_string(),
"start": from_token.to_string(),
"end": next_token.to_string(),
}

Expand Down Expand Up @@ -438,7 +440,7 @@ async def get_messages(
events, time_now, as_client_event=as_client_event
)
),
"start": pagin_config.from_token.to_string(),
"start": from_token.to_string(),
"end": next_token.to_string(),
}

Expand Down
3 changes: 0 additions & 3 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -1108,9 +1108,6 @@ async def get_new_events(
def get_current_key(self):
return self.store.get_current_presence_token()

async def get_pagination_rows(self, user, pagination_config, key):
return await self.get_new_events(user, from_key=None, include_offline=False)

@cached(num_args=2, cache_context=True)
async def _get_interested_in(self, user, explicit_room_id, cache_context):
"""Returns the set of users that the given user should see presence
Expand Down
15 changes: 0 additions & 15 deletions synapse/handlers/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,18 +142,3 @@ async def get_new_events(self, from_key, room_ids, **kwargs):

def get_current_key(self, direction="f"):
return self.store.get_max_receipt_stream_id()

async def get_pagination_rows(self, user, config, key):
to_key = int(config.from_key)

if config.to_key:
from_key = int(config.to_key)
else:
from_key = None

room_ids = await self.store.get_rooms_for_user(user.to_string())
events = await self.store.get_linearized_receipts_for_rooms(
room_ids, from_key=from_key, to_key=to_key
)

return (events, to_key)
5 changes: 3 additions & 2 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,9 @@ async def get_events_for(
If explicit_room_id is set, that room will be polled for events only if
it is world readable or the user has joined the room.
"""
from_token = pagination_config.from_token
if not from_token:
if pagination_config.from_token:
from_token = pagination_config.from_token
else:
from_token = self.event_sources.get_current_token()
Comment on lines +435 to 438
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks equivalent, but I'm surprised that mypy complained?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Me too, I think it might be an edge case due to it crossing a function boundary or something. Possibly due to scoping rules? I spent ages hunting it down :/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a problem, just confirmation that it is meant to be the same is fine! 🥳


limit = pagination_config.limit
Expand Down
61 changes: 21 additions & 40 deletions synapse/streams/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
# limitations under the License.

import logging
from typing import Optional

import attr

from synapse.api.errors import SynapseError
from synapse.http.servlet import parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.types import StreamToken

logger = logging.getLogger(__name__)
Expand All @@ -25,38 +29,22 @@
MAX_LIMIT = 1000


class SourcePaginationConfig:

"""A configuration object which stores pagination parameters for a
specific event source."""

def __init__(self, from_key=None, to_key=None, direction="f", limit=None):
self.from_key = from_key
self.to_key = to_key
self.direction = "f" if direction == "f" else "b"
self.limit = min(int(limit), MAX_LIMIT) if limit is not None else None

def __repr__(self):
return "StreamConfig(from_key=%r, to_key=%r, direction=%r, limit=%r)" % (
self.from_key,
self.to_key,
self.direction,
self.limit,
)


@attr.s(slots=True)
class PaginationConfig:

"""A configuration object which stores pagination parameters."""

def __init__(self, from_token=None, to_token=None, direction="f", limit=None):
self.from_token = from_token
self.to_token = to_token
self.direction = "f" if direction == "f" else "b"
self.limit = min(int(limit), MAX_LIMIT) if limit is not None else None
from_token = attr.ib(type=Optional[StreamToken])
to_token = attr.ib(type=Optional[StreamToken])
direction = attr.ib(type=str)
limit = attr.ib(type=Optional[int])

@classmethod
def from_request(cls, request, raise_invalid_params=True, default_limit=None):
def from_request(
cls,
request: SynapseRequest,
raise_invalid_params: bool = True,
default_limit: Optional[int] = None,
) -> "PaginationConfig":
direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])

from_tok = parse_string(request, "from")
Expand All @@ -78,29 +66,22 @@ def from_request(cls, request, raise_invalid_params=True, default_limit=None):

limit = parse_integer(request, "limit", default=default_limit)

if limit and limit < 0:
raise SynapseError(400, "Limit must be 0 or above")
if limit:
if limit < 0:
raise SynapseError(400, "Limit must be 0 or above")

limit = min(int(limit), MAX_LIMIT)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

try:
return PaginationConfig(from_tok, to_tok, direction, limit)
except Exception:
logger.exception("Failed to create pagination config")
raise SynapseError(400, "Invalid request.")

def __repr__(self):
def __repr__(self) -> str:
return ("PaginationConfig(from_tok=%r, to_tok=%r, direction=%r, limit=%r)") % (
self.from_token,
self.to_token,
self.direction,
self.limit,
)

def get_source_config(self, source_name):
keyname = "%s_key" % source_name

return SourcePaginationConfig(
from_key=getattr(self.from_token, keyname),
to_key=getattr(self.to_token, keyname) if self.to_token else None,
direction=self.direction,
limit=self.limit,
)