diff --git a/changelog.d/7337.bugfix b/changelog.d/7337.bugfix new file mode 100644 index 000000000000..f49c600173b9 --- /dev/null +++ b/changelog.d/7337.bugfix @@ -0,0 +1 @@ +Fix a bug where event updates might not be sent over replication to worker processes after the stream falls behind. diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py index ffd4c6199378..f35cebc710e0 100644 --- a/synapse/replication/http/streams.py +++ b/synapse/replication/http/streams.py @@ -28,7 +28,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint): The API looks like: - GET /_synapse/replication/get_repl_stream_updates/events?from_token=0&to_token=10&limit=100 + GET /_synapse/replication/get_repl_stream_updates/?from_token=0&to_token=10 200 OK @@ -38,6 +38,9 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint): limited: False, } + If there are more rows than can sensibly be returned in one lump, `limited` will be + set to true, and the caller should call again with a new `from_token`. + """ NAME = "get_repl_stream_updates" @@ -52,8 +55,8 @@ def __init__(self, hs): self.streams = hs.get_replication_streamer().get_streams() @staticmethod - def _serialize_payload(stream_name, from_token, upto_token, limit): - return {"from_token": from_token, "upto_token": upto_token, "limit": limit} + def _serialize_payload(stream_name, from_token, upto_token): + return {"from_token": from_token, "upto_token": upto_token} async def _handle_request(self, request, stream_name): stream = self.streams.get(stream_name) @@ -62,10 +65,9 @@ async def _handle_request(self, request, stream_name): from_token = parse_integer(request, "from_token", required=True) upto_token = parse_integer(request, "upto_token", required=True) - limit = parse_integer(request, "limit", required=True) updates, upto_token, limited = await stream.get_updates_since( - from_token, upto_token, limit + from_token, upto_token ) return ( diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index a860072ccf4a..4ae3cffb1e14 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -24,8 +24,8 @@ logger = logging.getLogger(__name__) - -MAX_EVENTS_BEHIND = 500000 +# the number of rows to request from an update_function. +_STREAM_UPDATE_TARGET_ROW_COUNT = 100 # Some type aliases to make things a bit easier. @@ -56,7 +56,11 @@ # * from_token: the previous stream token: the starting point for fetching the # updates # * to_token: the new stream token: the point to get updates up to -# * limit: the maximum number of rows to return +# * target_row_count: a target for the number of rows to be returned. +# +# The update_function is expected to return up to _approximately_ target_row_count rows. +# If there are more updates available, it should set `limited` in the result, and +# it will be called again to get the next batch. # UpdateFunction = Callable[[Token, Token, int], Awaitable[StreamUpdateResult]] @@ -138,7 +142,7 @@ async def get_updates(self) -> StreamUpdateResult: return updates, current_token, limited async def get_updates_since( - self, from_token: Token, upto_token: Token, limit: int = 100 + self, from_token: Token, upto_token: Token ) -> StreamUpdateResult: """Like get_updates except allows specifying from when we should stream updates @@ -156,7 +160,7 @@ async def get_updates_since( return [], upto_token, False updates, upto_token, limited = await self.update_function( - from_token, upto_token, limit, + from_token, upto_token, _STREAM_UPDATE_TARGET_ROW_COUNT, ) return updates, upto_token, limited @@ -193,10 +197,7 @@ async def update_function( from_token: int, upto_token: int, limit: int ) -> StreamUpdateResult: result = await client( - stream_name=stream_name, - from_token=from_token, - upto_token=upto_token, - limit=limit, + stream_name=stream_name, from_token=from_token, upto_token=upto_token, ) return result["updates"], result["upto_token"], result["limited"] diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 051114596b38..aa50492569cd 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -15,11 +15,12 @@ # limitations under the License. import heapq -from typing import Iterable, Tuple, Type +from collections import Iterable +from typing import List, Tuple, Type import attr -from ._base import Stream, Token, db_query_to_update_function +from ._base import Stream, StreamUpdateResult, Token """Handling of the 'events' replication stream @@ -117,30 +118,106 @@ class EventsStream(Stream): def __init__(self, hs): self._store = hs.get_datastore() super().__init__( - self._store.get_current_events_token, - db_query_to_update_function(self._update_function), + self._store.get_current_events_token, self._update_function, ) async def _update_function( - self, from_token: Token, current_token: Token, limit: int - ) -> Iterable[tuple]: + self, from_token: Token, current_token: Token, target_row_count: int + ) -> StreamUpdateResult: + + # the events stream merges together three separate sources: + # * new events + # * current_state changes + # * events which were previously outliers, but have now been de-outliered. + # + # The merge operation is complicated by the fact that we only have a single + # "stream token" which is supposed to indicate how far we have got through + # all three streams. It's therefore no good to return rows 1-1000 from the + # "new events" table if the state_deltas are limited to rows 1-100 by the + # target_row_count. + # + # In other words: we must pick a new upper limit, and must return *all* rows + # up to that point for each of the three sources. + # + # Start by trying to split the target_row_count up. We expect to have a + # negligible number of ex-outliers, and a rough approximation based on recent + # traffic on sw1v.org shows that there are approximately the same number of + # event rows between a given pair of stream ids as there are state + # updates, so let's split our target_row_count among those two types. The target + # is only an approximation - it doesn't matter if we end up going a bit over it. + + target_row_count //= 2 + + # now we fetch up to that many rows from the events table + event_rows = await self._store.get_all_new_forward_event_rows( - from_token, current_token, limit - ) - event_updates = ( - (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows - ) + from_token, current_token, target_row_count + ) # type: List[Tuple] + + # we rely on get_all_new_forward_event_rows strictly honouring the limit, so + # that we know it is safe to just take upper_limit = event_rows[-1][0]. + assert ( + len(event_rows) <= target_row_count + ), "get_all_new_forward_event_rows did not honour row limit" + + # if we hit the limit on event_updates, there's no point in going beyond the + # last stream_id in the batch for the other sources. + + if len(event_rows) == target_row_count: + limited = True + upper_limit = event_rows[-1][0] # type: int + else: + limited = False + upper_limit = current_token + + # next up is the state delta table state_rows = await self._store.get_all_updated_current_state_deltas( - from_token, current_token, limit - ) - state_updates = ( - (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows - ) + from_token, upper_limit, target_row_count + ) # type: List[Tuple] + + # again, if we've hit the limit there, we'll need to limit the other sources + assert len(state_rows) < target_row_count + if len(state_rows) == target_row_count: + assert state_rows[-1][0] <= upper_limit + upper_limit = state_rows[-1][0] + limited = True + + # FIXME: is it a given that there is only one row per stream_id in the + # state_deltas table (so that we can be sure that we have got all of the + # rows for upper_limit)? + + # finally, fetch the ex-outliers rows. We assume there are few enough of these + # not to bother with the limit. - all_updates = heapq.merge(event_updates, state_updates) + ex_outliers_rows = await self._store.get_ex_outlier_stream_rows( + from_token, upper_limit + ) # type: List[Tuple] - return all_updates + # we now need to turn the raw database rows returned into tuples suitable + # for the replication protocol (basically, we add an identifier to + # distinguish the row type). At the same time, we can limit the event_rows + # to the max stream_id from state_rows. + + event_updates = ( + (stream_id, (EventsStreamEventRow.TypeId, rest)) + for (stream_id, *rest) in event_rows + if stream_id <= upper_limit + ) # type: Iterable[Tuple[int, Tuple]] + + state_updates = ( + (stream_id, (EventsStreamCurrentStateRow.TypeId, rest)) + for (stream_id, *rest) in state_rows + ) # type: Iterable[Tuple[int, Tuple]] + + ex_outliers_updates = ( + (stream_id, (EventsStreamEventRow.TypeId, rest)) + for (stream_id, *rest) in ex_outliers_rows + ) # type: Iterable[Tuple[int, Tuple]] + + # we need to return a sorted list, so merge them together. + updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates)) + return updates, upper_limit, limited @classmethod def parse_row(cls, row): diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index accde349a7c0..ce8be72bfe88 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -973,8 +973,18 @@ def get_current_events_token(self): return self._stream_id_gen.get_current_token() def get_all_new_forward_event_rows(self, last_id, current_id, limit): - if last_id == current_id: - return defer.succeed([]) + """Returns new events, for the Events replication stream + + Args: + last_id: the last stream_id from the previous batch. + current_id: the maximum stream_id to return up to + limit: the maximum number of rows to return + + Returns: Deferred[List[Tuple]] + a list of events stream rows. Each tuple consists of a stream id as + the first element, followed by fields suitable for casting into an + EventsStreamRow. + """ def get_all_new_forward_event_rows(txn): sql = ( @@ -989,13 +999,26 @@ def get_all_new_forward_event_rows(txn): " LIMIT ?" ) txn.execute(sql, (last_id, current_id, limit)) - new_event_updates = txn.fetchall() + return txn.fetchall() - if len(new_event_updates) == limit: - upper_bound = new_event_updates[-1][0] - else: - upper_bound = current_id + return self.db.runInteraction( + "get_all_new_forward_event_rows", get_all_new_forward_event_rows + ) + + def get_ex_outlier_stream_rows(self, last_id, current_id): + """Returns de-outliered events, for the Events replication stream + Args: + last_id: the last stream_id from the previous batch. + current_id: the maximum stream_id to return up to + + Returns: Deferred[List[Tuple]] + a list of events stream rows. Each tuple consists of a stream id as + the first element, followed by fields suitable for casting into an + EventsStreamRow. + """ + + def get_ex_outlier_stream_rows_txn(txn): sql = ( "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," " state_key, redacts, relates_to_id" @@ -1006,15 +1029,14 @@ def get_all_new_forward_event_rows(txn): " LEFT JOIN event_relations USING (event_id)" " WHERE ? < event_stream_ordering" " AND event_stream_ordering <= ?" - " ORDER BY event_stream_ordering DESC" + " ORDER BY event_stream_ordering ASC" ) - txn.execute(sql, (last_id, upper_bound)) - new_event_updates.extend(txn) - return new_event_updates + txn.execute(sql, (last_id, current_id)) + return txn.fetchall() return self.db.runInteraction( - "get_all_new_forward_event_rows", get_all_new_forward_event_rows + "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn ) def get_all_new_backfill_event_rows(self, last_id, current_id, limit):