Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Split up txn for fetching device keys #15215

Merged
merged 4 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/15215.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor database transaction for query users' devices to reduce database pool contention.
10 changes: 9 additions & 1 deletion synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,15 @@ def new_transaction(
f = cast(types.FunctionType, func) # type: ignore[redundant-cast]
if f.__closure__:
for i, cell in enumerate(f.__closure__):
if inspect.isgenerator(cell.cell_contents):
try:
contents = cell.cell_contents
except ValueError:
# cell.cell_contents can raise if the "cell" is empty,
# which indicates that the variable is currently
# unbound.
Comment on lines +678 to +680
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this just something you found while debugging your code?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't really understand what is going on here either.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh, sorry missed this comment.

It's because we had the following construct in the new code:

if a:
    b = set()

def foo():
    if a:
        b.add(1)

foo()

Where if a is false, b is unbound and so cell.cell_contents raises a ValueError: cell is empty

continue

if inspect.isgenerator(contents):
logger.error(
"Programming error: function %s references generator %s "
"via its closure",
Expand Down
24 changes: 16 additions & 8 deletions synapse/storage/databases/main/end_to_end_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,7 @@ async def get_e2e_device_keys_and_signatures(
set_tag("include_all_devices", include_all_devices)
set_tag("include_deleted_devices", include_deleted_devices)

result = await self.db_pool.runInteraction(
"get_e2e_device_keys",
self._get_e2e_device_keys_txn,
result = await self._get_e2e_device_keys(
query_list,
include_all_devices,
include_deleted_devices,
Expand Down Expand Up @@ -285,9 +283,8 @@ async def get_e2e_device_keys_and_signatures(
log_kv(result)
return result

def _get_e2e_device_keys_txn(
async def _get_e2e_device_keys(
self,
txn: LoggingTransaction,
query_list: Collection[Tuple[str, Optional[str]]],
include_all_devices: bool = False,
include_deleted_devices: bool = False,
Expand Down Expand Up @@ -319,7 +316,7 @@ def _get_e2e_device_keys_txn(

if user_list:
user_id_in_list_clause, user_args = make_in_list_sql_clause(
txn.database_engine, "user_id", user_list
self.database_engine, "user_id", user_list
)
query_clauses.append(user_id_in_list_clause)
query_params_list.append(user_args)
Expand All @@ -332,13 +329,16 @@ def _get_e2e_device_keys_txn(
user_device_id_in_list_clause,
user_device_args,
) = make_tuple_in_list_sql_clause(
txn.database_engine, ("user_id", "device_id"), user_device_batch
self.database_engine, ("user_id", "device_id"), user_device_batch
)
query_clauses.append(user_device_id_in_list_clause)
query_params_list.append(user_device_args)

result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {}
for query_clause, query_params in zip(query_clauses, query_params_list):

def get_e2e_device_keys_txn(
txn: LoggingTransaction, query_clause: str, query_params: list
) -> None:
sql = (
"SELECT user_id, device_id, "
" d.display_name, "
Expand All @@ -361,6 +361,14 @@ def _get_e2e_device_keys_txn(
display_name, db_to_json(key_json) if key_json else None
)

for query_clause, query_params in zip(query_clauses, query_params_list):
await self.db_pool.runInteraction(
"_get_e2e_device_keys",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if you purposefully used a new transaction description or not (it used to not have the _ in front of it I think).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, doesn't really matter. Though unintended consequence of this means we're not going to confuse the two types of transactions in the graphs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're not going to confuse the two types of transactions in the graphs

Yeah sometimes I feel like it is desirable to re-use it, even if it isn't apples-to-apples. I guess that's more important with cache names, maybe. 🤷

get_e2e_device_keys_txn,
query_clause,
query_params,
)

if include_deleted_devices:
for user_id, device_id in deleted_devices:
if device_id is None:
Expand Down