This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix background update to use an index #14181

Merged · 10 commits · Oct 14, 2022
1 change: 1 addition & 0 deletions changelog.d/14181.bugfix
@@ -0,0 +1 @@
+Fix poor performance of the `event_push_backfill_thread_id` background update, which was introduced in Synapse 1.68.0rc1.
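
The diff does not spell out the reasoning, but the likely picture is this: the original update paged both tables by `stream_ordering`, which `event_push_actions` can serve from an index while `event_push_summary` presumably cannot, so every batch re-scanned the summary table. The change below therefore keeps `stream_ordering` pagination for `event_push_actions` only, and paginates `event_push_summary` by `(user_id, room_id)`, a key its unique index can serve. A minimal sketch of the difference in query shape follows; the toy schema, the index name `event_push_summary_unique_index`, and the claim that the real table lacks a `stream_ordering` index are assumptions, and sqlite3 is used only because it ships with Python (Synapse runs against PostgreSQL or SQLite):

# Minimal sketch, not part of the PR: compare the old and new pagination queries
# against a toy event_push_summary table whose only index is on (user_id, room_id).
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE event_push_summary (
        user_id TEXT NOT NULL,
        room_id TEXT NOT NULL,
        stream_ordering BIGINT NOT NULL,
        thread_id TEXT
    );
    CREATE UNIQUE INDEX event_push_summary_unique_index
        ON event_push_summary (user_id, room_id);
    """
)

# Old shape: pages by stream_ordering, which this toy table (and, presumably, the
# real one) has no index for, so every batch has to scan the table.
old_sql = """
    SELECT stream_ordering FROM event_push_summary
    WHERE thread_id IS NULL AND stream_ordering > ?
    ORDER BY stream_ordering LIMIT ?
"""

# New shape (simplified from the PR): pages by (user_id, room_id), which the
# unique index can serve directly.
new_sql = """
    SELECT user_id, room_id FROM event_push_summary
    WHERE (user_id, room_id) > (?, ?) AND thread_id IS NULL
    ORDER BY user_id, room_id LIMIT ?
"""

# Print the query plans rather than asserting them; the point is the SCAN vs
# index SEARCH difference, which also holds for PostgreSQL.
for label, sql, args in (("old", old_sql, (0, 100)), ("new", new_sql, ("", "", 100))):
    print(label)
    for row in conn.execute("EXPLAIN QUERY PLAN " + sql, args):
        print("   ", row)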
62 changes: 51 additions & 11 deletions synapse/storage/databases/main/event_push_actions.py
@@ -269,11 +269,11 @@ async def _background_backfill_thread_id(
         event_push_actions_done = progress.get("event_push_actions_done", False)
 
         def add_thread_id_txn(
-            txn: LoggingTransaction, table_name: str, start_stream_ordering: int
+            txn: LoggingTransaction, start_stream_ordering: int
         ) -> int:
-            sql = f"""
+            sql = """
             SELECT stream_ordering
-            FROM {table_name}
+            FROM event_push_actions
             WHERE
                 thread_id IS NULL
                 AND stream_ordering > ?
@@ -285,7 +285,7 @@ def add_thread_id_txn(
             # No more rows to process.
             rows = txn.fetchall()
             if not rows:
-                progress[f"{table_name}_done"] = True
+                progress["event_push_actions_done"] = True
                 self.db_pool.updates._background_update_progress_txn(
                     txn, "event_push_backfill_thread_id", progress
                 )
@@ -294,8 +294,8 @@ def add_thread_id_txn(
             # Update the thread ID for any of those rows.
             max_stream_ordering = rows[-1][0]
 
-            sql = f"""
-            UPDATE {table_name}
+            sql = """
+            UPDATE event_push_actions
             SET thread_id = 'main'
             WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
"""
@@ -309,7 +309,50 @@ def add_thread_id_txn(
 
             # Update progress.
             processed_rows = txn.rowcount
-            progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
+            progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "event_push_backfill_thread_id", progress
+            )
+
+            return processed_rows
+
+        def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
+            min_user_id = progress.get("max_summary_user_id", "")
+            min_room_id = progress.get("max_summary_room_id", "")
+
+            # Slightly overcomplicated query for getting the Nth user ID / room
+            # ID tuple, or the last if there are less than N remaining.
+            sql = """
+            SELECT user_id, room_id FROM (
+                SELECT user_id, room_id FROM event_push_summary
+                WHERE (user_id, room_id) > (?, ?)
+                    AND thread_id IS NULL
+                ORDER BY user_id, room_id
+                LIMIT ?
+            ) AS e
+            ORDER BY user_id DESC, room_id DESC
+            LIMIT 1
+            """
+
+            txn.execute(sql, (min_user_id, min_room_id, batch_size))
+            row = txn.fetchone()
+            if not row:
+                return 0
+
+            max_user_id, max_room_id = row
+
+            sql = """
+            UPDATE event_push_summary
+            SET thread_id = 'main'
+            WHERE
+                (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
+                AND thread_id IS NULL
+            """
+            txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
+            processed_rows = txn.rowcount
+
+            progress["max_summary_user_id"] = max_user_id
+            progress["max_summary_room_id"] = max_room_id
             self.db_pool.updates._background_update_progress_txn(
                 txn, "event_push_backfill_thread_id", progress
             )
@@ -325,15 +368,12 @@ def add_thread_id_txn(
             result = await self.db_pool.runInteraction(
                 "event_push_backfill_thread_id",
                 add_thread_id_txn,
-                "event_push_actions",
                 progress.get("max_event_push_actions_stream_ordering", 0),
             )
         else:
             result = await self.db_pool.runInteraction(
                 "event_push_backfill_thread_id",
-                add_thread_id_txn,
-                "event_push_summary",
-                progress.get("max_event_push_summary_stream_ordering", 0),
+                add_thread_id_summary_txn,
             )
 
             # Only done after the event_push_summary table is done.
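
Taken together, `add_thread_id_summary_txn` resumes from the `(user_id, room_id)` pair recorded in `progress`, uses the nested SELECT to find the batch-size-th remaining pair (or the last pair if fewer remain), backfills `thread_id` for that key range, and stores the new high-water mark. Below is a rough, self-contained sketch of that pattern, not Synapse code: the toy table, the sample data, and the in-process `while` loop stand in for Synapse's background-update machinery, which persists `progress` in the database and runs each batch in its own transaction.

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event_push_summary ("
    " user_id TEXT NOT NULL, room_id TEXT NOT NULL, thread_id TEXT,"
    " UNIQUE (user_id, room_id))"
)
conn.executemany(
    "INSERT INTO event_push_summary (user_id, room_id) VALUES (?, ?)",
    [(f"@user{u}:test", f"!room{r}:test") for u in range(5) for r in range(5)],
)

progress = {}  # stands in for the persisted background-update progress dict
BATCH_SIZE = 7

def add_thread_id_summary_txn() -> int:
    min_user_id = progress.get("max_summary_user_id", "")
    min_room_id = progress.get("max_summary_room_id", "")

    # The "Nth tuple or the last one" trick from the PR: take up to BATCH_SIZE
    # unprocessed keys after the cursor, then keep only the largest of them.
    row = conn.execute(
        """
        SELECT user_id, room_id FROM (
            SELECT user_id, room_id FROM event_push_summary
            WHERE (user_id, room_id) > (?, ?) AND thread_id IS NULL
            ORDER BY user_id, room_id
            LIMIT ?
        ) AS e
        ORDER BY user_id DESC, room_id DESC
        LIMIT 1
        """,
        (min_user_id, min_room_id, BATCH_SIZE),
    ).fetchone()
    if not row:
        return 0  # nothing left to backfill
    max_user_id, max_room_id = row

    # Backfill everything in the key range (cursor, batch end], exclusive on the left.
    cur = conn.execute(
        """
        UPDATE event_push_summary SET thread_id = 'main'
        WHERE (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
            AND thread_id IS NULL
        """,
        (min_user_id, min_room_id, max_user_id, max_room_id),
    )

    # Record the new high-water mark so the next batch (or a restart) resumes here.
    progress["max_summary_user_id"] = max_user_id
    progress["max_summary_room_id"] = max_room_id
    return cur.rowcount

while (processed := add_thread_id_summary_txn()) > 0:
    print(f"backfilled {processed} rows, cursor now {progress}")

Because each batch advances the saved `(user_id, room_id)` cursor and the UPDATE clears the `thread_id IS NULL` condition for the rows it touches, the update can be interrupted and resumed at any point without redoing completed work.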