Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Rewrite the KeyRing #10035

Merged
merged 17 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 41 additions & 34 deletions synapse/crypto/keyring.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,18 @@ class KeyLookupError(ValueError):

@attr.s(slots=True)
class _FetchKeyRequest:
"""A request for keys for a given server."""
"""A request for keys for a given server.

We will continue to try and fetch until we have all the keys listed under
`key_ids` (with an appropriate `valid_until_ts` property) or we run out of
places to fetch keys from.

Attributes:
server_name: The name of the server that owns the keys.
minimum_valid_until_ts: The earliest timestamp at which we need the
keys to be valid at.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
key_ids: The IDs of the keys to attempt to fetch
"""

server_name = attr.ib(type=str)
minimum_valid_until_ts = attr.ib(type=int)
Expand Down Expand Up @@ -166,12 +177,12 @@ def __init__(
process_batch_callback=self._inner_fetch_key_requests,
) # type: BatchingQueue[_FetchKeyRequest, Dict[str, Dict[str, FetchKeyResult]]]

def verify_json_for_server(
async def verify_json_for_server(
self,
server_name: str,
json_object: JsonDict,
validity_time: int,
) -> defer.Deferred:
):
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""Verify that a JSON object has been signed by a given server

Args:
Expand All @@ -191,7 +202,7 @@ def verify_json_for_server(
json_object,
validity_time,
)
return defer.ensureDeferred(self.process_request(request))
return await self.process_request(request)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved

