Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Thread through room_id in federation handler
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Sep 1, 2020
1 parent 40af6b7 commit e151ff2
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 15 deletions.
33 changes: 21 additions & 12 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ async def backfill(self, dest, room_id, limit, extremities):
)

if ev_infos:
await self._handle_new_events(dest, ev_infos, backfilled=True)
await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)

# Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth)
Expand Down Expand Up @@ -1217,7 +1217,7 @@ async def get_event(event_id: str):
event_infos.append(_NewEventInfo(event, None, auth))

await self._handle_new_events(
destination, event_infos,
destination, room_id, event_infos,
)

def _sanity_check_event(self, ev):
Expand Down Expand Up @@ -1364,7 +1364,7 @@ async def do_invite_join(
)

max_stream_id = await self._persist_auth_tree(
origin, auth_chain, state, event, room_version_obj
origin, room_id, auth_chain, state, event, room_version_obj
)

# We wait here until this instance has seen the events come down
Expand Down Expand Up @@ -1628,7 +1628,7 @@ async def on_invite_request(
)

context = await self.state_handler.compute_event_context(event)
await self.persist_events_and_notify([(event, context)])
await self.persist_events_and_notify(event.room_id, [(event, context)])

return event

Expand All @@ -1655,7 +1655,9 @@ async def do_remotely_reject_invite(
await self.federation_client.send_leave(host_list, event)

context = await self.state_handler.compute_event_context(event)
stream_id = await self.persist_events_and_notify([(event, context)])
stream_id = await self.persist_events_and_notify(
event.room_id, [(event, context)]
)

return event, stream_id

Expand Down Expand Up @@ -1903,7 +1905,7 @@ async def _handle_new_event(
)

await self.persist_events_and_notify(
[(event, context)], backfilled=backfilled
event.room_id, [(event, context)], backfilled=backfilled
)
except Exception:
run_in_background(
Expand All @@ -1916,6 +1918,7 @@ async def _handle_new_event(
async def _handle_new_events(
self,
origin: str,
room_id: str,
event_infos: Iterable[_NewEventInfo],
backfilled: bool = False,
) -> None:
Expand Down Expand Up @@ -1947,6 +1950,7 @@ async def prep(ev_info: _NewEventInfo):
)

await self.persist_events_and_notify(
room_id,
[
(ev_info.event, context)
for ev_info, context in zip(event_infos, contexts)
Expand All @@ -1957,6 +1961,7 @@ async def prep(ev_info: _NewEventInfo):
async def _persist_auth_tree(
self,
origin: str,
room_id: str,
auth_events: List[EventBase],
state: List[EventBase],
event: EventBase,
Expand All @@ -1971,6 +1976,7 @@ async def _persist_auth_tree(
Args:
origin: Where the events came from
room_id,
auth_events
state
event
Expand Down Expand Up @@ -2045,17 +2051,20 @@ async def _persist_auth_tree(
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR

await self.persist_events_and_notify(
room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
]
],
)

new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
)

return await self.persist_events_and_notify([(event, new_event_context)])
return await self.persist_events_and_notify(
room_id, [(event, new_event_context)]
)

async def _prep_event(
self,
Expand Down Expand Up @@ -2906,25 +2915,25 @@ async def _check_key_revocation(self, public_key, url):

async def persist_events_and_notify(
self,
room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
) -> int:
"""Persists events and tells the notifier/pushers about them, if
necessary.
Args:
room_id:
event_and_contexts:
backfilled: Whether these events are a result of
backfilling or not
"""
# FIXME:
instance = self.config.worker.events_shard_config.get_instance(
event_and_contexts[0][0].room_id
)
instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
result = await self._send_events(
instance_name=instance,
store=self.store,
room_id=room_id,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
Expand Down
12 changes: 9 additions & 3 deletions synapse/replication/http/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ def __init__(self, hs):
self.federation_handler = hs.get_handlers().federation_handler

@staticmethod
async def _serialize_payload(store, event_and_contexts, backfilled):
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
"""
Args:
store
room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of
backfilling
Expand All @@ -88,14 +89,19 @@ async def _serialize_payload(store, event_and_contexts, backfilled):
}
)

payload = {"events": event_payloads, "backfilled": backfilled}
payload = {
"events": event_payloads,
"backfilled": backfilled,
"room_id": room_id,
}

return payload

async def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request)

room_id = content["room_id"]
backfilled = content["backfilled"]

event_payloads = content["events"]
Expand All @@ -120,7 +126,7 @@ async def _handle_request(self, request):
logger.info("Got %d events from federation", len(event_and_contexts))

max_stream_id = await self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled
room_id, event_and_contexts, backfilled
)

return 200, {"max_stream_id": max_stream_id}
Expand Down

0 comments on commit e151ff2

Please sign in to comment.