Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge commit 'f6f7511a4' into dinsic
Browse files Browse the repository at this point in the history
* commit 'f6f7511a4':
  Refactor getting replication updates from database. (#7636)
  • Loading branch information
anoadragon453 committed Aug 3, 2020
2 parents 5c4e0e6 + f6f7511 commit c6011c3
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 72 deletions.
1 change: 1 addition & 0 deletions changelog.d/7636.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor getting replication updates from database.
29 changes: 26 additions & 3 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import abc
import logging
from contextlib import contextmanager
from typing import Dict, Iterable, List, Set
from typing import Dict, Iterable, List, Set, Tuple

from prometheus_client import Counter
from typing_extensions import ContextManager
Expand Down Expand Up @@ -773,7 +773,9 @@ async def is_visible(self, observed_user, observer_user):

return False

async def get_all_presence_updates(self, last_id, current_id, limit):
async def get_all_presence_updates(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""
Gets a list of presence update rows from between the given stream ids.
Each row has:
Expand All @@ -785,10 +787,31 @@ async def get_all_presence_updates(self, last_id, current_id, limit):
- last_user_sync_ts(int)
- status_msg(int)
- currently_active(int)
Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.
Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exist
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
function to get further updates.
The updates are a list of 2-tuples of stream ID and the row data
"""

# TODO(markjh): replicate the unpersisted changes.
# This could use the in-memory stores for recent changes.
rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
rows = await self.store.get_all_presence_updates(
instance_name, last_id, current_id, limit
)
return rows

def notify_new_event(self):
Expand Down
40 changes: 32 additions & 8 deletions synapse/handlers/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import logging
from collections import namedtuple
from typing import List
from typing import List, Tuple

from twisted.internet import defer

Expand Down Expand Up @@ -259,14 +259,31 @@ def _push_update_local(self, member, typing):
)

async def get_all_typing_updates(
self, last_id: int, current_id: int, limit: int
) -> List[dict]:
"""Get up to `limit` typing updates between the given tokens, earliest
updates first.
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get updates for typing replication stream.
Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.
Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exist
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
function to get further updates.
The updates are a list of 2-tuples of stream ID and the row data
"""

if last_id == current_id:
return []
return [], current_id, False

changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
last_id
Expand All @@ -280,9 +297,16 @@ async def get_all_typing_updates(
serial = self._room_serials[room_id]
if last_id < serial <= current_id:
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.append((serial, [room_id, list(typing)]))
rows.sort()
return rows[:limit]

limited = False
if len(rows) > limit:
rows = rows[:limit]
current_id = rows[-1][0]
limited = True

return rows, current_id, limited

def get_current_token(self):
return self._latest_room_serial
Expand Down
4 changes: 1 addition & 3 deletions synapse/push/pusherpool.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,9 @@ def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try:
# Need to subtract 1 from the minimum because the lower bound here
# is not inclusive
updated_receipts = yield self.store.get_all_updated_receipts(
users_affected = yield self.store.get_users_sent_receipts_between(
min_stream_id - 1, max_stream_id
)
# This returns a tuple, user_id is at index 3
users_affected = {r[3] for r in updated_receipts}

for u in users_affected:
if u in self.pushers:
Expand Down
29 changes: 8 additions & 21 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_current_backfill_token),
db_query_to_update_function(store.get_all_new_backfill_event_rows),
store.get_all_new_backfill_event_rows,
)


Expand All @@ -291,9 +291,7 @@ def __init__(self, hs):
if hs.config.worker_app is None:
# on the master, query the presence handler
presence_handler = hs.get_presence_handler()
update_function = db_query_to_update_function(
presence_handler.get_all_presence_updates
)
update_function = presence_handler.get_all_presence_updates
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
Expand All @@ -318,9 +316,7 @@ def __init__(self, hs):

if hs.config.worker_app is None:
# on the master, query the typing handler
update_function = db_query_to_update_function(
typing_handler.get_all_typing_updates
)
update_function = typing_handler.get_all_typing_updates
else:
# Query master process
update_function = make_http_update_function(hs, self.NAME)
Expand Down Expand Up @@ -352,7 +348,7 @@ def __init__(self, hs):
super().__init__(
hs.get_instance_name(),
current_token_without_instance(store.get_max_receipt_stream_id),
db_query_to_update_function(store.get_all_updated_receipts),
store.get_all_updated_receipts,
)


Expand All @@ -367,26 +363,17 @@ class PushRulesStream(Stream):

def __init__(self, hs):
self.store = hs.get_datastore()

super(PushRulesStream, self).__init__(
hs.get_instance_name(), self._current_token, self._update_function
hs.get_instance_name(),
self._current_token,
self.store.get_all_push_rule_updates,
)

def _current_token(self, instance_name: str) -> int:
push_rules_token, _ = self.store.get_push_rules_stream_token()
return push_rules_token

async def _update_function(
self, instance_name: str, from_token: Token, to_token: Token, limit: int
):
rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)

limited = False
if len(rows) == limit:
to_token = rows[-1][0]
limited = True

return [(row[0], (row[2],)) for row in rows], to_token, limited


class PushersStream(Stream):
"""A user has added/changed/removed a pusher
Expand Down
41 changes: 35 additions & 6 deletions synapse/storage/data_stores/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,9 +1077,32 @@ def get_ex_outlier_stream_rows_txn(txn):
"get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
)

def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
async def get_all_new_backfill_event_rows(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get updates for backfill replication stream, including all new
backfilled events and events that have gone from being outliers to not.
Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.
Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exist
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
function to get further updates.
The updates are a list of 2-tuples of stream ID and the row data
"""
if last_id == current_id:
return defer.succeed([])
return [], current_id, False

def get_all_new_backfill_event_rows(txn):
sql = (
Expand All @@ -1094,10 +1117,12 @@ def get_all_new_backfill_event_rows(txn):
" LIMIT ?"
)
txn.execute(sql, (-last_id, -current_id, limit))
new_event_updates = txn.fetchall()
new_event_updates = [(row[0], row[1:]) for row in txn]

limited = False
if len(new_event_updates) == limit:
upper_bound = new_event_updates[-1][0]
limited = True
else:
upper_bound = current_id

Expand All @@ -1114,11 +1139,15 @@ def get_all_new_backfill_event_rows(txn):
" ORDER BY event_stream_ordering DESC"
)
txn.execute(sql, (-last_id, -upper_bound))
new_event_updates.extend(txn.fetchall())
new_event_updates.extend((row[0], row[1:]) for row in txn)

return new_event_updates
if len(new_event_updates) >= limit:
upper_bound = new_event_updates[-1][0]
limited = True

return self.db.runInteraction(
return new_event_updates, upper_bound, limited

return await self.db.runInteraction(
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)

Expand Down
41 changes: 37 additions & 4 deletions synapse/storage/data_stores/main/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Tuple

from twisted.internet import defer

from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
Expand Down Expand Up @@ -73,9 +75,32 @@ def _update_presence_txn(self, txn, stream_orderings, presence_states):
)
txn.execute(sql + clause, [stream_id] + list(args))

def get_all_presence_updates(self, last_id, current_id, limit):
async def get_all_presence_updates(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, list]], int, bool]:
"""Get updates for presence replication stream.
Args:
instance_name: The writer we want to fetch updates from. Unused
here since there is only ever one writer.
last_id: The token to fetch updates from. Exclusive.
current_id: The token to fetch updates up to. Inclusive.
limit: The requested limit for the number of rows to return. The
function may return more or fewer rows.
Returns:
A tuple consisting of: the updates, a token to use to fetch
subsequent updates, and whether we returned fewer rows than exist
between the requested tokens due to the limit.
The token returned can be used in a subsequent call to this
function to get further updatees.
The updates are a list of 2-tuples of stream ID and the row data
"""

if last_id == current_id:
return defer.succeed([])
return [], current_id, False

def get_all_presence_updates_txn(txn):
sql = """
Expand All @@ -89,9 +114,17 @@ def get_all_presence_updates_txn(txn):
LIMIT ?
"""
txn.execute(sql, (last_id, current_id, limit))
return txn.fetchall()
updates = [(row[0], row[1:]) for row in txn]

upper_bound = current_id
limited = False
if len(updates) >= limit:
upper_bound = updates[-1][0]
limited = True

return updates, upper_bound, limited

return self.db.runInteraction(
return await self.db.runInteraction(
"get_all_presence_updates", get_all_presence_updates_txn
)

Expand Down
Loading

0 comments on commit c6011c3

Please sign in to comment.