This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix historical messages backfilling in random order on remote homeservers (MSC2716) #11114

Merged
Changes from 53 commits
55 commits
f30302d
Scratch debugging why events appear out of order on remote homeservers
MadLittleMods Oct 18, 2021
438e222
Use OrderedDict to gurantee order returned is the same as we were bui…
MadLittleMods Oct 18, 2021
4983739
Avoid constant missing prev_event fetching while backfilling
MadLittleMods Oct 19, 2021
a64bb2e
Add changelog
MadLittleMods Oct 19, 2021
260ca06
Some more trials of trying to get many many events to backfill in ord…
MadLittleMods Oct 19, 2021
886071b
Fix backfill not picking up batch events connected to non-base insert…
MadLittleMods Oct 20, 2021
477c15d
Some more debug logging
MadLittleMods Oct 21, 2021
4191f56
Remove fake prev events from historical state chain
MadLittleMods Oct 21, 2021
f39c1da
Remove debug logging
MadLittleMods Oct 21, 2021
7da8012
Remove extra event info
MadLittleMods Oct 21, 2021
69dfa16
Move to sorting the backfill events in the existing sorted
MadLittleMods Oct 21, 2021
83474d9
Put MSC2716 backfill logic behind experimental feature flag
MadLittleMods Oct 21, 2021
1263c7e
Remove unused import
MadLittleMods Oct 21, 2021
ee47878
Fix mypy lints
MadLittleMods Oct 21, 2021
5bfde7b
Merge branch 'master' into madlittlemods/return-historical-events-in-…
MadLittleMods Oct 21, 2021
2fbe3f1
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Oct 21, 2021
1d3f417
Revert back to string interpolation for SQL boolean value
MadLittleMods Oct 21, 2021
4a12304
Put empty prev_events behind new room version
MadLittleMods Oct 28, 2021
9a6d8fa
WIP: Don't include the event we branch from
MadLittleMods Oct 29, 2021
3e09d49
Revert "WIP: Don't include the event we branch from"
MadLittleMods Oct 29, 2021
5afc264
WIP: Sort events topologically when we receive them over backfill
MadLittleMods Oct 29, 2021
6ea263b
Revert "WIP: Sort events topologically when we receive them over back…
MadLittleMods Oct 29, 2021
3d387f9
WIP: Sort events topologically when we receive them over backfill
MadLittleMods Oct 29, 2021
fb8e281
Fix direction of fake edges
MadLittleMods Oct 29, 2021
c772b35
Implement backfill in handler so we can do fetching later
MadLittleMods Oct 29, 2021
e0ff66d
Fix backfill being able to cleanly branch into history and back to "l…
MadLittleMods Oct 29, 2021
76d454f
Some backfill receive sorting fixes but not using it yet
MadLittleMods Oct 30, 2021
3529449
Fix lints
MadLittleMods Oct 30, 2021
321f9ea
Move back to the old get_backfill_events and simplify backfill.
MadLittleMods Nov 2, 2021
15c3282
Remove the new backfill implementation and pull some good parts of th…
MadLittleMods Nov 2, 2021
5db717a
Always process marker events regardless if backfilled
MadLittleMods Nov 3, 2021
e96fd5c
Add comment docs
MadLittleMods Nov 3, 2021
f3b7b3e
Add better explanatory comment
MadLittleMods Nov 3, 2021
7f2105a
Remove topological sort when receiving backfill events
MadLittleMods Nov 3, 2021
246278e
Fix lints
MadLittleMods Nov 3, 2021
ec35be5
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Nov 3, 2021
bc0ba8c
Protect from no auth events for non-existent provided prev_event
MadLittleMods Nov 3, 2021
363aed6
Revert unused refactor to get PDU raw
MadLittleMods Nov 3, 2021
d771fbd
Only run the tests package to get streaming Complement output
MadLittleMods Nov 11, 2021
b559e23
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Nov 11, 2021
6b64184
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Dec 9, 2021
1d00043
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Dec 16, 2021
b071426
Plumb allow_no_prev_events through for MSC2716
MadLittleMods Dec 16, 2021
ec33a40
Make the historical events float separately from the state chain
MadLittleMods Dec 16, 2021
b99efa8
Plumb allow_no_prev_events through create_and_send_nonmember_event
MadLittleMods Dec 16, 2021
3810ae1
Clarify comments
MadLittleMods Dec 16, 2021
df2a152
Fix NPE when trying to grab event from wrong roomId (fix sytest)
MadLittleMods Dec 16, 2021
cc4eb72
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Jan 14, 2022
47590bb
Merge branch 'develop' into madlittlemods/return-historical-events-in…
MadLittleMods Feb 4, 2022
a38befa
Some review optimizations
MadLittleMods Feb 4, 2022
033360a
Fix lints
MadLittleMods Feb 4, 2022
3f22e42
Fix unused lint
MadLittleMods Feb 4, 2022
e5670ff
Fix lints
MadLittleMods Feb 4, 2022
023bd3e
Don't run MSC2716 complement tests for everyone
MadLittleMods Feb 7, 2022
b3fcffb
Use same txn iteration optimization
MadLittleMods Feb 7, 2022
1 change: 1 addition & 0 deletions changelog.d/11114.bugfix
@@ -0,0 +1 @@
Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers.
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
@@ -71,4 +71,4 @@ fi

# Run the tests!
echo "Images built; running complement"
go test -v -tags synapse_blacklist,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
go test -v -tags synapse_blacklist,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
27 changes: 23 additions & 4 deletions synapse/handlers/federation.py
@@ -166,9 +166,14 @@ async def _maybe_backfill_inner(
oldest_events_with_depth = (
await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
)
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
)

insertion_events_to_be_backfilled: Dict[str, int] = {}
if self.hs.config.experimental.msc2716_enabled:
insertion_events_to_be_backfilled = (
await self.store.get_insertion_event_backward_extremities_in_room(
room_id
)
)
Contributor Author

Just putting this behind the experimental feature flag so people can easily turn off the brokenness if it occurs
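
The gating described in this comment can be sketched in isolation. This is a minimal illustration rather than Synapse's code: `ExperimentalConfig` and `FakeStore` are hypothetical stand-ins for `hs.config.experimental` and the datastore, and the returned mapping of event ID to depth is made-up sample data.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict


@dataclass
class ExperimentalConfig:
    # Hypothetical stand-in for `hs.config.experimental`.
    msc2716_enabled: bool = False


class FakeStore:
    # Hypothetical stand-in for the datastore; maps event_id -> depth.
    async def get_insertion_event_backward_extremities_in_room(
        self, room_id: str
    ) -> Dict[str, int]:
        return {"$insertion1": 5}


async def insertion_extremities(
    config: ExperimentalConfig, store: FakeStore, room_id: str
) -> Dict[str, int]:
    # Default to "nothing to backfill" so callers never see an unset value.
    result: Dict[str, int] = {}
    if config.msc2716_enabled:
        # Only query the store when the experimental feature is switched on.
        result = await store.get_insertion_event_backward_extremities_in_room(
            room_id
        )
    return result
```

With the flag off, the function returns an empty dict without touching the store, so turning the feature off removes the MSC2716 extremities from backfill consideration entirely.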

logger.debug(
"_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
oldest_events_with_depth,
@@ -271,11 +276,12 @@ async def _maybe_backfill_inner(
]

logger.debug(
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
"room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
room_id,
current_depth,
limit,
max_depth,
len(sorted_extremeties_tuple),
sorted_extremeties_tuple,
filtered_sorted_extremeties_tuple,
)
@@ -1047,6 +1053,19 @@ async def on_backfill_request(
limit = min(limit, 100)

events = await self.store.get_backfill_events(room_id, pdu_list, limit)
logger.debug(
"on_backfill_request: backfill events=%s",
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
event.prev_event_ids(),
)
for event in events
],
)

events = await filter_events_for_server(self.storage, origin, events)

34 changes: 29 additions & 5 deletions synapse/handlers/federation_event.py
@@ -508,7 +508,11 @@ async def backfill(
f"room {ev.room_id}, when we were backfilling in {room_id}"
)

await self._process_pulled_events(dest, events, backfilled=True)
await self._process_pulled_events(
dest,
events,
backfilled=True,
)

async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
@@ -626,11 +630,24 @@ async def _process_pulled_events(
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
[
"event_id=%s,depth=%d,body=%s,prevs=%s\n"
% (
event.event_id,
event.depth,
event.content.get("body", event.type),
event.prev_event_ids(),
)
for event in events
],
)

# We want to sort these by depth so we process them and
# tell clients about them in order.
sorted_events = sorted(events, key=lambda x: x.depth)

for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
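
The depth sort above uses Python's `sorted`, which is stable: events with equal depth keep the order in which they arrived. A rough sketch of that behaviour, assuming a hypothetical `Ev` type and made-up event IDs (not Synapse's `EventBase`):

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Ev:
    # Hypothetical minimal event: just an ID and a depth.
    event_id: str
    depth: int


def order_for_processing(events: List[Ev]) -> List[Ev]:
    # `sorted` is stable, so ties on depth preserve the incoming order.
    return sorted(events, key=lambda e: e.depth)


incoming = [Ev("$h1", 10), Ev("$h2", 10), Ev("$old", 3), Ev("$h3", 10)]
ordered_ids = [e.event_id for e in order_for_processing(incoming)]
```

If many events in a batch share one depth, as the `inherited_depth` parameter elsewhere in this change suggests, the sort cannot reorder them, so the order in which the remote server returns them presumably decides the order in which they are processed.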
@@ -992,6 +1009,8 @@ async def _process_received_pdu(

await self._run_push_actions_and_persist_event(event, context, backfilled)

await self._handle_marker_event(origin, event)
Contributor Author

@MadLittleMods MadLittleMods Nov 3, 2021

Moved this above the early return for backfilled events so that we always process marker events, regardless of whether they are backfilled.

Before, we could rely on the connected_insertion_event_query to navigate and find the historical branch. Now we rely solely on the marker event to point out the historical branch, so we need to add the insertion event extremities whenever we see a marker event, whether it is live or backfilled.

-- 5db717a


Related to #11114 (comment)
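
The effect of the reordering can be sketched as follows. This is a simplified model, not the handler itself: `MARKER_TYPE` is a hypothetical placeholder for the marker event type, and the step names only label what would run.

```python
from typing import List

# Hypothetical placeholder; the real marker event type is defined by MSC2716.
MARKER_TYPE = "example.marker"


def process_received_pdu(event_type: str, backfilled: bool, rejected: bool) -> List[str]:
    """Return the names of the steps that would run for one event."""
    steps = ["persist"]

    # Marker handling sits before the early return, so it also runs for
    # backfilled events.
    if event_type == MARKER_TYPE:
        steps.append("handle_marker")

    if backfilled or rejected:
        return steps

    # Everything below only applies to live, accepted events.
    steps.append("live_only_processing")
    return steps
```

Had the marker check stayed below the early return, a backfilled marker would yield only `["persist"]` and the historical branch it points to would not be registered.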


if backfilled or context.rejected:
return

@@ -1071,8 +1090,6 @@ async def _process_received_pdu(
event.sender,
)

await self._handle_marker_event(origin, event)

async def _resync_device(self, sender: str) -> None:
"""We have detected that the device list for the given user may be out
of sync, so we try and resync them.
@@ -1323,7 +1340,14 @@ def prep(event: EventBase) -> Optional[Tuple[EventBase, EventContext]]:
return event, context

events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
await self.persist_events_and_notify(room_id, tuple(events_to_persist))
await self.persist_events_and_notify(
room_id,
tuple(events_to_persist),
# Mark these events backfilled as they're historic events that will
# eventually be backfilled. For example, missing events we fetch
# during backfill should be marked as backfilled as well.
backfilled=True,
)

async def _check_event_auth(
self,
20 changes: 16 additions & 4 deletions synapse/handlers/message.py
@@ -490,12 +490,12 @@ async def create_event(
requester: Requester,
event_dict: dict,
txn_id: Optional[str] = None,
allow_no_prev_events: bool = False,
Member

Why did we move this up?

Contributor Author

@MadLittleMods MadLittleMods Feb 4, 2022

To group it next to prev_event_ids for easier association.

prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
allow_no_prev_events: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
@@ -510,6 +510,10 @@ async def create_event(
requester
event_dict: An entire event
txn_id
allow_no_prev_events: Whether to allow this event to be created with an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@@ -604,10 +608,10 @@ async def create_event(
event, context = await self.create_new_client_event(
builder=builder,
requester=requester,
allow_no_prev_events=allow_no_prev_events,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
allow_no_prev_events=allow_no_prev_events,
)

# In an ideal world we wouldn't need the second part of this condition. However,
@@ -764,6 +768,7 @@ async def create_and_send_nonmember_event(
self,
requester: Requester,
event_dict: dict,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
ratelimit: bool = True,
@@ -781,6 +786,10 @@ async def create_and_send_nonmember_event(
Args:
requester: The requester sending the event.
event_dict: An entire event.
allow_no_prev_events: Whether to allow this event to be created with an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids:
The event IDs to use as the prev events.
Should normally be left as None to automatically request them
@@ -880,16 +889,20 @@ async def create_new_client_event(
self,
builder: EventBuilder,
requester: Optional[Requester] = None,
allow_no_prev_events: bool = False,
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
allow_no_prev_events: bool = False,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client

Args:
builder:
requester:
allow_no_prev_events: Whether to allow this event to be created with an empty
list of prev_events. Normally this is prohibited just because most
events should have a prev_event and we should only use this in special
cases like MSC2716.
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
@@ -908,7 +921,6 @@ async def create_new_client_event(
Returns:
Tuple of created event, context
"""

# Strip down the auth_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
full_state_ids_at_event = None
44 changes: 22 additions & 22 deletions synapse/handlers/room_batch.py
@@ -13,10 +13,6 @@
logger = logging.getLogger(__name__)


def generate_fake_event_id() -> str:
return "$fake_" + random_string(43)


class RoomBatchHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
@@ -182,11 +178,12 @@ async def persist_state_events_at_start(
state_event_ids_at_start = []
auth_event_ids = initial_auth_event_ids.copy()

# Make the state events float off on their own so we don't have a
# bunch of `@mxid joined the room` noise between each batch
prev_event_id_for_state_chain = generate_fake_event_id()
# Make the state events float off on their own by specifying no
# prev_events for the first one in the chain so we don't have a bunch of
# `@mxid joined the room` noise between each batch.
prev_event_ids_for_state_chain: List[str] = []
Contributor Author

Instead of using fake prev_events to make the historical events float, I've moved to creating an event with an empty array of prev_events. This also stops the backfill mechanism from getting clogged with unresolvable backward extremities; see #11091.

This is to allow prev_event_ids = [] if it has auth_event_ids present

-- #11114 (comment)
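
The floating chain described here can be illustrated with a small sketch. It is a simplification under stated assumptions: `create_event` and `build_floating_chain` are hypothetical helpers, events are plain dicts, and an empty `prev_event_ids` is simply rejected unless explicitly allowed (the real handler has more cases).

```python
from typing import Dict, List


def create_event(
    event_id: str,
    prev_event_ids: List[str],
    allow_no_prev_events: bool = False,
) -> Dict[str, object]:
    # Hypothetical guard: empty prev_events is only accepted when the
    # caller opts in explicitly.
    if not prev_event_ids and not allow_no_prev_events:
        raise ValueError("prev_events must be non-empty unless explicitly allowed")
    # Copy the list so later reassignment by the caller cannot alias it.
    return {"event_id": event_id, "prev_events": list(prev_event_ids)}


def build_floating_chain(event_ids: List[str]) -> List[Dict[str, object]]:
    prev_event_ids: List[str] = []
    chain = []
    for index, event_id in enumerate(event_ids):
        # Only the first event floats; the rest hang off their predecessor.
        event = create_event(
            event_id, prev_event_ids, allow_no_prev_events=index == 0
        )
        chain.append(event)
        prev_event_ids = [event_id]
    return chain


chain = build_floating_chain(["$a", "$b", "$c"])
```

Because the first event has no prev_events at all, nothing in the chain points at an event that cannot be fetched, which is the property the fake `$fake_...` IDs lacked.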


for state_event in state_events_at_start:
for index, state_event in enumerate(state_events_at_start):
assert_params_in_dict(
state_event, ["type", "origin_server_ts", "content", "sender"]
)
@@ -222,7 +219,10 @@ async def persist_state_events_at_start(
content=event_dict["content"],
outlier=True,
historical=True,
prev_event_ids=[prev_event_id_for_state_chain],
# Only the first event in the chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
@@ -242,7 +242,10 @@ async def persist_state_events_at_start(
event_dict,
outlier=True,
historical=True,
prev_event_ids=[prev_event_id_for_state_chain],
# Only the first event in the chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=prev_event_ids_for_state_chain,
# Make sure to use a copy of this list because we modify it
# later in the loop here. Otherwise it will be the same
# reference and also update in the event when we append later.
@@ -253,15 +256,14 @@ async def persist_state_events_at_start(
state_event_ids_at_start.append(event_id)
auth_event_ids.append(event_id)
# Connect all the state in a floating chain
prev_event_id_for_state_chain = event_id
prev_event_ids_for_state_chain = [event_id]

return state_event_ids_at_start

async def persist_historical_events(
self,
events_to_create: List[JsonDict],
room_id: str,
initial_prev_event_ids: List[str],
inherited_depth: int,
auth_event_ids: List[str],
app_service_requester: Requester,
@@ -277,9 +279,6 @@ async def persist_historical_events(
events_to_create: List of historical events to create in JSON
dictionary format.
room_id: Room where you want the events persisted in.
initial_prev_event_ids: These will be the prev_events for the first
event created. Each event created afterwards will point to the
previous event created.
inherited_depth: The depth to create the events at (you will
probably be calling inherit_depth_from_prev_ids(...)).
auth_event_ids: Define which events allow you to create the given
@@ -291,11 +290,14 @@ async def persist_historical_events(
"""
assert app_service_requester.app_service

prev_event_ids = initial_prev_event_ids.copy()
# Make the historical event chain float off on its own by specifying no
# prev_events for the first event in the chain which causes the HS to
# ask for the state at the start of the batch later.
prev_event_ids: List[str] = []

event_ids = []
events_to_persist = []
for ev in events_to_create:
for index, ev in enumerate(events_to_create):
assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])

assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
@@ -319,6 +321,9 @@ async def persist_historical_events(
ev["sender"], app_service_requester.app_service
),
event_dict,
# Only the first event in the chain should be floating.
# The rest should hang off each other in a chain.
allow_no_prev_events=index == 0,
prev_event_ids=event_dict.get("prev_events"),
auth_event_ids=auth_event_ids,
historical=True,
@@ -370,7 +375,6 @@ async def handle_batch_of_events(
events_to_create: List[JsonDict],
room_id: str,
batch_id_to_connect_to: str,
initial_prev_event_ids: List[str],
inherited_depth: int,
auth_event_ids: List[str],
app_service_requester: Requester,
@@ -385,9 +389,6 @@ async def handle_batch_of_events(
room_id: Room where you want the events created in.
batch_id_to_connect_to: The batch_id from the insertion event you
want this batch to connect to.
initial_prev_event_ids: These will be the prev_events for the first
event created. Each event created afterwards will point to the
previous event created.
inherited_depth: The depth to create the events at (you will
probably be calling inherit_depth_from_prev_ids(...)).
auth_event_ids: Define which events allow you to create the given
@@ -436,7 +437,6 @@ async def handle_batch_of_events(
event_ids = await self.persist_historical_events(
events_to_create=events_to_create,
room_id=room_id,
initial_prev_event_ids=initial_prev_event_ids,
inherited_depth=inherited_depth,
auth_event_ids=auth_event_ids,
app_service_requester=app_service_requester,