Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use autocommit mode for single statement DB functions. #8542

Merged
merged 5 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/8542.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve database performance by executing more queries without starting transactions.
99 changes: 91 additions & 8 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,12 @@ async def simple_upsert(
attempts = 0
while True:
try:
# We can autocommit if we are going to use native upserts
autocommit = (
self.engine.can_native_upsert
and table not in self._unsafe_to_upsert_tables
)

return await self.runInteraction(
desc,
self.simple_upsert_txn,
Expand All @@ -901,6 +907,7 @@ async def simple_upsert(
values,
insertion_values,
lock=lock,
db_autocommit=autocommit,
clokep marked this conversation as resolved.
Show resolved Hide resolved
)
except self.engine.module.IntegrityError as e:
attempts += 1
Expand Down Expand Up @@ -1063,6 +1070,43 @@ def simple_upsert_txn_native_upsert(
)
txn.execute(sql, list(allvalues.values()))

async def simple_upsert_many(
self,
table: str,
key_names: Collection[str],
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
desc: str,
) -> None:
"""
Upsert, many times.

Args:
table: The table to upsert into
key_names: The key column names.
key_values: A list of each row's key column values.
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
"""

# We can autocommit if we are going to use native upserts
autocommit = (
self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
)

return await self.runInteraction(
desc,
self.simple_upsert_many_txn,
table,
key_names,
key_values,
value_names,
value_values,
db_autocommit=autocommit,
clokep marked this conversation as resolved.
Show resolved Hide resolved
)

def simple_upsert_many_txn(
self,
txn: LoggingTransaction,
Expand Down Expand Up @@ -1214,7 +1258,13 @@ async def simple_select_one(
desc: description of the transaction, for logging and metrics
"""
return await self.runInteraction(
desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none
desc,
self.simple_select_one_txn,
table,
keyvalues,
retcols,
allow_none,
db_autocommit=True,
)

@overload
Expand Down Expand Up @@ -1265,6 +1315,7 @@ async def simple_select_one_onecol(
keyvalues,
retcol,
allow_none=allow_none,
db_autocommit=True,
)

@overload
Expand Down Expand Up @@ -1346,7 +1397,12 @@ async def simple_select_onecol(
Results in a list
"""
return await self.runInteraction(
desc, self.simple_select_onecol_txn, table, keyvalues, retcol
desc,
self.simple_select_onecol_txn,
table,
keyvalues,
retcol,
db_autocommit=True,
)

async def simple_select_list(
Expand All @@ -1371,7 +1427,12 @@ async def simple_select_list(
A list of dictionaries.
"""
return await self.runInteraction(
desc, self.simple_select_list_txn, table, keyvalues, retcols
desc,
self.simple_select_list_txn,
table,
keyvalues,
retcols,
db_autocommit=True,
)

@classmethod
Expand Down Expand Up @@ -1450,6 +1511,7 @@ async def simple_select_many_batch(
chunk,
keyvalues,
retcols,
db_autocommit=True,
)

results.extend(rows)
Expand Down Expand Up @@ -1548,7 +1610,12 @@ async def simple_update_one(
desc: description of the transaction, for logging and metrics
"""
await self.runInteraction(
desc, self.simple_update_one_txn, table, keyvalues, updatevalues
desc,
self.simple_update_one_txn,
table,
keyvalues,
updatevalues,
db_autocommit=True,
)

@classmethod
Expand Down Expand Up @@ -1607,7 +1674,9 @@ async def simple_delete_one(
keyvalues: dict of column names and values to select the row with
desc: description of the transaction, for logging and metrics
"""
await self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues)
await self.runInteraction(
desc, self.simple_delete_one_txn, table, keyvalues, db_autocommit=True,
)

@staticmethod
def simple_delete_one_txn(
Expand Down Expand Up @@ -1646,7 +1715,9 @@ async def simple_delete(
Returns:
The number of deleted rows.
"""
return await self.runInteraction(desc, self.simple_delete_txn, table, keyvalues)
return await self.runInteraction(
desc, self.simple_delete_txn, table, keyvalues, db_autocommit=True
)

@staticmethod
def simple_delete_txn(
Expand Down Expand Up @@ -1694,7 +1765,13 @@ async def simple_delete_many(
Number rows deleted
"""
return await self.runInteraction(
desc, self.simple_delete_many_txn, table, column, iterable, keyvalues
desc,
self.simple_delete_many_txn,
table,
column,
iterable,
keyvalues,
db_autocommit=True,
)

@staticmethod
Expand Down Expand Up @@ -1860,7 +1937,13 @@ async def simple_search_list(
"""

return await self.runInteraction(
desc, self.simple_search_list_txn, table, term, col, retcols
desc,
self.simple_search_list_txn,
table,
term,
col,
retcols,
db_autocommit=True,
)

@classmethod
Expand Down
5 changes: 2 additions & 3 deletions synapse/storage/databases/main/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,7 @@ async def store_server_verify_keys(
# param, which is itself the 2-tuple (server_name, key_id).
invalidations.append((server_name, key_id))

await self.db_pool.runInteraction(
"store_server_verify_keys",
self.db_pool.simple_upsert_many_txn,
await self.db_pool.simple_upsert_many(
table="server_signature_keys",
key_names=("server_name", "key_id"),
key_values=key_values,
Expand All @@ -135,6 +133,7 @@ async def store_server_verify_keys(
"verify_key",
),
value_values=value_values,
desc="store_server_verify_keys",
)

invalidate = self._get_server_verify_key.invalidate
Expand Down
76 changes: 45 additions & 31 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,42 +208,56 @@ async def set_destination_retry_timings(
"""

self._destination_retry_cache.pop(destination, None)
return await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings,
destination,
failure_ts,
retry_last_ts,
retry_interval,
)
if self.database_engine.can_native_upsert:
return await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_native,
destination,
failure_ts,
retry_last_ts,
retry_interval,
db_autocommit=True, # Safe as its a single upsert
)
else:
return await self.db_pool.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings_emulated,
destination,
failure_ts,
retry_last_ts,
retry_interval,
)

def _set_destination_retry_timings(
def _set_destination_retry_timings_native(
self, txn, destination, failure_ts, retry_last_ts, retry_interval
):
assert self.database_engine.can_native_upsert

# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
#
# WARNING: This is executed in autocommit, so we shouldn't add any more
# SQL calls in here (without being very careful).
sql = """
INSERT INTO destinations (
destination, failure_ts, retry_last_ts, retry_interval
)
VALUES (?, ?, ?, ?)
ON CONFLICT (destination) DO UPDATE SET
failure_ts = EXCLUDED.failure_ts,
retry_last_ts = EXCLUDED.retry_last_ts,
retry_interval = EXCLUDED.retry_interval
WHERE
EXCLUDED.retry_interval = 0
OR destinations.retry_interval IS NULL
OR destinations.retry_interval < EXCLUDED.retry_interval
"""

if self.database_engine.can_native_upsert:
# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.

sql = """
INSERT INTO destinations (
destination, failure_ts, retry_last_ts, retry_interval
)
VALUES (?, ?, ?, ?)
ON CONFLICT (destination) DO UPDATE SET
failure_ts = EXCLUDED.failure_ts,
retry_last_ts = EXCLUDED.retry_last_ts,
retry_interval = EXCLUDED.retry_interval
WHERE
EXCLUDED.retry_interval = 0
OR destinations.retry_interval IS NULL
OR destinations.retry_interval < EXCLUDED.retry_interval
"""

txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))

return
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))

def _set_destination_retry_timings_emulated(
self, txn, destination, failure_ts, retry_last_ts, retry_interval
):
self.database_engine.lock_table(txn, "destinations")

# We need to be careful here as the data may have changed from under us
Expand Down
45 changes: 17 additions & 28 deletions synapse/storage/databases/main/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,21 +480,16 @@ async def add_users_who_share_private_room(
user_id_tuples: iterable of 2-tuple of user IDs.
"""

def _add_users_who_share_room_txn(txn):
self.db_pool.simple_upsert_many_txn(
txn,
table="users_who_share_private_rooms",
key_names=["user_id", "other_user_id", "room_id"],
key_values=[
(user_id, other_user_id, room_id)
for user_id, other_user_id in user_id_tuples
],
value_names=(),
value_values=None,
)

await self.db_pool.runInteraction(
"add_users_who_share_room", _add_users_who_share_room_txn
await self.db_pool.simple_upsert_many(
table="users_who_share_private_rooms",
key_names=["user_id", "other_user_id", "room_id"],
key_values=[
(user_id, other_user_id, room_id)
for user_id, other_user_id in user_id_tuples
],
value_names=(),
value_values=None,
desc="add_users_who_share_room",
)

async def add_users_in_public_rooms(
Expand All @@ -508,19 +503,13 @@ async def add_users_in_public_rooms(
user_ids
"""

def _add_users_in_public_rooms_txn(txn):

self.db_pool.simple_upsert_many_txn(
txn,
table="users_in_public_rooms",
key_names=["user_id", "room_id"],
key_values=[(user_id, room_id) for user_id in user_ids],
value_names=(),
value_values=None,
)

await self.db_pool.runInteraction(
"add_users_in_public_rooms", _add_users_in_public_rooms_txn
await self.db_pool.simple_upsert_many(
table="users_in_public_rooms",
key_names=["user_id", "room_id"],
key_values=[(user_id, room_id) for user_id in user_ids],
value_names=(),
value_values=None,
desc="add_users_in_public_rooms",
)

async def delete_all_from_user_dir(self) -> None:
Expand Down