Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Refactor getting replication updates from database. #7636

Merged
merged 13 commits into from
Jun 16, 2020
Merged
1 change: 1 addition & 0 deletions changelog.d/7636.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor getting replication updates from database.
29 changes: 26 additions & 3 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import abc
import logging
from contextlib import contextmanager
from typing import Dict, Iterable, List, Set
from typing import Dict, Iterable, List, Set, Tuple

from six import iteritems, itervalues

Expand Down Expand Up @@ -775,7 +775,9 @@ async def is_visible(self, observed_user, observer_user):

return False

async def get_all_presence_updates(self, last_id, current_id, limit):
async def get_all_presence_updates(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""
Gets a list of presence update rows from between the given stream ids.
Each row has:
Expand All @@ -787,10 +789,31 @@ async def get_all_presence_updates(self, last_id, current_id, limit):
- last_user_sync_ts(int)
- status_msg(int)
- currently_active(int)

Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exist
between the requested tokens due to the limit.

The token returned can be used in a subsequent call to this
function to get further updates.

The updates are a list of 2-tuples of stream ID and the row data
"""

# TODO(markjh): replicate the unpersisted changes.
# This could use the in-memory stores for recent changes.
rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
rows = await self.store.get_all_presence_updates(
instance_name, last_id, current_id, limit
)
return rows

def notify_new_event(self):
Expand Down
40 changes: 32 additions & 8 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import logging
from collections import namedtuple
from typing import List
from typing import List, Tuple

from twisted.internet import defer

Expand Down Expand Up @@ -259,14 +259,31 @@ def _push_update_local(self, member, typing):
)

async def get_all_typing_updates(
self, last_id: int, current_id: int, limit: int
) -> List[dict]:
"""Get up to `limit` typing updates between the given tokens, earliest
updates first.
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get updates for typing replication stream.

Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exist
between the requested tokens due to the limit.

The token returned can be used in a subsequent call to this
function to get further updates.

The updates are a list of 2-tuples of stream ID and the row data
"""

if last_id == current_id:
return []
return [], current_id, False

changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id
Expand All @@ -280,9 +297,16 @@ async def get_all_typing_updates(
serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.append((serial, [room_id, list(typing)]))
rows.sort()
return rows[:limit]

limited = False
if len(rows) > limit:
rows = rows[:limit]
current_id = rows[-1][0]
limited = True

return rows, current_id, limited

def get_current_token(self):
return self._latest_room_serial
Expand Down
4 changes: 1 addition & 3 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,9 @@ def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
updated_receipts = yield self.store.get_all_updated_receipts(
users_affected = yield self.store.get_users_sent_receipts_between(
min_stream_id - 1, max_stream_id
)
# This returns a tuple, user_id is at index 3
users_affected = {r[3] for r in updated_receipts}

for u in users_affected:
if u in self.pushers:
Expand Down
29 changes: 8 additions & 21 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_backfill_token),
db_query_to_update_function(store.get_all_new_backfill_event_rows),
store.get_all_new_backfill_event_rows,
)


Expand All @@ -291,9 +291,7 @@ def __init__(self, hs):
if hs.config.worker_app is None:
# on the master, query the presence handler
presence_handler = hs.get_presence_handler()
update_function = db_query_to_update_function(
presence_handler.get_all_presence_updates
)
update_function = presence_handler.get_all_presence_updates
richvdh marked this conversation as resolved.
Show resolved Hide resolved
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
Expand All @@ -318,9 +316,7 @@ def __init__(self, hs):

if hs.config.worker_app is None:
# on the master, query the typing handler
update_function = db_query_to_update_function(
typing_handler.get_all_typing_updates
)
update_function = typing_handler.get_all_typing_updates
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
Expand Down Expand Up @@ -352,7 +348,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_max_receipt_stream_id),
db_query_to_update_function(store.get_all_updated_receipts),
store.get_all_updated_receipts,
)


Expand All @@ -367,26 +363,17 @@ class PushRulesStream(Stream):

def __init__(self, hs):
self.store = hs.get_datastore()

super(PushRulesStream, self).__init__(
hs.get_instance_name(), self._current_token, self._update_function
hs.get_instance_name(),
self._current_token,
self.store.get_all_push_rule_updates,
)

def _current_token(self, instance_name: str) -> int:
push_rules_token, _ = self.store.get_push_rules_stream_token()
return push_rules_token

async def _update_function(
self, instance_name: str, from_token: Token, to_token: Token, limit: int
):
rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)

limited = False
if len(rows) == limit:
to_token = rows[-1][0]
limited = True

return [(row[0], (row[2],)) for row in rows], to_token, limited


class PushersStream(Stream):
"""A user has added/changed/removed a pusher
Expand Down
41 changes: 35 additions & 6 deletions synapse/storage/data_stores/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,9 +1077,32 @@ def get_ex_outlier_stream_rows_txn(txn):
"get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
)

def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
async def get_all_new_backfill_event_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get updates for backfill replication stream, including all new
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this docstring is very helpful, but please can all the updated/new storage methods have one, not just this method?

backfilled events and events that have gone from being outliers to not.

Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exist
between the requested tokens due to the limit.

The token returned can be used in a subsequent call to this
function to get further updates.

The updates are a list of 2-tuples of stream ID and the row data
"""
if last_id == current_id:
return defer.succeed([])
return [], current_id, False

def get_all_new_backfill_event_rows(txn):
sql = (
Expand All @@ -1094,10 +1117,12 @@ def get_all_new_backfill_event_rows(txn):
" LIMIT ?"
)
txn.execute(sql, (-last_id, -current_id, limit))
new_event_updates = txn.fetchall()
new_event_updates = [(row[0], row[1:]) for row in txn]

limited = False
if len(new_event_updates) == limit:
upper_bound = new_event_updates[-1][0]
limited = True
else:
upper_bound = current_id

Expand All @@ -1114,11 +1139,15 @@ def get_all_new_backfill_event_rows(txn):
" ORDER BY event_stream_ordering DESC"
)
txn.execute(sql, (-last_id, -upper_bound))
new_event_updates.extend(txn.fetchall())
new_event_updates.extend((row[0], row[1:]) for row in txn)

return new_event_updates
if len(new_event_updates) >= limit:
upper_bound = new_event_updates[-1][0]
limited = True

return self.db.runInteraction(
return new_event_updates, upper_bound, limited

return await self.db.runInteraction(
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)

Expand Down
41 changes: 37 additions & 4 deletions synapse/storage/data_stores/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Tuple

from twisted.internet import defer

from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
Expand Down Expand Up @@ -73,9 +75,32 @@ def _update_presence_txn(self, txn, stream_orderings, presence_states):
)
txn.execute(sql + clause, [stream_id] + list(args))

def get_all_presence_updates(self, last_id, current_id, limit):
async def get_all_presence_updates(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get updates for presence replication stream.

Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.

Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exist
between the requested tokens due to the limit.

The token returned can be used in a subsequent call to this
function to get further updates.

The updates are a list of 2-tuples of stream ID and the row data
"""

if last_id == current_id:
return defer.succeed([])
return [], current_id, False

def get_all_presence_updates_txn(txn):
sql = """
Expand All @@ -89,9 +114,17 @@ def get_all_presence_updates_txn(txn):
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, limit))
return txn.fetchall()
updates = [(row[0], row[1:]) for row in txn]

upper_bound = current_id
limited = False
if len(updates) >= limit:
upper_bound = updates[-1][0]
limited = True

return updates, upper_bound, limited

return self.db.runInteraction(
return await self.db.runInteraction(
"get_all_presence_updates", get_all_presence_updates_txn
)

Expand Down
Loading