Resolve and share state_groups for all historical events in batch (MSC2716) #10975
Changes from 24 commits
@@ -0,0 +1 @@
Resolve and share `state_groups` for all [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical events in batch.
@@ -607,29 +607,6 @@ async def create_event(
builder.internal_metadata.historical = historical

# Strip down the auth_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
if auth_event_ids is not None:
    # If auth events are provided, prev events must be also.
    assert prev_event_ids is not None

    temp_event = await builder.build(
        prev_event_ids=prev_event_ids,
        auth_event_ids=auth_event_ids,
        depth=depth,
    )
    auth_events = await self.store.get_events_as_list(auth_event_ids)
    # Create a StateMap[str]
    auth_event_state_map = {
        (e.type, e.state_key): e.event_id for e in auth_events
    }
    # Actually strip down and use the necessary auth events
    auth_event_ids = self._event_auth_handler.compute_auth_events(
        event=temp_event,
        current_state_ids=auth_event_state_map,
        for_verification=False,
    )

event, context = await self.create_new_client_event(
    builder=builder,
    requester=requester,

@@ -936,6 +913,33 @@ async def create_new_client_event(
    Tuple of created event, context
    """

# Strip down the auth_event_ids to only what we need to auth the event.
# For example, we don't need extra m.room.member that don't match event.sender
full_state_ids_at_event = None
if auth_event_ids is not None:
    # If auth events are provided, prev events must be also.
    assert prev_event_ids is not None

    # Copy the full auth state before it is stripped down
    full_state_ids_at_event = auth_event_ids.copy()

    temp_event = await builder.build(
        prev_event_ids=prev_event_ids,
        auth_event_ids=auth_event_ids,
        depth=depth,
    )
    auth_events = await self.store.get_events_as_list(auth_event_ids)
    # Create a StateMap[str]
    auth_event_state_map = {
        (e.type, e.state_key): e.event_id for e in auth_events
    }
    # Actually strip down and use the necessary auth events
    auth_event_ids = self._event_auth_handler.compute_auth_events(
        event=temp_event,
        current_state_ids=auth_event_state_map,
        for_verification=False,
    )

if prev_event_ids is not None:
    assert (
        len(prev_event_ids) <= 10

@@ -965,6 +969,13 @@ async def create_new_client_event(
if builder.internal_metadata.outlier:
    event.internal_metadata.outlier = True
    context = EventContext.for_outlier()
elif (
    event.type == EventTypes.MSC2716_INSERTION
    and full_state_ids_at_event
    and builder.internal_metadata.is_historical()
):
    old_state = await self.store.get_events_as_list(full_state_ids_at_event)
@MadLittleMods, @erikjohnston: I'm trying to figure out how this works, so that I can fit it in with my work on faster joins and partial room state. So... how does this work?

The relevant call stacks for this are: […]

In […]. That full […]. Here is the context where we originally started stripping down the state: #9247 (comment)

Right, thanks for the explanation, but this seems extremely confusing. At the very least, if […]. But generally, this feels like an abuse of […]. @erikjohnston I think you've been following the MSC2716 work a bit more than me - any thoughts here?

Ah, I think I got confused when reading this, as at some point the state at the insertion event was limited to the used auth events, which is no longer the case. I agree that we should therefore be passing the full state through as a separate parameter. Though at some point I'm wondering if this code path needs refactoring to make it easier to use in the […].

@richvdh Do you want me to tackle this new full state parameter? Don't want to collide and cause conflicts with your existing refactor.

You're unlikely to conflict with me here, so please do go ahead!

Updated via #12083
    context = await self.state.compute_event_context(event, old_state=old_state)

Add explicit state to the insertion event so the rest of the batch can inherit the same state and […]

else:
    context = await self.state.compute_event_context(event)
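To make the branch above easier to follow, here is a minimal sketch of the decision it adds: when an MSC2716 insertion event is created with the full, un-stripped state, its context is seeded from that explicit `old_state` instead of being derived from `prev_events`. This is illustrative only; the function name and simplified signature are assumptions, not the real `create_new_client_event` code path, though `compute_event_context` and `get_events_as_list` mirror the diff.

```python
# A minimal sketch, assuming simplified stand-ins for self.state, self.store
# and the event builder; not the real Synapse signature.

from typing import List, Optional

# Unstable event type defined by MSC2716.
MSC2716_INSERTION = "org.matrix.msc2716.insertion"


async def compute_context_for_new_event(
    state_handler,  # stand-in for self.state
    store,          # stand-in for self.store
    event,
    builder,
    full_state_ids_at_event: Optional[List[str]],
):
    if (
        event.type == MSC2716_INSERTION
        and full_state_ids_at_event
        and builder.internal_metadata.is_historical()
    ):
        # Seed the insertion event's context with the full state at that point
        # in history, so the resulting state_group can be shared by the rest
        # of the batch.
        old_state = await store.get_events_as_list(full_state_ids_at_event)
        return await state_handler.compute_event_context(event, old_state=old_state)

    # Otherwise the state is resolved from the event's prev_events as usual.
    return await state_handler.compute_event_context(event)
```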
@@ -13,6 +13,10 @@
logger = logging.getLogger(__name__)


def generate_fake_event_id() -> str:
    return "$fake_" + random_string(43)


class RoomBatchHandler:
    def __init__(self, hs: "HomeServer"):
        self.hs = hs

@@ -177,6 +181,11 @@ async def persist_state_events_at_start(
state_event_ids_at_start = []
auth_event_ids = initial_auth_event_ids.copy()

# Make the state events float off on their own so we don't have a
# bunch of `@mxid joined the room` noise between each batch
prev_event_id_for_state_chain = generate_fake_event_id()

We still have the state events float off on their own, but now they're in a floating chain instead of individual. This way we can connect the historical events to the state chain via one […]

for state_event in state_events_at_start:
    assert_params_in_dict(
        state_event, ["type", "origin_server_ts", "content", "sender"]
@@ -200,10 +209,6 @@ async def persist_state_events_at_start(
# Mark all events as historical
event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True

# Make the state events float off on their own so we don't have a
# bunch of `@mxid joined the room` noise between each batch
fake_prev_event_id = "$" + random_string(43)

# TODO: This is pretty much the same as some other code to handle inserting state in this file
if event_dict["type"] == EventTypes.Member:
    membership = event_dict["content"].get("membership", None)

@@ -216,7 +221,7 @@ async def persist_state_events_at_start(
    action=membership,
    content=event_dict["content"],
    outlier=True,
    prev_event_ids=[fake_prev_event_id],
    prev_event_ids=[prev_event_id_for_state_chain],
    # Make sure to use a copy of this list because we modify it
    # later in the loop here. Otherwise it will be the same
    # reference and also update in the event when we append later.

@@ -235,7 +240,7 @@ async def persist_state_events_at_start(
    ),
    event_dict,
    outlier=True,
    prev_event_ids=[fake_prev_event_id],
    prev_event_ids=[prev_event_id_for_state_chain],
    # Make sure to use a copy of this list because we modify it
    # later in the loop here. Otherwise it will be the same
    # reference and also update in the event when we append later.
@@ -245,6 +250,8 @@ async def persist_state_events_at_start(
state_event_ids_at_start.append(event_id)
auth_event_ids.append(event_id)
# Connect all the state in a floating chain
prev_event_id_for_state_chain = event_id

return state_event_ids_at_start
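To make the "floating chain" shape concrete, here is a small self-contained sketch under assumed names (the real code goes through the membership/state event handlers shown above): the first state event points at a freshly generated fake event ID, and each later state event points at the previous one, so the whole chain floats off the room DAG as one unit whose tail the historical batch can then reference.

```python
# Illustrative only: how the floating state chain is linked together.
# chain_state_events() is a hypothetical helper, not part of Synapse.

from secrets import token_urlsafe
from typing import Dict, List


def generate_fake_event_id() -> str:
    # Mirrors the diff's intent: a fake event ID the chain can hang off
    # without touching the rest of the room DAG (random_string(43) in Synapse).
    return "$fake_" + token_urlsafe(32)


def chain_state_events(state_events: List[Dict]) -> List[str]:
    state_event_ids_at_start = []
    prev_event_id_for_state_chain = generate_fake_event_id()

    for index, state_event in enumerate(state_events):
        # Each state event points only at the previous link in the chain.
        state_event["prev_events"] = [prev_event_id_for_state_chain]
        event_id = f"$state_{index}"  # stand-in for the persisted event's real ID
        state_event_ids_at_start.append(event_id)
        # Connect all the state in a floating chain.
        prev_event_id_for_state_chain = event_id

    return state_event_ids_at_start


# The historical batch later connects to the tail of this chain, e.g.
# prev_event_ids = [state_event_ids_at_start[-1]].
```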
@@ -289,6 +296,10 @@ async def persist_historical_events(
for ev in events_to_create:
    assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])

    assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
        ev["sender"],
    )

Moved this up because it makes sense to assert this earlier rather than later.

    event_dict = {
        "type": ev["type"],
        "origin_server_ts": ev["origin_server_ts"],
@@ -311,17 +322,26 @@ async def persist_historical_events(
        historical=True,
        depth=inherited_depth,
    )

    assert context._state_group

    # Normally this is done when persisting the event but we have to
    # pre-emptively do it here because we create all the events first,
    # then persist them in another pass below. And we want to share
    # state_groups across the whole batch so this lookup needs to work
    # for the next event in the batch in this loop.
    await self.store.store_state_group_id_for_event_id(
        event_id=event.event_id,
        state_group_id=context._state_group,
    )

    logger.debug(
        "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s",
        event,
        prev_event_ids,
        auth_event_ids,
    )

    assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
        event.sender,
    )

    events_to_persist.append((event, context))
    event_id = event.event_id
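The comment in the diff above is the crux of the state sharing: the batch's events are all created in one pass and only persisted in a second pass, so the event_id-to-state_group mapping has to be written eagerly or the next event's state lookup in the loop would find nothing. A rough sketch of that shape follows; `create_event_for_batch` and `persist_event` are assumed stand-ins for the real calls, while the store methods mirror the ones shown in the diff.

```python
# Hedged sketch of the two-pass create/persist loop; helper names on `handler`
# are assumptions, not Synapse's actual API.

from typing import Any, Dict, List, Tuple


async def create_and_persist_batch(
    handler, store, events_to_create: List[Dict[str, Any]]
) -> List[Tuple[Any, Any]]:
    events_to_persist = []

    for ev in events_to_create:
        event, context = await handler.create_event_for_batch(ev)

        assert context._state_group

        # Normally the event -> state_group mapping is written at persist time,
        # but persistence only happens in the second pass below, so write it
        # eagerly; the next iteration's state lookup depends on finding it.
        await store.store_state_group_id_for_event_id(
            event_id=event.event_id,
            state_group_id=context._state_group,
        )

        events_to_persist.append((event, context))

    # Second pass: persist everything once the whole batch has been created.
    for event, context in events_to_persist:
        await handler.persist_event(event, context)

    return events_to_persist
```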
@@ -32,7 +32,6 @@
from synapse.http.site import SynapseRequest
from synapse.rest.client.transactions import HttpTransactionCache
from synapse.types import JsonDict
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
    from synapse.server import HomeServer
@@ -160,11 +159,6 @@ async def on_POST(
base_insertion_event = None
if batch_id_from_query:
    batch_id_to_connect_to = batch_id_from_query
    # All but the first base insertion event should point at a fake
    # event, which causes the HS to ask for the state at the start of
    # the batch later.
    fake_prev_event_id = "$" + random_string(43)
    prev_event_ids = [fake_prev_event_id]
# Otherwise, create an insertion event to act as a starting point.
#
# We don't always have an insertion event to start hanging more history

@@ -173,16 +167,14 @@ async def on_POST(
# an insertion event), in which case we just create a new insertion event
# that can then get pointed to by a "marker" event later.
else:
    prev_event_ids = prev_event_ids_from_query
I think this was actually a bug. We only want the base insertion event to be tied to […]. Whereas previously, this attached the base and normal insertion event for the first batch (when no […]).

(notice the extra […])
    base_insertion_event_dict = (
        self.room_batch_handler.create_insertion_event_dict(
            sender=requester.user.to_string(),
            room_id=room_id,
            origin_server_ts=last_event_in_batch["origin_server_ts"],
        )
    )
    base_insertion_event_dict["prev_events"] = prev_event_ids.copy()
    base_insertion_event_dict["prev_events"] = prev_event_ids_from_query.copy()

    (
        base_insertion_event,
@@ -203,6 +195,11 @@ async def on_POST(
        EventContentFields.MSC2716_NEXT_BATCH_ID
    ]

# Also connect the historical event chain to the end of the floating
# state chain, which causes the HS to ask for the state at the start of
# the batch later.
prev_event_ids = [state_event_ids_at_start[-1]]

Connect our historical batch chain to the floating state chain so we still get the floating benefit but also more semantic […]

# Create and persist all of the historical events as well as insertion
# and batch meta events to make the batch navigable in the DAG.
event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events(
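Putting the servlet changes together, the prev_events wiring now looks roughly like the sketch below (simplified control flow and an assumed function name, not the servlet's actual structure): only the base insertion event hangs off the prev_events supplied in the query, while the historical events hang off the tail of the floating state chain.

```python
# Hedged sketch of how prev_events are chosen after this change; hypothetical
# helper, the real logic lives inline in on_POST.

from typing import List, Optional, Tuple


def choose_prev_events(
    batch_id_from_query: Optional[str],
    prev_event_ids_from_query: List[str],
    state_event_ids_at_start: List[str],
) -> Tuple[Optional[List[str]], List[str]]:
    base_insertion_prev_events: Optional[List[str]] = None

    if not batch_id_from_query:
        # Only the base insertion event is tied to the caller-supplied
        # prev_events; previously the first batch's events were attached here too.
        base_insertion_prev_events = prev_event_ids_from_query.copy()

    # The batch itself connects to the end of the floating state chain, which
    # causes the homeserver to ask for the state at the start of the batch later.
    historical_batch_prev_events = [state_event_ids_at_start[-1]]

    return base_insertion_prev_events, historical_batch_prev_events
```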
@@ -2069,12 +2069,14 @@ def _store_event_state_mappings_txn(
state_groups[event.event_id] = context.state_group

self.db_pool.simple_insert_many_txn(
self.db_pool.simple_upsert_many_txn(

Changed this to an upsert to avoid colliding with a previous entry we put in via […]

    txn,
    table="event_to_state_groups",
    values=[
        {"state_group": state_group_id, "event_id": event_id}
        for event_id, state_group_id in state_groups.items()
    key_names=["event_id"],
    key_values=[[event_id] for event_id, _ in state_groups.items()],
    value_names=["state_group"],
    value_values=[
        [state_group_id] for _, state_group_id in state_groups.items()
    ],
)
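For clarity, the snippet below shows the data-shape change implied by the diff above using invented sample data: the old call took one dict per row, while the new one takes parallel key/value column lists so an existing row for an event_id (such as one written eagerly during batch creation) is updated rather than tripping the unique constraint. The helper names are from the diff; the mapping values here are made up.

```python
# Illustrative sample data only; shows how the same event_id -> state_group
# mapping feeds the old insert-many shape and the new upsert-many shape.

state_groups = {"$event_a": 101, "$event_b": 102}

# Old shape (simple_insert_many_txn): one dict per row.
insert_rows = [
    {"state_group": state_group_id, "event_id": event_id}
    for event_id, state_group_id in state_groups.items()
]

# New shape (simple_upsert_many_txn): key columns and value columns passed
# separately, so a pre-existing row for an event_id is updated instead of
# violating the unique constraint on event_id.
key_names = ["event_id"]
key_values = [[event_id] for event_id in state_groups]
value_names = ["state_group"]
value_values = [[state_group_id] for state_group_id in state_groups.values()]

# Both shapes describe the same rows.
assert insert_rows == [
    {"state_group": sg, "event_id": eid}
    for (eid,), (sg,) in zip(key_values, value_values)
]
```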
@@ -36,3 +36,16 @@ async def get_insertion_event_by_batch_id(
    retcol="event_id",
    allow_none=True,
)

async def store_state_group_id_for_event_id(
    self, event_id: str, state_group_id: int
) -> Optional[str]:
    {

Not sure what's going on here. The braces to enclose the […]

See #11310

        await self.db_pool.simple_upsert(
            table="event_to_state_groups",
            keyvalues={"event_id": event_id},
            values={"state_group": state_group_id, "event_id": event_id},
            # Unique constraint on event_id so we don't have to lock
            lock=False,
        )
    }
@@ -0,0 +1,19 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

-- Recreate the insertion_event_edges event_id index without the unique constraint
-- because an insertion event can have multiple edges.
DROP INDEX insertion_event_edges_event_id;
CREATE INDEX IF NOT EXISTS insertion_event_edges_event_id ON insertion_event_edges(event_id);
This is moved down here so we can get a copy of the full_state_ids_at_event to put in the insertion events below.