From b54b03f9e1abc1964fe5f00115a165a2b8e10df5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 May 2019 13:21:57 +0100 Subject: [PATCH 01/13] Allow client event serialization to be async --- synapse/events/utils.py | 44 +++++++++++++++++ synapse/handlers/events.py | 8 ++-- synapse/handlers/initial_sync.py | 44 ++++++++++------- synapse/handlers/message.py | 7 +-- synapse/handlers/pagination.py | 22 +++++---- synapse/handlers/search.py | 42 +++++++++-------- synapse/rest/client/v1/events.py | 5 +- synapse/rest/client/v1/room.py | 29 +++++++----- synapse/rest/client/v2_alpha/notifications.py | 10 ++-- synapse/rest/client/v2_alpha/sync.py | 47 ++++++++++--------- synapse/server.py | 5 ++ synapse/util/async_helpers.py | 19 ++++++++ 12 files changed, 187 insertions(+), 95 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 07fccdd8f98c..a5454556ccae 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -19,7 +19,10 @@ from frozendict import frozendict +from twisted.internet import defer + from synapse.api.constants import EventTypes +from synapse.util.async_helpers import yieldable_gather_results from . import EventBase @@ -311,3 +314,44 @@ def serialize_event(e, time_now_ms, as_client_event=True, d = only_fields(d, only_event_fields) return d + + +class EventClientSerializer(object): + """Serializes events that are to be sent to clients. + + This is used for bundling extra information with any events to be sent to + clients. + """ + + def __init__(self, hs): + pass + + def serialize_event(self, event, time_now, **kwargs): + """Serializes a single event. + + Args: + event (EventBase) + time_now (int): The current time in milliseconds + **kwargs: Arguments to pass to `serialize_event` + + Returns: + Deferred[dict]: The serialized event + """ + event = serialize_event(event, time_now, **kwargs) + return defer.succeed(event) + + def serialize_events(self, events, time_now, **kwargs): + """Serializes multiple events. + + Args: + event (iter[EventBase]) + time_now (int): The current time in milliseconds + **kwargs: Arguments to pass to `serialize_event` + + Returns: + Deferred[list[dict]]: The list of serialized events + """ + return yieldable_gather_results( + self.serialize_event, events, + time_now=time_now, **kwargs + ) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 1b4d8c74ae13..6003ad9cca87 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -21,7 +21,6 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, SynapseError from synapse.events import EventBase -from synapse.events.utils import serialize_event from synapse.types import UserID from synapse.util.logutils import log_function from synapse.visibility import filter_events_for_client @@ -50,6 +49,7 @@ def __init__(self, hs): self.notifier = hs.get_notifier() self.state = hs.get_state_handler() self._server_notices_sender = hs.get_server_notices_sender() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks @log_function @@ -120,9 +120,9 @@ def get_stream(self, auth_user_id, pagin_config, timeout=0, time_now = self.clock.time_msec() - chunks = [ - serialize_event(e, time_now, as_client_event) for e in events - ] + chunks = yield self._event_serializer.serialize_events( + events, time_now, as_client_event=as_client_event, + ) chunk = { "chunk": chunks, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 7dfae78db01e..aaee5db0b7b5 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -19,7 +19,6 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.handlers.presence import format_user_presence_state from synapse.streams.config import PaginationConfig @@ -43,6 +42,7 @@ def __init__(self, hs): self.clock = hs.get_clock() self.validator = EventValidator() self.snapshot_cache = SnapshotCache() + self._event_serializer = hs.get_event_client_serializer() def snapshot_all_rooms(self, user_id=None, pagin_config=None, as_client_event=True, include_archived=False): @@ -138,7 +138,9 @@ def handle_room(event): d["inviter"] = event.sender invite_event = yield self.store.get_event(event.event_id) - d["invite"] = serialize_event(invite_event, time_now, as_client_event) + d["invite"] = yield self._event_serializer.serialize_event( + invite_event, time_now, as_client_event, + ) rooms_ret.append(d) @@ -185,18 +187,21 @@ def handle_room(event): time_now = self.clock.time_msec() d["messages"] = { - "chunk": [ - serialize_event(m, time_now, as_client_event) - for m in messages - ], + "chunk": ( + yield self._event_serializer.serialize_events( + messages, time_now=time_now, + as_client_event=as_client_event, + ) + ), "start": start_token.to_string(), "end": end_token.to_string(), } - d["state"] = [ - serialize_event(c, time_now, as_client_event) - for c in current_state.values() - ] + d["state"] = yield self._event_serializer.serialize_events( + current_state.values(), + time_now=time_now, + as_client_event=as_client_event + ) account_data_events = [] tags = tags_by_room.get(event.room_id) @@ -337,11 +342,15 @@ def _room_initial_sync_parted(self, user_id, room_id, pagin_config, "membership": membership, "room_id": room_id, "messages": { - "chunk": [serialize_event(m, time_now) for m in messages], + "chunk": (yield self._event_serializer.serialize_events( + messages, time_now, + )), "start": start_token.to_string(), "end": end_token.to_string(), }, - "state": [serialize_event(s, time_now) for s in room_state.values()], + "state": (yield self._event_serializer.serialize_events( + room_state.values(), time_now, + )), "presence": [], "receipts": [], }) @@ -355,10 +364,9 @@ def _room_initial_sync_joined(self, user_id, room_id, pagin_config, # TODO: These concurrently time_now = self.clock.time_msec() - state = [ - serialize_event(x, time_now) - for x in current_state.values() - ] + state = yield self._event_serializer.serialize_events( + current_state.values(), time_now, + ) now_token = yield self.hs.get_event_sources().get_current_token() @@ -425,7 +433,9 @@ def get_receipts(): ret = { "room_id": room_id, "messages": { - "chunk": [serialize_event(m, time_now) for m in messages], + "chunk": (yield self._event_serializer.serialize_events( + messages, time_now, + )), "start": start_token.to_string(), "end": end_token.to_string(), }, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e5afeadf68e1..7b2c33a9228f 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -32,7 +32,6 @@ ) from synapse.api.room_versions import RoomVersions from synapse.api.urls import ConsentURIBuilder -from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.storage.state import StateFilter @@ -57,6 +56,7 @@ def __init__(self, hs): self.clock = hs.get_clock() self.state = hs.get_state_handler() self.store = hs.get_datastore() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def get_room_data(self, user_id=None, room_id=None, @@ -164,9 +164,10 @@ def get_state_events( room_state = room_state[membership_event_id] now = self.clock.time_msec() - defer.returnValue( - [serialize_event(c, now) for c in room_state.values()] + events = yield self._event_serializer.serialize_events( + room_state.values(), now, ) + defer.returnValue(events) @defer.inlineCallbacks def get_joined_members(self, requester, room_id): diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index e4fdae9266b8..8f811e24fed9 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError -from synapse.events.utils import serialize_event from synapse.storage.state import StateFilter from synapse.types import RoomStreamToken from synapse.util.async_helpers import ReadWriteLock @@ -78,6 +77,7 @@ def __init__(self, hs): self._purges_in_progress_by_room = set() # map from purge id to PurgeStatus self._purges_by_id = {} + self._event_serializer = hs.get_event_client_serializer() def start_purge_history(self, room_id, token, delete_local_events=False): @@ -278,18 +278,22 @@ def get_messages(self, requester, room_id=None, pagin_config=None, time_now = self.clock.time_msec() chunk = { - "chunk": [ - serialize_event(e, time_now, as_client_event) - for e in events - ], + "chunk": ( + yield self._event_serializer.serialize_events( + events, time_now, + as_client_event=as_client_event, + ) + ), "start": pagin_config.from_token.to_string(), "end": next_token.to_string(), } if state: - chunk["state"] = [ - serialize_event(e, time_now, as_client_event) - for e in state - ] + chunk["state"] = ( + yield self._event_serializer.serialize_events( + state, time_now, + as_client_event=as_client_event, + ) + ) defer.returnValue(chunk) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 49c439313ec2..9bba74d6c91c 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -23,7 +23,6 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError from synapse.api.filtering import Filter -from synapse.events.utils import serialize_event from synapse.storage.state import StateFilter from synapse.visibility import filter_events_for_client @@ -36,6 +35,7 @@ class SearchHandler(BaseHandler): def __init__(self, hs): super(SearchHandler, self).__init__(hs) + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def get_old_rooms_from_upgraded_room(self, room_id): @@ -401,14 +401,16 @@ def search(self, user, content, batch=None): time_now = self.clock.time_msec() for context in contexts.values(): - context["events_before"] = [ - serialize_event(e, time_now) - for e in context["events_before"] - ] - context["events_after"] = [ - serialize_event(e, time_now) - for e in context["events_after"] - ] + context["events_before"] = ( + yield self._event_serializer.serialize_events( + context["events_before"], time_now, + ) + ) + context["events_after"] = ( + yield self._event_serializer.serialize_events( + context["events_after"], time_now, + ) + ) state_results = {} if include_state: @@ -422,14 +424,13 @@ def search(self, user, content, batch=None): # We're now about to serialize the events. We should not make any # blocking calls after this. Otherwise the 'age' will be wrong - results = [ - { + results = [] + for e in allowed_events: + results.append({ "rank": rank_map[e.event_id], - "result": serialize_event(e, time_now), + "result": (yield self._event_serializer.serialize_event(e, time_now)), "context": contexts.get(e.event_id, {}), - } - for e in allowed_events - ] + }) rooms_cat_res = { "results": results, @@ -438,10 +439,13 @@ def search(self, user, content, batch=None): } if state_results: - rooms_cat_res["state"] = { - room_id: [serialize_event(e, time_now) for e in state] - for room_id, state in state_results.items() - } + s = {} + for room_id, state in state_results.items(): + s[room_id] = yield self._event_serializer.serialize_events( + state, time_now, + ) + + rooms_cat_res["state"] = s if room_groups and "room_id" in group_keys: rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py index cd9b3bdbd123..c3b0a39ab704 100644 --- a/synapse/rest/client/v1/events.py +++ b/synapse/rest/client/v1/events.py @@ -19,7 +19,6 @@ from twisted.internet import defer from synapse.api.errors import SynapseError -from synapse.events.utils import serialize_event from synapse.streams.config import PaginationConfig from .base import ClientV1RestServlet, client_path_patterns @@ -84,6 +83,7 @@ def __init__(self, hs): super(EventRestServlet, self).__init__(hs) self.clock = hs.get_clock() self.event_handler = hs.get_event_handler() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def on_GET(self, request, event_id): @@ -92,7 +92,8 @@ def on_GET(self, request, event_id): time_now = self.clock.time_msec() if event: - defer.returnValue((200, serialize_event(event, time_now))) + event = yield self._event_serializer.serialize_event(event, time_now) + defer.returnValue((200, event)) else: defer.returnValue((404, "Event not found.")) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index fab04965cb08..255a85c5888a 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -26,7 +26,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError from synapse.api.filtering import Filter -from synapse.events.utils import format_event_for_client_v2, serialize_event +from synapse.events.utils import format_event_for_client_v2 from synapse.http.servlet import ( assert_params_in_dict, parse_integer, @@ -537,6 +537,7 @@ def __init__(self, hs): super(RoomEventServlet, self).__init__(hs) self.clock = hs.get_clock() self.event_handler = hs.get_event_handler() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def on_GET(self, request, room_id, event_id): @@ -545,7 +546,8 @@ def on_GET(self, request, room_id, event_id): time_now = self.clock.time_msec() if event: - defer.returnValue((200, serialize_event(event, time_now))) + event = yield self._event_serializer.serialize_event(event, time_now) + defer.returnValue((200, event)) else: defer.returnValue((404, "Event not found.")) @@ -559,6 +561,7 @@ def __init__(self, hs): super(RoomEventContextServlet, self).__init__(hs) self.clock = hs.get_clock() self.room_context_handler = hs.get_room_context_handler() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def on_GET(self, request, room_id, event_id): @@ -588,16 +591,18 @@ def on_GET(self, request, room_id, event_id): ) time_now = self.clock.time_msec() - results["events_before"] = [ - serialize_event(event, time_now) for event in results["events_before"] - ] - results["event"] = serialize_event(results["event"], time_now) - results["events_after"] = [ - serialize_event(event, time_now) for event in results["events_after"] - ] - results["state"] = [ - serialize_event(event, time_now) for event in results["state"] - ] + results["events_before"] = yield self._event_serializer.serialize_events( + results["events_before"], time_now, + ) + results["event"] = yield self._event_serializer.serialize_event( + results["event"], time_now, + ) + results["events_after"] = yield self._event_serializer.serialize_events( + results["events_after"], time_now, + ) + results["state"] = yield self._event_serializer.serialize_events( + results["state"], time_now, + ) defer.returnValue((200, results)) diff --git a/synapse/rest/client/v2_alpha/notifications.py b/synapse/rest/client/v2_alpha/notifications.py index 2a6ea3df5f1e..0a1eb0ae45f2 100644 --- a/synapse/rest/client/v2_alpha/notifications.py +++ b/synapse/rest/client/v2_alpha/notifications.py @@ -17,10 +17,7 @@ from twisted.internet import defer -from synapse.events.utils import ( - format_event_for_client_v2_without_room_id, - serialize_event, -) +from synapse.events.utils import format_event_for_client_v2_without_room_id from synapse.http.servlet import RestServlet, parse_integer, parse_string from ._base import client_v2_patterns @@ -36,6 +33,7 @@ def __init__(self, hs): self.store = hs.get_datastore() self.auth = hs.get_auth() self.clock = hs.get_clock() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def on_GET(self, request): @@ -69,11 +67,11 @@ def on_GET(self, request): "profile_tag": pa["profile_tag"], "actions": pa["actions"], "ts": pa["received_ts"], - "event": serialize_event( + "event": (yield self._event_serializer.serialize_event( notif_events[pa["event_id"]], self.clock.time_msec(), event_format=format_event_for_client_v2_without_room_id, - ), + )), } if pa["room_id"] not in receipts_by_room: diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 39d157a44be8..078d65969a1d 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -26,7 +26,6 @@ from synapse.events.utils import ( format_event_for_client_v2_without_room_id, format_event_raw, - serialize_event, ) from synapse.handlers.presence import format_user_presence_state from synapse.handlers.sync import SyncConfig @@ -86,6 +85,7 @@ def __init__(self, hs): self.filtering = hs.get_filtering() self.presence_handler = hs.get_presence_handler() self._server_notices_sender = hs.get_server_notices_sender() + self._event_serializer = hs.get_event_client_serializer() @defer.inlineCallbacks def on_GET(self, request): @@ -168,14 +168,14 @@ def on_GET(self, request): ) time_now = self.clock.time_msec() - response_content = self.encode_response( + response_content = yield self.encode_response( time_now, sync_result, requester.access_token_id, filter ) defer.returnValue((200, response_content)) - @staticmethod - def encode_response(time_now, sync_result, access_token_id, filter): + @defer.inlineCallbacks + def encode_response(self, time_now, sync_result, access_token_id, filter): if filter.event_format == 'client': event_formatter = format_event_for_client_v2_without_room_id elif filter.event_format == 'federation': @@ -183,18 +183,18 @@ def encode_response(time_now, sync_result, access_token_id, filter): else: raise Exception("Unknown event format %s" % (filter.event_format, )) - joined = SyncRestServlet.encode_joined( + joined = yield self.encode_joined( sync_result.joined, time_now, access_token_id, filter.event_fields, event_formatter, ) - invited = SyncRestServlet.encode_invited( + invited = yield self.encode_invited( sync_result.invited, time_now, access_token_id, event_formatter, ) - archived = SyncRestServlet.encode_archived( + archived = yield self.encode_archived( sync_result.archived, time_now, access_token_id, filter.event_fields, event_formatter, @@ -239,8 +239,8 @@ def encode_presence(events, time_now): ] } - @staticmethod - def encode_joined(rooms, time_now, token_id, event_fields, event_formatter): + @defer.inlineCallbacks + def encode_joined(self, rooms, time_now, token_id, event_fields, event_formatter): """ Encode the joined rooms in a sync result @@ -261,15 +261,15 @@ def encode_joined(rooms, time_now, token_id, event_fields, event_formatter): """ joined = {} for room in rooms: - joined[room.room_id] = SyncRestServlet.encode_room( + joined[room.room_id] = yield self.encode_room( room, time_now, token_id, joined=True, only_fields=event_fields, event_formatter=event_formatter, ) return joined - @staticmethod - def encode_invited(rooms, time_now, token_id, event_formatter): + @defer.inlineCallbacks + def encode_invited(self, rooms, time_now, token_id, event_formatter): """ Encode the invited rooms in a sync result @@ -289,7 +289,7 @@ def encode_invited(rooms, time_now, token_id, event_formatter): """ invited = {} for room in rooms: - invite = serialize_event( + invite = yield self._event_serializer.serialize_event( room.invite, time_now, token_id=token_id, event_format=event_formatter, is_invite=True, @@ -304,8 +304,8 @@ def encode_invited(rooms, time_now, token_id, event_formatter): return invited - @staticmethod - def encode_archived(rooms, time_now, token_id, event_fields, event_formatter): + @defer.inlineCallbacks + def encode_archived(self, rooms, time_now, token_id, event_fields, event_formatter): """ Encode the archived rooms in a sync result @@ -326,7 +326,7 @@ def encode_archived(rooms, time_now, token_id, event_fields, event_formatter): """ joined = {} for room in rooms: - joined[room.room_id] = SyncRestServlet.encode_room( + joined[room.room_id] = yield self.encode_room( room, time_now, token_id, joined=False, only_fields=event_fields, event_formatter=event_formatter, @@ -334,9 +334,9 @@ def encode_archived(rooms, time_now, token_id, event_fields, event_formatter): return joined - @staticmethod + @defer.inlineCallbacks def encode_room( - room, time_now, token_id, joined, + self, room, time_now, token_id, joined, only_fields, event_formatter, ): """ @@ -355,9 +355,10 @@ def encode_room( Returns: dict[str, object]: the room, encoded in our response format """ - def serialize(event): - return serialize_event( - event, time_now, token_id=token_id, + def serialize(events): + return self._event_serializer.serialize_events( + events, time_now=time_now, + token_id=token_id, event_format=event_formatter, only_event_fields=only_fields, ) @@ -376,8 +377,8 @@ def serialize(event): event.event_id, room.room_id, event.room_id, ) - serialized_state = [serialize(e) for e in state_events] - serialized_timeline = [serialize(e) for e in timeline_events] + serialized_state = yield serialize(state_events) + serialized_timeline = yield serialize(timeline_events) account_data = room.account_data diff --git a/synapse/server.py b/synapse/server.py index 8c30ac2fa5fe..80d40b9272bc 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -35,6 +35,7 @@ from synapse.crypto.keyring import Keyring from synapse.events.builder import EventBuilderFactory from synapse.events.spamcheck import SpamChecker +from synapse.events.utils import EventClientSerializer from synapse.federation.federation_client import FederationClient from synapse.federation.federation_server import ( FederationHandlerRegistry, @@ -185,6 +186,7 @@ def build_DEPENDENCY(self) 'sendmail', 'registration_handler', 'account_validity_handler', + 'event_client_serializer', ] REQUIRED_ON_MASTER_STARTUP = [ @@ -511,6 +513,9 @@ def build_registration_handler(self): def build_account_validity_handler(self): return AccountValidityHandler(self) + def build_event_client_serializer(self): + return EventClientSerializer(self) + def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 2f16f23d91fa..9a17dfdab2fc 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -156,6 +156,25 @@ def _concurrently_execute_inner(): ], consumeErrors=True)).addErrback(unwrapFirstError) +def yieldable_gather_results(func, iter, *args, **kwargs): + """Executes the function with each argument concurrently. + + Args: + func (func): Function to execute that returns a Deferred + iter (iter): An iterable that yields items that get passed as the first + argument to the function + *args: Arguments to be passed to each call to func + + Returns + Deferred: Resolved when all functions have been invoked, or errors if + one of the function calls fails. + """ + return logcontext.make_deferred_yieldable(defer.gatherResults([ + run_in_background(func, item, *args, **kwargs) + for item in iter + ], consumeErrors=True)).addErrback(unwrapFirstError) + + class Linearizer(object): """Limits concurrent access to resources based on a key. Useful to ensure only a few things happen at a time on a given resource. From a80e6b53f903d75b053546db5d99c93bffc4495e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 May 2019 11:58:45 +0100 Subject: [PATCH 02/13] Newsfile --- changelog.d/5183.misc | 1 + synapse/rest/client/v2_alpha/sync.py | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) create mode 100644 changelog.d/5183.misc diff --git a/changelog.d/5183.misc b/changelog.d/5183.misc new file mode 100644 index 000000000000..a8970f29eba1 --- /dev/null +++ b/changelog.d/5183.misc @@ -0,0 +1 @@ +Allow client event serialization to be async. diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 078d65969a1d..c701e534e7b8 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -200,7 +200,7 @@ def encode_response(self, time_now, sync_result, access_token_id, filter): event_formatter, ) - return { + defer.returnValue({ "account_data": {"events": sync_result.account_data}, "to_device": {"events": sync_result.to_device}, "device_lists": { @@ -222,7 +222,7 @@ def encode_response(self, time_now, sync_result, access_token_id, filter): }, "device_one_time_keys_count": sync_result.device_one_time_keys_count, "next_batch": sync_result.next_batch.to_string(), - } + }) @staticmethod def encode_presence(events, time_now): @@ -266,7 +266,7 @@ def encode_joined(self, rooms, time_now, token_id, event_fields, event_formatter event_formatter=event_formatter, ) - return joined + defer.returnValue(joined) @defer.inlineCallbacks def encode_invited(self, rooms, time_now, token_id, event_formatter): @@ -302,7 +302,7 @@ def encode_invited(self, rooms, time_now, token_id, event_formatter): "invite_state": {"events": invited_state} } - return invited + defer.returnValue(invited) @defer.inlineCallbacks def encode_archived(self, rooms, time_now, token_id, event_fields, event_formatter): @@ -332,7 +332,7 @@ def encode_archived(self, rooms, time_now, token_id, event_fields, event_formatt event_formatter=event_formatter, ) - return joined + defer.returnValue(joined) @defer.inlineCallbacks def encode_room( @@ -398,7 +398,7 @@ def serialize(events): result["unread_notifications"] = room.unread_notifications result["summary"] = room.summary - return result + defer.returnValue(result) def register_servlets(hs, http_server): From 4fb44fb5b9f0043ab7bfbc0aa7251c92680cc639 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 May 2019 13:37:44 +0100 Subject: [PATCH 03/13] Expose DataStore._get_events as get_events_as_list This is in preparation for reaction work which requires it. --- synapse/storage/appservice.py | 4 +-- synapse/storage/event_federation.py | 6 ++-- synapse/storage/events_worker.py | 50 ++++++++++++++++++++--------- synapse/storage/search.py | 4 +-- synapse/storage/stream.py | 18 +++++++---- tests/storage/test_appservice.py | 2 +- 6 files changed, 54 insertions(+), 30 deletions(-) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index 6092f600ba44..eb329ebd8b43 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -302,7 +302,7 @@ def _get_oldest_unsent_txn(txn): event_ids = json.loads(entry["event_ids"]) - events = yield self._get_events(event_ids) + events = yield self.get_events_as_list(event_ids) defer.returnValue( AppServiceTransaction(service=service, id=entry["txn_id"], events=events) @@ -358,7 +358,7 @@ def get_new_events_for_appservice_txn(txn): "get_new_events_for_appservice", get_new_events_for_appservice_txn ) - events = yield self._get_events(event_ids) + events = yield self.get_events_as_list(event_ids) defer.returnValue((upper_bound, events)) diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 956f87657298..09e39c2c28a4 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -45,7 +45,7 @@ def get_auth_chain(self, event_ids, include_given=False): """ return self.get_auth_chain_ids( event_ids, include_given=include_given - ).addCallback(self._get_events) + ).addCallback(self.get_events_as_list) def get_auth_chain_ids(self, event_ids, include_given=False): """Get auth events for given event_ids. The events *must* be state events. @@ -316,7 +316,7 @@ def get_backfill_events(self, room_id, event_list, limit): event_list, limit, ) - .addCallback(self._get_events) + .addCallback(self.get_events_as_list) .addCallback(lambda l: sorted(l, key=lambda e: -e.depth)) ) @@ -382,7 +382,7 @@ def get_missing_events(self, room_id, earliest_events, latest_events, limit): latest_events, limit, ) - events = yield self._get_events(ids) + events = yield self.get_events_as_list(ids) defer.returnValue(events) def _get_missing_events(self, txn, room_id, earliest_events, latest_events, limit): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 663991a9b695..5c40056d11e2 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -103,7 +103,7 @@ def get_event( Returns: Deferred : A FrozenEvent. """ - events = yield self._get_events( + events = yield self.get_events_as_list( [event_id], check_redacted=check_redacted, get_prev_content=get_prev_content, @@ -142,7 +142,7 @@ def get_events( Returns: Deferred : Dict from event_id to event. """ - events = yield self._get_events( + events = yield self.get_events_as_list( event_ids, check_redacted=check_redacted, get_prev_content=get_prev_content, @@ -152,13 +152,32 @@ def get_events( defer.returnValue({e.event_id: e for e in events}) @defer.inlineCallbacks - def _get_events( + def get_events_as_list( self, event_ids, check_redacted=True, get_prev_content=False, allow_rejected=False, ): + """Get events from the database and return in a list in the same order + as given by `event_ids` arg. + + Args: + event_ids (list): The event_ids of the events to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + + Returns: + Deferred[list]: List of events fetched from the database. The + events are in the same order as `event_ids` arg. + + Note that the returned list may be smaller than the list of event + IDs if not all events could be fetched. + """ + if not event_ids: defer.returnValue([]) @@ -202,21 +221,22 @@ def _get_events( # # The problem is that we end up at this point when an event # which has been redacted is pulled out of the database by - # _enqueue_events, because _enqueue_events needs to check the - # redaction before it can cache the redacted event. So obviously, - # calling get_event to get the redacted event out of the database - # gives us an infinite loop. + # _enqueue_events, because _enqueue_events needs to check + # the redaction before it can cache the redacted event. So + # obviously, calling get_event to get the redacted event out + # of the database gives us an infinite loop. # - # For now (quick hack to fix during 0.99 release cycle), we just - # go and fetch the relevant row from the db, but it would be nice - # to think about how we can cache this rather than hit the db - # every time we access a redaction event. + # For now (quick hack to fix during 0.99 release cycle), we + # just go and fetch the relevant row from the db, but it + # would be nice to think about how we can cache this rather + # than hit the db every time we access a redaction event. # # One thought on how to do this: - # 1. split _get_events up so that it is divided into (a) get the - # rawish event from the db/cache, (b) do the redaction/rejection - # filtering - # 2. have _get_event_from_row just call the first half of that + # 1. split get_events_as_list up so that it is divided into + # (a) get the rawish event from the db/cache, (b) do the + # redaction/rejection filtering + # 2. have _get_event_from_row just call the first half of + # that orig_sender = yield self._simple_select_one_onecol( table="events", diff --git a/synapse/storage/search.py b/synapse/storage/search.py index 226f8f1b7eab..ff49eaae0278 100644 --- a/synapse/storage/search.py +++ b/synapse/storage/search.py @@ -460,7 +460,7 @@ def search_msgs(self, room_ids, search_term, keys): results = list(filter(lambda row: row["room_id"] in room_ids, results)) - events = yield self._get_events([r["event_id"] for r in results]) + events = yield self.get_events_as_list([r["event_id"] for r in results]) event_map = {ev.event_id: ev for ev in events} @@ -605,7 +605,7 @@ def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None results = list(filter(lambda row: row["room_id"] in room_ids, results)) - events = yield self._get_events([r["event_id"] for r in results]) + events = yield self.get_events_as_list([r["event_id"] for r in results]) event_map = {ev.event_id: ev for ev in events} diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 9cd1e0f9fe94..d105b6b17d95 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -319,7 +319,9 @@ def f(txn): rows = yield self.runInteraction("get_room_events_stream_for_room", f) - ret = yield self._get_events([r.event_id for r in rows], get_prev_content=True) + ret = yield self.get_events_as_list([ + r.event_id for r in rows], get_prev_content=True, + ) self._set_before_and_after(ret, rows, topo_order=from_id is None) @@ -367,7 +369,9 @@ def f(txn): rows = yield self.runInteraction("get_membership_changes_for_user", f) - ret = yield self._get_events([r.event_id for r in rows], get_prev_content=True) + ret = yield self.get_events_as_list( + [r.event_id for r in rows], get_prev_content=True, + ) self._set_before_and_after(ret, rows, topo_order=False) @@ -394,7 +398,7 @@ def get_recent_events_for_room(self, room_id, limit, end_token): ) logger.debug("stream before") - events = yield self._get_events( + events = yield self.get_events_as_list( [r.event_id for r in rows], get_prev_content=True ) logger.debug("stream after") @@ -580,11 +584,11 @@ def get_events_around( event_filter, ) - events_before = yield self._get_events( + events_before = yield self.get_events_as_list( [e for e in results["before"]["event_ids"]], get_prev_content=True ) - events_after = yield self._get_events( + events_after = yield self.get_events_as_list( [e for e in results["after"]["event_ids"]], get_prev_content=True ) @@ -697,7 +701,7 @@ def get_all_new_events_stream_txn(txn): "get_all_new_events_stream", get_all_new_events_stream_txn ) - events = yield self._get_events(event_ids) + events = yield self.get_events_as_list(event_ids) defer.returnValue((upper_bound, events)) @@ -849,7 +853,7 @@ def paginate_room_events( event_filter, ) - events = yield self._get_events( + events = yield self.get_events_as_list( [r.event_id for r in rows], get_prev_content=True ) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 3f0083831b51..25a6c89ef5c1 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -340,7 +340,7 @@ def test_get_oldest_unsent_txn(self): other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store._get_events = Mock(return_value=events) + self.store.get_events_as_list = Mock(return_value=events) yield self._insert_txn(self.as_list[1]["id"], 9, other_events) yield self._insert_txn(service.id, 10, events) From 53788a447ffc9a2b35dbb0a43e7ac881fc075b17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 May 2019 13:41:36 +0100 Subject: [PATCH 04/13] Newsfile --- changelog.d/5184.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5184.misc diff --git a/changelog.d/5184.misc b/changelog.d/5184.misc new file mode 100644 index 000000000000..1588bdef6c26 --- /dev/null +++ b/changelog.d/5184.misc @@ -0,0 +1 @@ +Expose DataStore._get_events as get_events_as_list. From dc4f6d1b0104bb6d736deaec454640681bd31360 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 May 2019 14:37:40 +0100 Subject: [PATCH 05/13] Use correct config option for ratelimiting in tests --- tests/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/utils.py b/tests/utils.py index f21074ae2816..f38533a0c734 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -156,7 +156,8 @@ def default_config(name, parse=False): "mau_stats_only": False, "mau_limits_reserved_threepids": [], "admin_contact": None, - "rc_message": {"per_second": 10000, "burst_count": 10000}, + "rc_messages_per_second": 10000, + "rc_message_burst_count": 10000, "rc_registration": {"per_second": 10000, "burst_count": 10000}, "rc_login": { "address": {"per_second": 10000, "burst_count": 10000}, From db3046f5657af0f4f512b7f9aeb0d424808c8663 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 May 2019 14:39:27 +0100 Subject: [PATCH 06/13] Newsfile --- changelog.d/5185.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5185.misc diff --git a/changelog.d/5185.misc b/changelog.d/5185.misc new file mode 100644 index 000000000000..d148b03b515f --- /dev/null +++ b/changelog.d/5185.misc @@ -0,0 +1 @@ +Update tests to consistently be configured via the same code that is used when loading from configuration files. From daa2fb63179345b993f213f126d9f7bc019f3281 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 May 2019 18:53:09 +0100 Subject: [PATCH 07/13] comment about user_joined_room --- synapse/util/distributor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 194da87639f3..e14c8bdfda43 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -27,6 +27,7 @@ def user_left_room(distributor, user, room_id): distributor.fire("user_left_room", user=user, room_id=room_id) +# XXX: this is no longer used. We should probably kill it. def user_joined_room(distributor, user, room_id): distributor.fire("user_joined_room", user=user, room_id=room_id) From 52ddc6c0ed40dfc15e8e4abd383a2a5fc12fcfec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 May 2019 09:52:15 +0100 Subject: [PATCH 08/13] Update docstring with correct type Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/storage/events_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 5c40056d11e2..adc6cf26b5c6 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -171,7 +171,7 @@ def get_events_as_list( allow_rejected (bool): If True return rejected events. Returns: - Deferred[list]: List of events fetched from the database. The + Deferred[list[EventBase]]: List of events fetched from the database. The events are in the same order as `event_ids` arg. Note that the returned list may be smaller than the list of event From 8ed2f182f74b5a99d643dd0ee5bad34211cd1311 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 May 2019 09:52:52 +0100 Subject: [PATCH 09/13] Update docstring with correct return type Co-Authored-By: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/util/async_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 9a17dfdab2fc..7253ba120fbc 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -166,7 +166,7 @@ def yieldable_gather_results(func, iter, *args, **kwargs): *args: Arguments to be passed to each call to func Returns - Deferred: Resolved when all functions have been invoked, or errors if + Deferred[list]: Resolved when all functions have been invoked, or errors if one of the function calls fails. """ return logcontext.make_deferred_yieldable(defer.gatherResults([ From 4a926f528e88ae4dc2a3dd2e1fd86166763c4bb1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 May 2019 13:58:45 +0100 Subject: [PATCH 10/13] 0.99.4 --- CHANGES.md | 6 ++++++ debian/changelog | 7 +++++-- synapse/__init__.py | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 3cacca5a6bd6..91ddd07bc3f7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +Synapse 0.99.4 (2019-05-15) +=========================== + +No significant changes. + + Synapse 0.99.4rc1 (2019-05-13) ============================== diff --git a/debian/changelog b/debian/changelog index 454fa8eb164a..35cf8ffb20cb 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,9 +1,12 @@ -matrix-synapse-py3 (0.99.3.2+nmu1) UNRELEASED; urgency=medium +matrix-synapse-py3 (0.99.4) stable; urgency=medium [ Christoph Müller ] * Configure the systemd units to have a log identifier of `matrix-synapse` - -- Christoph Müller Wed, 17 Apr 2019 16:17:32 +0200 + [ Synapse Packaging team ] + * New synapse release 0.99.4. + + -- Synapse Packaging team Wed, 15 May 2019 13:58:08 +0100 matrix-synapse-py3 (0.99.3.2) stable; urgency=medium diff --git a/synapse/__init__.py b/synapse/__init__.py index cd9cfb240940..bf9e810da6a6 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -27,4 +27,4 @@ except ImportError: pass -__version__ = "0.99.4rc1" +__version__ = "0.99.4" From 13018bb997962fc00fc874c2508d8e76d4cb4603 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 15 May 2019 14:04:02 +0100 Subject: [PATCH 11/13] fix some typos in the changelog --- CHANGES.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 91ddd07bc3f7..1e9c3cf95331 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -23,8 +23,8 @@ Features instead of the executable name, `python`. Contributed by Christoph Müller. ([\#5023](https://github.com/matrix-org/synapse/issues/5023)) - Add time-based account expiration. ([\#5027](https://github.com/matrix-org/synapse/issues/5027), [\#5047](https://github.com/matrix-org/synapse/issues/5047), [\#5073](https://github.com/matrix-org/synapse/issues/5073), [\#5116](https://github.com/matrix-org/synapse/issues/5116)) -- Add support for handling /verions, /voip and /push_rules client endpoints to client_reader worker. ([\#5063](https://github.com/matrix-org/synapse/issues/5063), [\#5065](https://github.com/matrix-org/synapse/issues/5065), [\#5070](https://github.com/matrix-org/synapse/issues/5070)) -- Add an configuration option to require authentication on /publicRooms and /profile endpoints. ([\#5083](https://github.com/matrix-org/synapse/issues/5083)) +- Add support for handling `/versions`, `/voip` and `/push_rules` client endpoints to client_reader worker. ([\#5063](https://github.com/matrix-org/synapse/issues/5063), [\#5065](https://github.com/matrix-org/synapse/issues/5065), [\#5070](https://github.com/matrix-org/synapse/issues/5070)) +- Add a configuration option to require authentication on /publicRooms and /profile endpoints. ([\#5083](https://github.com/matrix-org/synapse/issues/5083)) - Move admin APIs to `/_synapse/admin/v1`. (The old paths are retained for backwards-compatibility, for now). ([\#5119](https://github.com/matrix-org/synapse/issues/5119)) - Implement an admin API for sending server notices. Many thanks to @krombel who provided a foundation for this work. ([\#5121](https://github.com/matrix-org/synapse/issues/5121), [\#5142](https://github.com/matrix-org/synapse/issues/5142)) @@ -45,11 +45,9 @@ Bugfixes - Workaround bug in twisted where attempting too many concurrent DNS requests could cause it to hang due to running out of file descriptors. ([\#5037](https://github.com/matrix-org/synapse/issues/5037)) - Make sure we're not registering the same 3pid twice on registration. ([\#5071](https://github.com/matrix-org/synapse/issues/5071)) - Don't crash on lack of expiry templates. ([\#5077](https://github.com/matrix-org/synapse/issues/5077)) -- Fix the ratelimting on third party invites. ([\#5104](https://github.com/matrix-org/synapse/issues/5104)) +- Fix the ratelimiting on third party invites. ([\#5104](https://github.com/matrix-org/synapse/issues/5104)) - Add some missing limitations to room alias creation. ([\#5124](https://github.com/matrix-org/synapse/issues/5124), [\#5128](https://github.com/matrix-org/synapse/issues/5128)) - Limit the number of EDUs in transactions to 100 as expected by synapse. Thanks to @superboum for this work! ([\#5138](https://github.com/matrix-org/synapse/issues/5138)) -- Fix bogus imports in unit tests. ([\#5154](https://github.com/matrix-org/synapse/issues/5154)) - Internal Changes ---------------- @@ -84,6 +82,7 @@ Internal Changes - Prevent an exception from being raised in a IResolutionReceiver and use a more generic error message for blacklisted URL previews. ([\#5155](https://github.com/matrix-org/synapse/issues/5155)) - Run `black` on the tests directory. ([\#5170](https://github.com/matrix-org/synapse/issues/5170)) - Fix CI after new release of isort. ([\#5179](https://github.com/matrix-org/synapse/issues/5179)) +- Fix bogus imports in unit tests. ([\#5154](https://github.com/matrix-org/synapse/issues/5154)) Synapse 0.99.3.2 (2019-05-03) From 5f027a315fbf010c213ca6f88141404ed86d05ef Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 15 May 2019 17:37:46 +0100 Subject: [PATCH 12/13] Drop support for v2_alpha API prefix (#5190) --- changelog.d/5190.feature | 1 + synapse/api/urls.py | 3 +-- synapse/rest/client/v1/base.py | 8 ++++---- synapse/rest/client/v2_alpha/_base.py | 9 +++------ synapse/rest/client/v2_alpha/auth.py | 18 +++++++++--------- synapse/rest/client/v2_alpha/devices.py | 6 +++--- .../v2_alpha/room_upgrade_rest_servlet.py | 1 - synapse/rest/client/v2_alpha/sendtodevice.py | 1 - 8 files changed, 21 insertions(+), 26 deletions(-) create mode 100644 changelog.d/5190.feature diff --git a/changelog.d/5190.feature b/changelog.d/5190.feature new file mode 100644 index 000000000000..34904aa7a8cb --- /dev/null +++ b/changelog.d/5190.feature @@ -0,0 +1 @@ +Drop support for the undocumented /_matrix/client/v2_alpha API prefix. diff --git a/synapse/api/urls.py b/synapse/api/urls.py index cb71d8087592..3c6bddff7a9d 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -22,8 +22,7 @@ from synapse.config import ConfigError -CLIENT_PREFIX = "/_matrix/client/api/v1" -CLIENT_V2_ALPHA_PREFIX = "/_matrix/client/v2_alpha" +CLIENT_API_PREFIX = "/_matrix/client" FEDERATION_PREFIX = "/_matrix/federation" FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1" FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2" diff --git a/synapse/rest/client/v1/base.py b/synapse/rest/client/v1/base.py index c77d7aba6861..dc63b661c02d 100644 --- a/synapse/rest/client/v1/base.py +++ b/synapse/rest/client/v1/base.py @@ -19,7 +19,7 @@ import logging import re -from synapse.api.urls import CLIENT_PREFIX +from synapse.api.urls import CLIENT_API_PREFIX from synapse.http.servlet import RestServlet from synapse.rest.client.transactions import HttpTransactionCache @@ -36,12 +36,12 @@ def client_path_patterns(path_regex, releases=(0,), include_in_unstable=True): Returns: SRE_Pattern """ - patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)] + patterns = [re.compile("^" + CLIENT_API_PREFIX + "/api/v1" + path_regex)] if include_in_unstable: - unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable") + unstable_prefix = CLIENT_API_PREFIX + "/unstable" patterns.append(re.compile("^" + unstable_prefix + path_regex)) for release in releases: - new_prefix = CLIENT_PREFIX.replace("/api/v1", "/r%d" % release) + new_prefix = CLIENT_API_PREFIX + "/r%d" % (release,) patterns.append(re.compile("^" + new_prefix + path_regex)) return patterns diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py index 77434937ffd4..24ac26bf0332 100644 --- a/synapse/rest/client/v2_alpha/_base.py +++ b/synapse/rest/client/v2_alpha/_base.py @@ -21,13 +21,12 @@ from twisted.internet import defer from synapse.api.errors import InteractiveAuthIncompleteError -from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX +from synapse.api.urls import CLIENT_API_PREFIX logger = logging.getLogger(__name__) def client_v2_patterns(path_regex, releases=(0,), - v2_alpha=True, unstable=True): """Creates a regex compiled client path with the correct client path prefix. @@ -39,13 +38,11 @@ def client_v2_patterns(path_regex, releases=(0,), SRE_Pattern """ patterns = [] - if v2_alpha: - patterns.append(re.compile("^" + CLIENT_V2_ALPHA_PREFIX + path_regex)) if unstable: - unstable_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/unstable") + unstable_prefix = CLIENT_API_PREFIX + "/unstable" patterns.append(re.compile("^" + unstable_prefix + path_regex)) for release in releases: - new_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/r%d" % release) + new_prefix = CLIENT_API_PREFIX + "/r%d" % (release,) patterns.append(re.compile("^" + new_prefix + path_regex)) return patterns diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py index ac035c7735b3..4c380ab84da3 100644 --- a/synapse/rest/client/v2_alpha/auth.py +++ b/synapse/rest/client/v2_alpha/auth.py @@ -19,7 +19,7 @@ from synapse.api.constants import LoginType from synapse.api.errors import SynapseError -from synapse.api.urls import CLIENT_V2_ALPHA_PREFIX +from synapse.api.urls import CLIENT_API_PREFIX from synapse.http.server import finish_request from synapse.http.servlet import RestServlet, parse_string @@ -139,8 +139,8 @@ def on_GET(self, request, stagetype): if stagetype == LoginType.RECAPTCHA: html = RECAPTCHA_TEMPLATE % { 'session': session, - 'myurl': "%s/auth/%s/fallback/web" % ( - CLIENT_V2_ALPHA_PREFIX, LoginType.RECAPTCHA + 'myurl': "%s/r0/auth/%s/fallback/web" % ( + CLIENT_API_PREFIX, LoginType.RECAPTCHA ), 'sitekey': self.hs.config.recaptcha_public_key, } @@ -159,8 +159,8 @@ def on_GET(self, request, stagetype): self.hs.config.public_baseurl, self.hs.config.user_consent_version, ), - 'myurl': "%s/auth/%s/fallback/web" % ( - CLIENT_V2_ALPHA_PREFIX, LoginType.TERMS + 'myurl': "%s/r0/auth/%s/fallback/web" % ( + CLIENT_API_PREFIX, LoginType.TERMS ), } html_bytes = html.encode("utf8") @@ -203,8 +203,8 @@ def on_POST(self, request, stagetype): else: html = RECAPTCHA_TEMPLATE % { 'session': session, - 'myurl': "%s/auth/%s/fallback/web" % ( - CLIENT_V2_ALPHA_PREFIX, LoginType.RECAPTCHA + 'myurl': "%s/r0/auth/%s/fallback/web" % ( + CLIENT_API_PREFIX, LoginType.RECAPTCHA ), 'sitekey': self.hs.config.recaptcha_public_key, } @@ -240,8 +240,8 @@ def on_POST(self, request, stagetype): self.hs.config.public_baseurl, self.hs.config.user_consent_version, ), - 'myurl': "%s/auth/%s/fallback/web" % ( - CLIENT_V2_ALPHA_PREFIX, LoginType.TERMS + 'myurl': "%s/r0/auth/%s/fallback/web" % ( + CLIENT_API_PREFIX, LoginType.TERMS ), } html_bytes = html.encode("utf8") diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py index 9b75bb137748..5a5be7c3904e 100644 --- a/synapse/rest/client/v2_alpha/devices.py +++ b/synapse/rest/client/v2_alpha/devices.py @@ -30,7 +30,7 @@ class DevicesRestServlet(RestServlet): - PATTERNS = client_v2_patterns("/devices$", v2_alpha=False) + PATTERNS = client_v2_patterns("/devices$") def __init__(self, hs): """ @@ -56,7 +56,7 @@ class DeleteDevicesRestServlet(RestServlet): API for bulk deletion of devices. Accepts a JSON object with a devices key which lists the device_ids to delete. Requires user interactive auth. """ - PATTERNS = client_v2_patterns("/delete_devices", v2_alpha=False) + PATTERNS = client_v2_patterns("/delete_devices") def __init__(self, hs): super(DeleteDevicesRestServlet, self).__init__() @@ -95,7 +95,7 @@ def on_POST(self, request): class DeviceRestServlet(RestServlet): - PATTERNS = client_v2_patterns("/devices/(?P[^/]*)$", v2_alpha=False) + PATTERNS = client_v2_patterns("/devices/(?P[^/]*)$") def __init__(self, hs): """ diff --git a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py index 3db7ff8d1b23..62b8de71fa03 100644 --- a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py +++ b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py @@ -50,7 +50,6 @@ class RoomUpgradeRestServlet(RestServlet): PATTERNS = client_v2_patterns( # /rooms/$roomid/upgrade "/rooms/(?P[^/]*)/upgrade$", - v2_alpha=False, ) def __init__(self, hs): diff --git a/synapse/rest/client/v2_alpha/sendtodevice.py b/synapse/rest/client/v2_alpha/sendtodevice.py index a9e9a47a0b06..21e9cef2d0aa 100644 --- a/synapse/rest/client/v2_alpha/sendtodevice.py +++ b/synapse/rest/client/v2_alpha/sendtodevice.py @@ -29,7 +29,6 @@ class SendToDeviceRestServlet(servlet.RestServlet): PATTERNS = client_v2_patterns( "/sendToDevice/(?P[^/]*)/(?P[^/]*)$", - v2_alpha=False ) def __init__(self, hs): From f1e5b413886ba4d9d0a16b028dba89c4a5cb56ac Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 15 May 2019 12:06:04 -0500 Subject: [PATCH 13/13] Make all the rate limiting options more consistent (#5181) --- changelog.d/5181.feature | 1 + docs/sample_config.yaml | 53 +++++------ synapse/config/ratelimiting.py | 115 ++++++++++++++--------- synapse/federation/transport/server.py | 6 +- synapse/handlers/_base.py | 4 +- synapse/rest/client/v2_alpha/register.py | 23 +++-- synapse/util/ratelimitutils.py | 47 +++------ tests/utils.py | 20 ++-- 8 files changed, 138 insertions(+), 131 deletions(-) create mode 100644 changelog.d/5181.feature diff --git a/changelog.d/5181.feature b/changelog.d/5181.feature new file mode 100644 index 000000000000..5ce13aa2ea51 --- /dev/null +++ b/changelog.d/5181.feature @@ -0,0 +1 @@ +Ratelimiting configuration for clients sending messages and the federation server has been altered to match login ratelimiting. The old configuration names will continue working. Check the sample config for details of the new names. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index c4e5c4cf3993..09ee0e8984d8 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -446,21 +446,15 @@ log_config: "CONFDIR/SERVERNAME.log.config" ## Ratelimiting ## -# Number of messages a client can send per second -# -#rc_messages_per_second: 0.2 - -# Number of message a client can send before being throttled -# -#rc_message_burst_count: 10.0 - -# Ratelimiting settings for registration and login. +# Ratelimiting settings for client actions (registration, login, messaging). # # Each ratelimiting configuration is made of two parameters: # - per_second: number of requests a client can send per second. # - burst_count: number of requests a client can send before being throttled. # # Synapse currently uses the following configurations: +# - one for messages that ratelimits sending based on the account the client +# is using # - one for registration that ratelimits registration requests based on the # client's IP address. # - one for login that ratelimits login requests based on the client's IP @@ -473,6 +467,10 @@ log_config: "CONFDIR/SERVERNAME.log.config" # # The defaults are as shown below. # +#rc_message: +# per_second: 0.2 +# burst_count: 10 +# #rc_registration: # per_second: 0.17 # burst_count: 3 @@ -488,29 +486,28 @@ log_config: "CONFDIR/SERVERNAME.log.config" # per_second: 0.17 # burst_count: 3 -# The federation window size in milliseconds -# -#federation_rc_window_size: 1000 -# The number of federation requests from a single server in a window -# before the server will delay processing the request. +# Ratelimiting settings for incoming federation # -#federation_rc_sleep_limit: 10 - -# The duration in milliseconds to delay processing events from -# remote servers by if they go over the sleep limit. -# -#federation_rc_sleep_delay: 500 - -# The maximum number of concurrent federation requests allowed -# from a single server +# The rc_federation configuration is made up of the following settings: +# - window_size: window size in milliseconds +# - sleep_limit: number of federation requests from a single server in +# a window before the server will delay processing the request. +# - sleep_delay: duration in milliseconds to delay processing events +# from remote servers by if they go over the sleep limit. +# - reject_limit: maximum number of concurrent federation requests +# allowed from a single server +# - concurrent: number of federation requests to concurrently process +# from a single server # -#federation_rc_reject_limit: 50 - -# The number of federation requests to concurrently process from a -# single server +# The defaults are as shown below. # -#federation_rc_concurrent: 3 +#rc_federation: +# window_size: 1000 +# sleep_limit: 10 +# sleep_delay: 500 +# reject_limit: 50 +# concurrent: 3 # Target outgoing federation transaction frequency for sending read-receipts, # per-room. diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 5a68399e6313..5a9adac480fc 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -16,16 +16,56 @@ class RateLimitConfig(object): - def __init__(self, config): - self.per_second = config.get("per_second", 0.17) - self.burst_count = config.get("burst_count", 3.0) + def __init__(self, config, defaults={"per_second": 0.17, "burst_count": 3.0}): + self.per_second = config.get("per_second", defaults["per_second"]) + self.burst_count = config.get("burst_count", defaults["burst_count"]) -class RatelimitConfig(Config): +class FederationRateLimitConfig(object): + _items_and_default = { + "window_size": 10000, + "sleep_limit": 10, + "sleep_delay": 500, + "reject_limit": 50, + "concurrent": 3, + } + + def __init__(self, **kwargs): + for i in self._items_and_default.keys(): + setattr(self, i, kwargs.get(i) or self._items_and_default[i]) + +class RatelimitConfig(Config): def read_config(self, config): - self.rc_messages_per_second = config.get("rc_messages_per_second", 0.2) - self.rc_message_burst_count = config.get("rc_message_burst_count", 10.0) + + # Load the new-style messages config if it exists. Otherwise fall back + # to the old method. + if "rc_message" in config: + self.rc_message = RateLimitConfig( + config["rc_message"], defaults={"per_second": 0.2, "burst_count": 10.0} + ) + else: + self.rc_message = RateLimitConfig( + { + "per_second": config.get("rc_messages_per_second", 0.2), + "burst_count": config.get("rc_message_burst_count", 10.0), + } + ) + + # Load the new-style federation config, if it exists. Otherwise, fall + # back to the old method. + if "federation_rc" in config: + self.rc_federation = FederationRateLimitConfig(**config["rc_federation"]) + else: + self.rc_federation = FederationRateLimitConfig( + **{ + "window_size": config.get("federation_rc_window_size"), + "sleep_limit": config.get("federation_rc_sleep_limit"), + "sleep_delay": config.get("federation_rc_sleep_delay"), + "reject_limit": config.get("federation_rc_reject_limit"), + "concurrent": config.get("federation_rc_concurrent"), + } + ) self.rc_registration = RateLimitConfig(config.get("rc_registration", {})) @@ -33,38 +73,26 @@ def read_config(self, config): self.rc_login_address = RateLimitConfig(rc_login_config.get("address", {})) self.rc_login_account = RateLimitConfig(rc_login_config.get("account", {})) self.rc_login_failed_attempts = RateLimitConfig( - rc_login_config.get("failed_attempts", {}), + rc_login_config.get("failed_attempts", {}) ) - self.federation_rc_window_size = config.get("federation_rc_window_size", 1000) - self.federation_rc_sleep_limit = config.get("federation_rc_sleep_limit", 10) - self.federation_rc_sleep_delay = config.get("federation_rc_sleep_delay", 500) - self.federation_rc_reject_limit = config.get("federation_rc_reject_limit", 50) - self.federation_rc_concurrent = config.get("federation_rc_concurrent", 3) - self.federation_rr_transactions_per_room_per_second = config.get( - "federation_rr_transactions_per_room_per_second", 50, + "federation_rr_transactions_per_room_per_second", 50 ) def default_config(self, **kwargs): return """\ ## Ratelimiting ## - # Number of messages a client can send per second - # - #rc_messages_per_second: 0.2 - - # Number of message a client can send before being throttled - # - #rc_message_burst_count: 10.0 - - # Ratelimiting settings for registration and login. + # Ratelimiting settings for client actions (registration, login, messaging). # # Each ratelimiting configuration is made of two parameters: # - per_second: number of requests a client can send per second. # - burst_count: number of requests a client can send before being throttled. # # Synapse currently uses the following configurations: + # - one for messages that ratelimits sending based on the account the client + # is using # - one for registration that ratelimits registration requests based on the # client's IP address. # - one for login that ratelimits login requests based on the client's IP @@ -77,6 +105,10 @@ def default_config(self, **kwargs): # # The defaults are as shown below. # + #rc_message: + # per_second: 0.2 + # burst_count: 10 + # #rc_registration: # per_second: 0.17 # burst_count: 3 @@ -92,29 +124,28 @@ def default_config(self, **kwargs): # per_second: 0.17 # burst_count: 3 - # The federation window size in milliseconds - # - #federation_rc_window_size: 1000 - - # The number of federation requests from a single server in a window - # before the server will delay processing the request. - # - #federation_rc_sleep_limit: 10 - # The duration in milliseconds to delay processing events from - # remote servers by if they go over the sleep limit. + # Ratelimiting settings for incoming federation # - #federation_rc_sleep_delay: 500 - - # The maximum number of concurrent federation requests allowed - # from a single server + # The rc_federation configuration is made up of the following settings: + # - window_size: window size in milliseconds + # - sleep_limit: number of federation requests from a single server in + # a window before the server will delay processing the request. + # - sleep_delay: duration in milliseconds to delay processing events + # from remote servers by if they go over the sleep limit. + # - reject_limit: maximum number of concurrent federation requests + # allowed from a single server + # - concurrent: number of federation requests to concurrently process + # from a single server # - #federation_rc_reject_limit: 50 - - # The number of federation requests to concurrently process from a - # single server + # The defaults are as shown below. # - #federation_rc_concurrent: 3 + #rc_federation: + # window_size: 1000 + # sleep_limit: 10 + # sleep_delay: 500 + # reject_limit: 50 + # concurrent: 3 # Target outgoing federation transaction frequency for sending read-receipts, # per-room. diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 9030eb18c544..385eda2dca91 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -63,11 +63,7 @@ def __init__(self, hs, servlet_groups=None): self.authenticator = Authenticator(hs) self.ratelimiter = FederationRateLimiter( self.clock, - window_size=hs.config.federation_rc_window_size, - sleep_limit=hs.config.federation_rc_sleep_limit, - sleep_msec=hs.config.federation_rc_sleep_delay, - reject_limit=hs.config.federation_rc_reject_limit, - concurrent_requests=hs.config.federation_rc_concurrent, + config=hs.config.rc_federation, ) self.register_servlets() diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index ac09d03ba9ad..dca337ec61f3 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -90,8 +90,8 @@ def ratelimit(self, requester, update=True): messages_per_second = override.messages_per_second burst_count = override.burst_count else: - messages_per_second = self.hs.config.rc_messages_per_second - burst_count = self.hs.config.rc_message_burst_count + messages_per_second = self.hs.config.rc_message.per_second + burst_count = self.hs.config.rc_message.burst_count allowed, time_allowed = self.ratelimiter.can_do_action( user_id, time_now, diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index dc3e265bcd1d..3d045880b9b6 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -31,6 +31,7 @@ SynapseError, UnrecognizedRequestError, ) +from synapse.config.ratelimiting import FederationRateLimitConfig from synapse.config.server import is_threepid_reserved from synapse.http.servlet import ( RestServlet, @@ -153,16 +154,18 @@ def __init__(self, hs): self.registration_handler = hs.get_registration_handler() self.ratelimiter = FederationRateLimiter( hs.get_clock(), - # Time window of 2s - window_size=2000, - # Artificially delay requests if rate > sleep_limit/window_size - sleep_limit=1, - # Amount of artificial delay to apply - sleep_msec=1000, - # Error with 429 if more than reject_limit requests are queued - reject_limit=1, - # Allow 1 request at a time - concurrent_requests=1, + FederationRateLimitConfig( + # Time window of 2s + window_size=2000, + # Artificially delay requests if rate > sleep_limit/window_size + sleep_limit=1, + # Amount of artificial delay to apply + sleep_msec=1000, + # Error with 429 if more than reject_limit requests are queued + reject_limit=1, + # Allow 1 request at a time + concurrent_requests=1, + ) ) @defer.inlineCallbacks diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 7deb38f2a732..b146d137f46b 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -30,31 +30,14 @@ class FederationRateLimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): + def __init__(self, clock, config): """ Args: clock (Clock) - window_size (int): The window size in milliseconds. - sleep_limit (int): The number of requests received in the last - `window_size` milliseconds before we artificially start - delaying processing of requests. - sleep_msec (int): The number of milliseconds to delay processing - of incoming requests by. - reject_limit (int): The maximum number of requests that are can be - queued for processing before we start rejecting requests with - a 429 Too Many Requests response. - concurrent_requests (int): The number of concurrent requests to - process. + config (FederationRateLimitConfig) """ self.clock = clock - - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_msec = sleep_msec - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests - + self._config = config self.ratelimiters = {} def ratelimit(self, host): @@ -76,25 +59,25 @@ def ratelimit(self, host): host, _PerHostRatelimiter( clock=self.clock, - window_size=self.window_size, - sleep_limit=self.sleep_limit, - sleep_msec=self.sleep_msec, - reject_limit=self.reject_limit, - concurrent_requests=self.concurrent_requests, + config=self._config, ) ).ratelimit() class _PerHostRatelimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): + def __init__(self, clock, config): + """ + Args: + clock (Clock) + config (FederationRateLimitConfig) + """ self.clock = clock - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_sec = sleep_msec / 1000.0 - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests + self.window_size = config.window_size + self.sleep_limit = config.sleep_limit + self.sleep_sec = config.sleep_delay / 1000.0 + self.reject_limit = config.reject_limit + self.concurrent_requests = config.concurrent # request_id objects for requests which have been slept self.sleeping_requests = set() diff --git a/tests/utils.py b/tests/utils.py index f38533a0c734..200c1ceabe0f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -134,10 +134,6 @@ def default_config(name, parse=False): "email_enable_notifs": False, "block_non_admin_invites": False, "federation_domain_whitelist": None, - "federation_rc_reject_limit": 10, - "federation_rc_sleep_limit": 10, - "federation_rc_sleep_delay": 100, - "federation_rc_concurrent": 10, "filter_timeline_limit": 5000, "user_directory_search_all_users": False, "user_consent_server_notice_content": None, @@ -156,8 +152,13 @@ def default_config(name, parse=False): "mau_stats_only": False, "mau_limits_reserved_threepids": [], "admin_contact": None, - "rc_messages_per_second": 10000, - "rc_message_burst_count": 10000, + "rc_federation": { + "reject_limit": 10, + "sleep_limit": 10, + "sleep_delay": 10, + "concurrent": 10, + }, + "rc_message": {"per_second": 10000, "burst_count": 10000}, "rc_registration": {"per_second": 10000, "burst_count": 10000}, "rc_login": { "address": {"per_second": 10000, "burst_count": 10000}, @@ -375,12 +376,7 @@ def register_federation_servlets(hs, resource): resource=resource, authenticator=federation_server.Authenticator(hs), ratelimiter=FederationRateLimiter( - hs.get_clock(), - window_size=hs.config.federation_rc_window_size, - sleep_limit=hs.config.federation_rc_sleep_limit, - sleep_msec=hs.config.federation_rc_sleep_delay, - reject_limit=hs.config.federation_rc_reject_limit, - concurrent_requests=hs.config.federation_rc_concurrent, + hs.get_clock(), config=hs.config.rc_federation ), )