Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor getting replication updates from database. #7636

Merged
merged 13 commits into from
Jun 16, 2020
Merged
12 changes: 6 additions & 6 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import logging
from collections import namedtuple
from typing import List
from typing import List, Tuple

from twisted.internet import defer

Expand Down Expand Up @@ -259,14 +259,14 @@ def _push_update_local(self, member, typing):
)

async def get_all_typing_updates(
self, last_id: int, current_id: int, limit: int
) -> List[dict]:
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get up to `limit` typing updates between the given tokens, earliest
updates first.
"""

if last_id == current_id:
return []
return [], current_id, False

changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id
Expand All @@ -280,9 +280,9 @@ async def get_all_typing_updates(
serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.append((serial, (room_id, list(typing))))
rows.sort()
return rows[:limit]
return rows[:limit], current_id, False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this need more intelligence?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woops, for some reason i thought we weren't limiting for this


def get_current_token(self):
return self._latest_room_serial
Expand Down
4 changes: 1 addition & 3 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,7 @@ def __init__(self, hs):

if hs.config.worker_app is None:
# on the master, query the typing handler
update_function = db_query_to_update_function(
typing_handler.get_all_typing_updates
)
update_function = typing_handler.get_all_typing_updates
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
Expand Down