diff --git a/changelog.d/17241.bugfix b/changelog.d/17241.bugfix new file mode 100644 index 00000000000..1b7f0bca941 --- /dev/null +++ b/changelog.d/17241.bugfix @@ -0,0 +1 @@ +Fix handling of duplicate concurrent uploading of device one-time-keys. diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 1ece54ccfca..4f40e9ffd6e 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -53,6 +53,9 @@ logger = logging.getLogger(__name__) +ONE_TIME_KEY_UPLOAD = "one_time_key_upload_lock" + + class E2eKeysHandler: def __init__(self, hs: "HomeServer"): self.config = hs.config @@ -62,6 +65,7 @@ def __init__(self, hs: "HomeServer"): self._appservice_handler = hs.get_application_service_handler() self.is_mine = hs.is_mine self.clock = hs.get_clock() + self._worker_lock_handler = hs.get_worker_locks_handler() federation_registry = hs.get_federation_registry() @@ -855,45 +859,53 @@ async def upload_keys_for_user( async def _upload_one_time_keys_for_user( self, user_id: str, device_id: str, time_now: int, one_time_keys: JsonDict ) -> None: - logger.info( - "Adding one_time_keys %r for device %r for user %r at %d", - one_time_keys.keys(), - device_id, - user_id, - time_now, - ) + # We take out a lock so that we don't have to worry about a client + # sending duplicate requests. + lock_key = f"{user_id}_{device_id}" + async with self._worker_lock_handler.acquire_lock( + ONE_TIME_KEY_UPLOAD, lock_key + ): + logger.info( + "Adding one_time_keys %r for device %r for user %r at %d", + one_time_keys.keys(), + device_id, + user_id, + time_now, + ) - # make a list of (alg, id, key) tuples - key_list = [] - for key_id, key_obj in one_time_keys.items(): - algorithm, key_id = key_id.split(":") - key_list.append((algorithm, key_id, key_obj)) + # make a list of (alg, id, key) tuples + key_list = [] + for key_id, key_obj in one_time_keys.items(): + algorithm, key_id = key_id.split(":") + key_list.append((algorithm, key_id, key_obj)) - # First we check if we have already persisted any of the keys. - existing_key_map = await self.store.get_e2e_one_time_keys( - user_id, device_id, [k_id for _, k_id, _ in key_list] - ) + # First we check if we have already persisted any of the keys. + existing_key_map = await self.store.get_e2e_one_time_keys( + user_id, device_id, [k_id for _, k_id, _ in key_list] + ) - new_keys = [] # Keys that we need to insert. (alg, id, json) tuples. - for algorithm, key_id, key in key_list: - ex_json = existing_key_map.get((algorithm, key_id), None) - if ex_json: - if not _one_time_keys_match(ex_json, key): - raise SynapseError( - 400, - ( - "One time key %s:%s already exists. " - "Old key: %s; new key: %r" + new_keys = [] # Keys that we need to insert. (alg, id, json) tuples. + for algorithm, key_id, key in key_list: + ex_json = existing_key_map.get((algorithm, key_id), None) + if ex_json: + if not _one_time_keys_match(ex_json, key): + raise SynapseError( + 400, + ( + "One time key %s:%s already exists. " + "Old key: %s; new key: %r" + ) + % (algorithm, key_id, ex_json, key), ) - % (algorithm, key_id, ex_json, key), + else: + new_keys.append( + (algorithm, key_id, encode_canonical_json(key).decode("ascii")) ) - else: - new_keys.append( - (algorithm, key_id, encode_canonical_json(key).decode("ascii")) - ) - log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys}) - await self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys) + log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys}) + await self.store.add_e2e_one_time_keys( + user_id, device_id, time_now, new_keys + ) async def upload_signing_keys_for_user( self, user_id: str, keys: JsonDict