Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix background update to use an index #14181

Merged
merged 10 commits on
Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14181.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix poor performance of the `event_push_backfill_thread_id` background update, which was introduced in Synapse 1.68.0rc1.
clokep marked this conversation as resolved.
Show resolved Hide resolved
57 changes: 46 additions & 11 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,11 @@ async def _background_backfill_thread_id(
event_push_actions_done = progress.get("event_push_actions_done", False)

def add_thread_id_txn(
txn: LoggingTransaction, table_name: str, start_stream_ordering: int
txn: LoggingTransaction, start_stream_ordering: int
) -> int:
sql = f"""
sql = """
SELECT stream_ordering
FROM {table_name}
FROM event_push_actions
WHERE
thread_id IS NULL
AND stream_ordering > ?
Expand All @@ -285,7 +285,7 @@ def add_thread_id_txn(
# No more rows to process.
rows = txn.fetchall()
if not rows:
progress[f"{table_name}_done"] = True
progress["event_push_actions_done"] = True
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
Expand All @@ -294,8 +294,8 @@ def add_thread_id_txn(
# Update the thread ID for any of those rows.
max_stream_ordering = rows[-1][0]

sql = f"""
UPDATE {table_name}
sql = """
UPDATE event_push_actions
SET thread_id = 'main'
WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
"""
Expand All @@ -309,7 +309,45 @@ def add_thread_id_txn(

# Update progress.
processed_rows = txn.rowcount
progress[f"max_{table_name}_stream_ordering"] = max_stream_ordering
progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)

return processed_rows

def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
min_user_id = progress.get("max_summary_user_id", "")
min_room_id = progress.get("max_summary_room_id", "")

sql = """
SELECT user_id, room_id FROM event_push_summary
WHERE (user_id, room_id) > (?, ?)
AND thread_id IS NULL
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we expecting many thread_ids to be NULL? If not, this query may read a lot of rows

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, pretty much everything should be NULL. I was toying with changing it, but note that this is the same behaviour as before.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything is NULL to start with...if we think there isn't that many summaries we can just try to do it all at once, but I suspect this table is quite large.

ORDER BY user_id, room_id
LIMIT 1
OFFSET ?
"""

txn.execute(sql, (min_user_id, min_room_id, batch_size))
row = txn.fetchone()
if not row:
return 0

Copy link
Contributor

@reivilibre reivilibre Oct 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, actually, there's an issue with this because of the OFFSET. If you don't have a number of rows divisible by the batch size, we'll get some rows 'left over'. You'd have to do a final pass with > (min_user_id, min_room_id)? :/

Alternatively: do an UPDATE .. RETURNING and use rows[-1] as the next min

(?? is there a LIMIT for updates?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about different ways of doing this, in the end i just made it not include an upper bound if we don't have an upper bound.

max_user_id, max_room_id = row

sql = """
UPDATE event_push_summary
SET thread_id = 'main'
WHERE
(?, ?) < (user_id, room_id) AND (user_id, room_id) < (?, ?)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

two exclusive bounds here. I think at least one of them should be inclusive.

...Looking at it, I think you need the upper bound to be inclusive? As when we run out of rows, we return 0 so you need to have processed the upper item

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fuck, yes thanks

AND thread_id IS NULL
"""
txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
processed_rows = txn.rowcount

progress["max_summary_user_id"] = max_user_id
progress["max_summary_room_id"] = max_room_id
self.db_pool.updates._background_update_progress_txn(
txn, "event_push_backfill_thread_id", progress
)
Expand All @@ -325,15 +363,12 @@ def add_thread_id_txn(
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_txn,
"event_push_actions",
progress.get("max_event_push_actions_stream_ordering", 0),
)
else:
result = await self.db_pool.runInteraction(
"event_push_backfill_thread_id",
add_thread_id_txn,
"event_push_summary",
progress.get("max_event_push_summary_stream_ordering", 0),
add_thread_id_summary_txn,
)

# Only done after the event_push_summary table is done.
Expand Down