Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use execute_batch instead of executemany in places #9181

Merged
merged 3 commits into from
Jan 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9181.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up batch insertion when using PostgreSQL.
5 changes: 2 additions & 3 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,7 @@ def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:

self._do_execute(lambda *x: execute_batch(self.txn, *x), sql, args)
else:
for val in args:
self.execute(sql, val)
self.executemany(sql, args)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected that this has any performance difference?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docs are unclear, so possibly? Either way probably clearer to shove it down a layer 🤷

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully the underlying code is optimized, yeah. 👍


def execute_values(self, sql: str, *args: Any) -> List[Tuple]:
"""Corresponds to psycopg2.extras.execute_values. Only available when
Expand Down Expand Up @@ -888,7 +887,7 @@ def simple_insert_many_txn(
", ".join("?" for _ in keys[0]),
)

txn.executemany(sql, vals)
txn.execute_batch(sql, vals)

async def simple_upsert(
self,
Expand Down
18 changes: 9 additions & 9 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ def _update_current_state_txn(
WHERE room_id = ? AND type = ? AND state_key = ?
)
"""
txn.executemany(
txn.execute_batch(
sql,
(
(
Expand All @@ -895,7 +895,7 @@ def _update_current_state_txn(
)
# Now we actually update the current_state_events table

txn.executemany(
txn.execute_batch(
"DELETE FROM current_state_events"
" WHERE room_id = ? AND type = ? AND state_key = ?",
(
Expand All @@ -907,7 +907,7 @@ def _update_current_state_txn(
# We include the membership in the current state table, hence we do
# a lookup when we insert. This assumes that all events have already
# been inserted into room_memberships.
txn.executemany(
txn.execute_batch(
"""INSERT INTO current_state_events
(room_id, type, state_key, event_id, membership)
VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
Expand All @@ -927,7 +927,7 @@ def _update_current_state_txn(
# we have no record of the fact the user *was* a member of the
# room but got, say, state reset out of it.
if to_delete or to_insert:
txn.executemany(
txn.execute_batch(
"DELETE FROM local_current_membership"
" WHERE room_id = ? AND user_id = ?",
(
Expand All @@ -938,7 +938,7 @@ def _update_current_state_txn(
)

if to_insert:
txn.executemany(
txn.execute_batch(
"""INSERT INTO local_current_membership
(room_id, user_id, event_id, membership)
VALUES (?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
Expand Down Expand Up @@ -1738,7 +1738,7 @@ def _set_push_actions_for_event_and_users_txn(
"""

if events_and_contexts:
txn.executemany(
txn.execute_batch(
sql,
(
(
Expand Down Expand Up @@ -1767,7 +1767,7 @@ def _set_push_actions_for_event_and_users_txn(

# Now we delete the staging area for *all* events that were being
# persisted.
txn.executemany(
txn.execute_batch(
"DELETE FROM event_push_actions_staging WHERE event_id = ?",
((event.event_id,) for event, _ in all_events_and_contexts),
)
Expand Down Expand Up @@ -1886,7 +1886,7 @@ def _update_backward_extremeties(self, txn, events):
" )"
)

txn.executemany(
txn.execute_batch(
query,
[
(e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
Expand All @@ -1900,7 +1900,7 @@ def _update_backward_extremeties(self, txn, events):
"DELETE FROM event_backward_extremities"
" WHERE event_id = ? AND room_id = ?"
)
txn.executemany(
txn.execute_batch(
query,
[
(ev.event_id, ev.room_id)
Expand Down