def verify_json_objects_for_server(
self, server_and_json: Iterable[Tuple[str, dict, int]]
Expand All @@ -214,15 +225,13 @@ def verify_json_objects_for_server(
logcontext.
"""
return [
defer.ensureDeferred(
run_in_background(
self.process_request,
VerifyJsonRequest.from_json_object(
server_name,
json_object,
validity_time,
),
)
run_in_background(
self.process_request,
VerifyJsonRequest.from_json_object(
server_name,
json_object,
validity_time,
),
)
for server_name, json_object, validity_time in server_and_json
]
Expand Down Expand Up @@ -276,43 +285,38 @@ async def process_request(self, verify_request: VerifyJsonRequest) -> None:
Codes.UNAUTHORIZED,
)

# Add they keys we need to verify to the queue for retrieval. We queue
# Add the keys we need to verify to the queue for retrieval. We queue
# up requests for the same server so we don't end up with many in flight
# requests for the same keys.
key_request = verify_request.to_fetch_key_request()
found_keys_by_server = await self._server_queue.add_to_queue(
verify_request.to_fetch_key_request(), key=verify_request.server_name
key_request, key=verify_request.server_name
)

# Since we batch up requests the returned set of keys may contain keys
# from other servers, so we pull out only the ones we care about.s
found_keys = found_keys_by_server.get(verify_request.server_name, {})

# For each signature to check we ensure we have fetched the necessary
# keys and the signature matches.
# Verify each signature we got valid keys for, raising if we can't
# verify any of them.
verified = False
for key_id in verify_request.key_ids:
key_result = found_keys.get(key_id)
if not key_result:
raise SynapseError(
401,
f"Missing keys for {verify_request.server_name}: {key_id}",
Codes.UNAUTHORIZED,
)
continue

if key_result.valid_until_ts < verify_request.minimum_valid_until_ts:
raise SynapseError(
401,
f"Failed to find key with recent enough `valid_until_ts` for {verify_request.server_name}: {key_id}",
Codes.UNAUTHORIZED,
)
continue

verify_key = key_result.verify_key
json_object = verify_request.get_json_object()
try:
json_object = verify_request.get_json_object()
verify_signed_json(
json_object,
verify_request.server_name,
verify_key,
)
verified = True
except SignatureVerifyException as e:
logger.debug(
"Error verifying signature for %s:%s:%s with key %s: %s",
Expand All @@ -334,6 +338,13 @@ async def process_request(self, verify_request: VerifyJsonRequest) -> None:
Codes.UNAUTHORIZED,
)

if not verified:
raise SynapseError(
401,
f"Failed to find any key to satisfy: {key_request}",
Codes.UNAUTHORIZED,
)

async def _inner_fetch_key_requests(
self, requests: List[_FetchKeyRequest]
) -> Dict[str, Dict[str, FetchKeyResult]]:
Expand All @@ -348,12 +359,8 @@ async def _inner_fetch_key_requests(
for request in requests:
by_server = server_to_key_to_ts.setdefault(request.server_name, {})
for key_id in request.key_ids:
existing_ts = by_server.get(key_id)

if existing_ts:
by_server[key_id] = max(request.minimum_valid_until_ts, existing_ts)
else:
by_server[key_id] = request.minimum_valid_until_ts
existing_ts = by_server.get(key_id, 0)
by_server[key_id] = max(request.minimum_valid_until_ts, existing_ts)

deduped_requests = [
_FetchKeyRequest(server_name, minimum_valid_ts, [key_id])
Expand Down
19 changes: 3 additions & 16 deletions tests/crypto/test_keyring.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from nacl.signing import SigningKey
from signedjson.key import encode_verify_key_base64, get_verify_key

from twisted.internet import defer
from twisted.internet.defer import Deferred, ensureDeferred

from synapse.api.errors import SynapseError
Expand Down Expand Up @@ -611,21 +610,9 @@ def get_key_id(key):
return "%s:%s" % (key.alg, key.version)


@defer.inlineCallbacks
def run_in_context(f, *args, **kwargs):
with LoggingContext("testctx"):
rv = yield f(*args, **kwargs)
return rv


def _verify_json_for_server(kr, *args):
async def _verify_json_for_server(kr, *args):
"""thin wrapper around verify_json_for_server which makes sure it is wrapped
with the patched defer.inlineCallbacks.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
"""

@defer.inlineCallbacks
def v():
rv1 = yield kr.verify_json_for_server(*args)
return rv1

return run_in_context(v)
with LoggingContext("testctx"):
return await kr.verify_json_for_server(*args)
37 changes: 16 additions & 21 deletions tests/util/test_batching_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,32 @@ async def _process_queue(self, values):
self._pending_calls.append((values, d))
return await make_deferred_yieldable(d)

def _get_sample_with_name(self, metric, name) -> int:
"""For a prometheus metric get the value of the sample that has a
matching "name" label.
"""
for sample in metric.collect()[0].samples:
if sample.labels.get("name") == name:
return sample.value

self.fail("Found no matching sample")

def _assert_metrics(self, queued, keys, in_flight):
"""Assert that the metrics are correct"""

self.assertEqual(len(number_queued.collect()), 1)
self.assertEqual(len(number_queued.collect()[0].samples), 1)
sample = self._get_sample_with_name(number_queued, self.queue._name)
self.assertEqual(
number_queued.collect()[0].samples[0].labels,
{"name": self.queue._name},
)
self.assertEqual(
number_queued.collect()[0].samples[0].value,
sample,
queued,
"number_queued",
)

self.assertEqual(len(number_of_keys.collect()), 1)
self.assertEqual(len(number_of_keys.collect()[0].samples), 1)
self.assertEqual(
number_queued.collect()[0].samples[0].labels, {"name": self.queue._name}
)
self.assertEqual(
number_of_keys.collect()[0].samples[0].value, keys, "number_of_keys"
)
sample = self._get_sample_with_name(number_of_keys, self.queue._name)
self.assertEqual(sample, keys, "number_of_keys")

self.assertEqual(len(number_in_flight.collect()), 1)
self.assertEqual(len(number_in_flight.collect()[0].samples), 1)
self.assertEqual(
number_queued.collect()[0].samples[0].labels, {"name": self.queue._name}
)
sample = self._get_sample_with_name(number_in_flight, self.queue._name)
self.assertEqual(
number_in_flight.collect()[0].samples[0].value,
sample,
in_flight,
"number_in_flight",
)
Expand Down