Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor getting replication updates from database. #7636

Merged
merged 13 commits into from
Jun 16, 2020
Merged
10 changes: 7 additions & 3 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import abc
import logging
from contextlib import contextmanager
from typing import Dict, Iterable, List, Set
from typing import Dict, Iterable, List, Set, Tuple

from six import iteritems, itervalues

Expand Down Expand Up @@ -775,7 +775,9 @@ async def is_visible(self, observed_user, observer_user):

return False

async def get_all_presence_updates(self, last_id, current_id, limit):
async def get_all_presence_updates(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""
Gets a list of presence update rows from between the given stream ids.
Each row has:
Expand All @@ -790,7 +792,9 @@ async def get_all_presence_updates(self, last_id, current_id, limit):
"""
# TODO(markjh): replicate the unpersisted changes.
# This could use the in-memory stores for recent changes.
rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
rows = await self.store.get_all_presence_updates(
instance_name, last_id, current_id, limit
)
return rows

def notify_new_event(self):
Expand Down
12 changes: 6 additions & 6 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import logging
from collections import namedtuple
from typing import List
from typing import List, Tuple

from twisted.internet import defer

Expand Down Expand Up @@ -259,14 +259,14 @@ def _push_update_local(self, member, typing):
)

async def get_all_typing_updates(
self, last_id: int, current_id: int, limit: int
) -> List[dict]:
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get up to `limit` typing updates between the given tokens, earliest
updates first.
"""

if last_id == current_id:
return []
return [], current_id, False

changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id
Expand All @@ -280,9 +280,9 @@ async def get_all_typing_updates(
serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.append((serial, (room_id, list(typing))))
rows.sort()
return rows[:limit]
return rows[:limit], current_id, False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

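Doesn't this need more intelligence? It truncates the rows to `limit` but still returns `current_id` as the upper bound and reports `limited` as False.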

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, for some reason I thought we weren't limiting for this.


def get_current_token(self):
return self._latest_room_serial
Expand Down
10 changes: 3 additions & 7 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_backfill_token),
db_query_to_update_function(store.get_all_new_backfill_event_rows),
store.get_all_new_backfill_event_rows,
)


Expand All @@ -291,9 +291,7 @@ def __init__(self, hs):
if hs.config.worker_app is None:
# on the master, query the presence handler
presence_handler = hs.get_presence_handler()
update_function = db_query_to_update_function(
presence_handler.get_all_presence_updates
)
update_function = presence_handler.get_all_presence_updates
richvdh marked this conversation as resolved.
Show resolved Hide resolved
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
Expand All @@ -318,9 +316,7 @@ def __init__(self, hs):

if hs.config.worker_app is None:
# on the master, query the typing handler
update_function = db_query_to_update_function(
typing_handler.get_all_typing_updates
)
update_function = typing_handler.get_all_typing_updates
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
Expand Down
23 changes: 17 additions & 6 deletions synapse/storage/data_stores/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,9 +1077,14 @@ def get_ex_outlier_stream_rows_txn(txn):
"get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
)

def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
async def get_all_new_backfill_event_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get updates for backfill replication stream, including all new
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This docstring is very helpful, but could all the updated/new storage methods have one, not just this method?

backfilled events and events that have gone from being outliers to not.
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you document the args and return format of the update functions? (Are the ids inclusive or exclusive?)

I know they are all much the same, but if I'm working on the storage layer, I don't want to have to go digging into how the replication layer works to know what a given function is supposed to do. Not having it written down explicitly is a good way for assumptions to be made and off-by-one errors to creep in.

if last_id == current_id:
return defer.succeed([])
return [], current_id, False

def get_all_new_backfill_event_rows(txn):
sql = (
Expand All @@ -1094,10 +1099,12 @@ def get_all_new_backfill_event_rows(txn):
" LIMIT ?"
)
txn.execute(sql, (-last_id, -current_id, limit))
new_event_updates = txn.fetchall()
new_event_updates = [(row[0], row[1:]) for row in txn]

limited = False
if len(new_event_updates) == limit:
upper_bound = new_event_updates[-1][0]
limited = True
else:
upper_bound = current_id

Expand All @@ -1114,11 +1121,15 @@ def get_all_new_backfill_event_rows(txn):
" ORDER BY event_stream_ordering DESC"
)
txn.execute(sql, (-last_id, -upper_bound))
new_event_updates.extend(txn.fetchall())
new_event_updates.extend((row[0], row[1:]) for row in txn)

if len(new_event_updates) >= limit:
upper_bound = new_event_updates[-1][0]
limited = True

return new_event_updates
return new_event_updates, upper_bound, limited

return self.db.runInteraction(
return await self.db.runInteraction(
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)

Expand Down
20 changes: 16 additions & 4 deletions synapse/storage/data_stores/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Tuple

from twisted.internet import defer

from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
Expand Down Expand Up @@ -73,9 +75,11 @@ def _update_presence_txn(self, txn, stream_orderings, presence_states):
)
txn.execute(sql + clause, [stream_id] + list(args))

def get_all_presence_updates(self, last_id, current_id, limit):
async def get_all_presence_updates(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
if last_id == current_id:
return defer.succeed([])
return [], current_id, False

def get_all_presence_updates_txn(txn):
sql = """
Expand All @@ -89,9 +93,17 @@ def get_all_presence_updates_txn(txn):
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, limit))
return txn.fetchall()
updates = [(row[0], row[1:]) for row in txn]

upper_bound = current_id
limited = False
if len(updates) >= limit:
upper_bound = updates[-1][0]
limited = True

return updates, upper_bound, limited

return self.db.runInteraction(
return await self.db.runInteraction(
"get_all_presence_updates", get_all_presence_updates_txn
)

Expand Down