Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Federation Sender & Appservice Pusher Stream Optimisations #13251

Merged
merged 6 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13251.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimise federation sender and appservice pusher event stream processing queries. Contributed by Nick @ Beeper (@fizzadar).
10 changes: 7 additions & 3 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,11 @@ async def _process_event_queue_loop(self) -> None:
self._is_processing = True
while True:
last_token = await self.store.get_federation_out_pos("events")
next_token, events = await self.store.get_all_new_events_stream(
(
next_token,
events,
event_to_received_ts,
) = await self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
)

Expand Down Expand Up @@ -476,7 +480,7 @@ async def handle_event(event: EventBase) -> None:
await self._send_pdu(event, sharded_destinations)

now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
ts = event_to_received_ts[event.event_id]
assert ts is not None
synapse.metrics.event_processing_lag_by_event.labels(
"federation_sender"
Expand Down Expand Up @@ -509,7 +513,7 @@ async def handle_room_events(events: List[EventBase]) -> None:

if events:
now = self.clock.time_msec()
ts = await self.store.get_received_ts(events[-1].event_id)
ts = event_to_received_ts[events[-1].event_id]
assert ts is not None

synapse.metrics.event_processing_lag.labels(
Expand Down
11 changes: 6 additions & 5 deletions synapse/handlers/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,15 @@ async def _notify_interested_services(self, max_token: RoomStreamToken) -> None:
with Measure(self.clock, "notify_interested_services"):
self.is_processing = True
try:
limit = 100
upper_bound = -1
while upper_bound < self.current_max:
last_token = await self.store.get_appservice_last_pos()
(
upper_bound,
events,
) = await self.store.get_new_events_for_appservice(
self.current_max, limit
event_to_received_ts,
) = await self.store.get_all_new_events_stream(
last_token, self.current_max, limit=100, get_prev_content=True
)

events_by_room: Dict[str, List[EventBase]] = {}
Expand Down Expand Up @@ -150,7 +151,7 @@ async def start_scheduler() -> None:
)

now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
ts = event_to_received_ts[event.event_id]
assert ts is not None

synapse.metrics.event_processing_lag_by_event.labels(
Expand Down Expand Up @@ -187,7 +188,7 @@ async def handle_room_events(events: Iterable[EventBase]) -> None:

if events:
now = self.clock.time_msec()
ts = await self.store.get_received_ts(events[-1].event_id)
ts = event_to_received_ts[events[-1].event_id]
assert ts is not None

synapse.metrics.event_processing_lag.labels(
Expand Down
54 changes: 12 additions & 42 deletions synapse/storage/databases/main/appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,52 +371,22 @@ def _get_oldest_unsent_txn(
device_list_summary=DeviceListUpdates(),
)

async def set_appservice_last_pos(self, pos: int) -> None:
def set_appservice_last_pos_txn(txn: LoggingTransaction) -> None:
txn.execute(
"UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
)

await self.db_pool.runInteraction(
"set_appservice_last_pos", set_appservice_last_pos_txn
async def get_appservice_last_pos(self) -> int:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please could you give this a docstring?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in c59d717

return await self.db_pool.simple_select_one_onecol(
table="appservice_stream_position",
retcol="stream_ordering",
keyvalues={},
desc="get_appservice_last_pos_txn",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, we don't include `_txn` in transaction descriptions.

Suggested change
desc="get_appservice_last_pos_txn",
desc="get_appservice_last_pos",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that makes sense — noted for the future! Fixed via c59d717.

)

async def get_new_events_for_appservice(
self, current_id: int, limit: int
) -> Tuple[int, List[EventBase]]:
"""Get all new events for an appservice"""

def get_new_events_for_appservice_txn(
txn: LoggingTransaction,
) -> Tuple[int, List[str]]:
sql = (
"SELECT e.stream_ordering, e.event_id"
" FROM events AS e"
" WHERE"
" (SELECT stream_ordering FROM appservice_stream_position)"
" < e.stream_ordering"
" AND e.stream_ordering <= ?"
" ORDER BY e.stream_ordering ASC"
" LIMIT ?"
)

txn.execute(sql, (current_id, limit))
rows = txn.fetchall()

upper_bound = current_id
if len(rows) == limit:
upper_bound = rows[-1][0]

return upper_bound, [row[1] for row in rows]

upper_bound, event_ids = await self.db_pool.runInteraction(
"get_new_events_for_appservice", get_new_events_for_appservice_txn
async def set_appservice_last_pos(self, pos: int) -> None:
await self.db_pool.simple_update_one(
table="appservice_stream_position",
keyvalues={},
updatevalues={"stream_ordering": pos},
desc="set_appservice_last_pos_txn",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason not to stick with the old description?

Suggested change
desc="set_appservice_last_pos_txn",
desc="set_appservice_last_pos",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, that was a typo! Fixed in c59d717.

)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I right in thinking that this rewrite of set_appservice_last_pos is non-functional and just a cleanup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I just switched to the simple query versions for consistency with their use elsewhere; there is no functional change.


events = await self.get_events_as_list(event_ids, get_prev_content=True)

return upper_bound, events

async def get_type_stream_id_for_appservice(
self, service: ApplicationService, type: str
) -> int:
Expand Down
19 changes: 0 additions & 19 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,25 +292,6 @@ def process_replication_rows(

super().process_replication_rows(stream_name, instance_name, token, rows)

async def get_received_ts(self, event_id: str) -> Optional[int]:
    """Look up the `received_ts` column of the `events` row for one event.

    Args:
        event_id: ID of the event whose row is queried.

    Returns:
        The stored timestamp. Per this method's documented contract it is
        in milliseconds, and is None for events that were persisted before
        received_ts was implemented.

    Raises:
        An exception for unknown events (documented contract; the raising
        happens inside `simple_select_one_onecol`, which is not visible
        from this block).
    """
    # Match on the event ID only; exactly one column is read back.
    lookup = {"event_id": event_id}
    received_ts = await self.db_pool.simple_select_one_onecol(
        table="events",
        keyvalues=lookup,
        retcol="received_ts",
        desc="get_received_ts",
    )
    return received_ts

async def have_censored_event(self, event_id: str) -> bool:
"""Check if an event has been censored, i.e. if the content of the event has been erased
from the database due to a redaction.
Expand Down
22 changes: 14 additions & 8 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1022,8 +1022,8 @@ def _get_events_around_txn(
}

async def get_all_new_events_stream(
self, from_id: int, current_id: int, limit: int
) -> Tuple[int, List[EventBase]]:
self, from_id: int, current_id: int, limit: int, get_prev_content: bool = False
) -> Tuple[int, List[EventBase], Dict[str, Optional[int]]]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update the docstring!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""Get all new events

Returns all events with from_id < stream_ordering <= current_id.
Expand All @@ -1042,9 +1042,9 @@ async def get_all_new_events_stream(

def get_all_new_events_stream_txn(
txn: LoggingTransaction,
) -> Tuple[int, List[str]]:
) -> Tuple[int, Dict[str, Optional[int]]]:
sql = (
"SELECT e.stream_ordering, e.event_id"
"SELECT e.stream_ordering, e.event_id, e.received_ts"
" FROM events AS e"
" WHERE"
" ? < e.stream_ordering AND e.stream_ordering <= ?"
Expand All @@ -1059,15 +1059,21 @@ def get_all_new_events_stream_txn(
if len(rows) == limit:
upper_bound = rows[-1][0]

return upper_bound, [row[1] for row in rows]
event_to_received_ts: Dict[str, Optional[int]] = {
row[1]: row[2] for row in rows
}
return upper_bound, event_to_received_ts

upper_bound, event_ids = await self.db_pool.runInteraction(
upper_bound, event_to_received_ts = await self.db_pool.runInteraction(
"get_all_new_events_stream", get_all_new_events_stream_txn
)

events = await self.get_events_as_list(event_ids)
events = await self.get_events_as_list(
event_to_received_ts.keys(),
get_prev_content=get_prev_content,
)

return upper_bound, events
return upper_bound, events, event_to_received_ts

async def get_federation_out_pos(self, typ: str) -> int:
if self._need_to_reset_federation_stream_positions:
Expand Down
16 changes: 8 additions & 8 deletions tests/handlers/test_appservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def setUp(self):
self.mock_scheduler = Mock()
hs = Mock()
hs.get_datastores.return_value = Mock(main=self.mock_store)
self.mock_store.get_received_ts.return_value = make_awaitable(0)
self.mock_store.get_appservice_last_pos.return_value = make_awaitable(None)
self.mock_store.set_appservice_last_pos.return_value = make_awaitable(None)
self.mock_store.set_appservice_stream_type_pos.return_value = make_awaitable(
None
Expand All @@ -76,9 +76,9 @@ def test_notify_interested_services(self):
event = Mock(
sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [])),
make_awaitable((1, [event])),
self.mock_store.get_all_new_events_stream.side_effect = [
make_awaitable((0, [], {})),
make_awaitable((1, [event], {event.event_id: 0})),
]
self.handler.notify_interested_services(RoomStreamToken(None, 1))

Expand All @@ -95,8 +95,8 @@ def test_query_user_exists_unknown_user(self):

event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
self.mock_as_api.query_user.return_value = make_awaitable(True)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [event])),
self.mock_store.get_all_new_events_stream.side_effect = [
make_awaitable((0, [event], {event.event_id: 0})),
]

self.handler.notify_interested_services(RoomStreamToken(None, 0))
Expand All @@ -112,8 +112,8 @@ def test_query_user_exists_known_user(self):

event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
self.mock_as_api.query_user.return_value = make_awaitable(True)
self.mock_store.get_new_events_for_appservice.side_effect = [
make_awaitable((0, [event])),
self.mock_store.get_all_new_events_stream.side_effect = [
make_awaitable((0, [event], {event.event_id: 0})),
]

self.handler.notify_interested_services(RoomStreamToken(None, 0))
Expand Down