Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Don't send normal presence updates over federation replication stream #9828

Merged
merged 7 commits into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9828.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add experimental support for handling presence on a worker.
70 changes: 2 additions & 68 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ def __init__(self, hs: "HomeServer"):
# Pending presence map user_id -> UserPresenceState
self.presence_map = {} # type: Dict[str, UserPresenceState]

# Stream position -> list[user_id]
self.presence_changed = SortedDict() # type: SortedDict[int, List[str]]

# Stores the destinations we need to explicitly send presence to about a
# given user.
# Stream position -> (user_id, destinations)
Expand All @@ -96,7 +93,7 @@ def __init__(self, hs: "HomeServer"):

self.edus = SortedDict() # type: SortedDict[int, Edu]

# stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
# stream ID for the next entry into keyed_edu_changed/edus.
self.pos = 1

# map from stream ID to the time that stream entry was generated, so that we
Expand All @@ -117,7 +114,6 @@ def register(name: str, queue: Sized) -> None:

for queue_name in [
"presence_map",
"presence_changed",
"keyed_edu",
"keyed_edu_changed",
"edus",
Expand Down Expand Up @@ -155,23 +151,12 @@ def _clear_queue_before_pos(self, position_to_delete: int) -> None:
"""Clear all the queues from before a given position"""
with Measure(self.clock, "send_queue._clear"):
# Delete things out of presence maps
keys = self.presence_changed.keys()
i = self.presence_changed.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_changed[key]

user_ids = {
user_id for uids in self.presence_changed.values() for user_id in uids
}

keys = self.presence_destinations.keys()
i = self.presence_destinations.bisect_left(position_to_delete)
for key in keys[:i]:
del self.presence_destinations[key]

user_ids.update(
user_id for user_id, _ in self.presence_destinations.values()
)
user_ids = {user_id for user_id, _ in self.presence_destinations.values()}

to_del = [
user_id for user_id in self.presence_map if user_id not in user_ids
Expand Down Expand Up @@ -244,23 +229,6 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""
# nothing to do here: the replication listener will handle it.

def send_presence(self, states: List[UserPresenceState]) -> None:
"""As per FederationSender

Args:
states
"""
pos = self._next_pos()

# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = [s for s in states if self.is_mine_id(s.user_id)]

self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]

self.notifier.on_new_replication_data()

def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
Expand Down Expand Up @@ -325,18 +293,6 @@ async def get_replication_rows(
# of the federation stream.
rows = [] # type: List[Tuple[int, BaseFederationRow]]

# Fetch changed presence
i = self.presence_changed.bisect_right(from_token)
j = self.presence_changed.bisect_right(to_token) + 1
dest_user_ids = [
(pos, user_id)
for pos, user_id_list in self.presence_changed.items()[i:j]
for user_id in user_id_list
]

for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(state=self.presence_map[user_id])))

# Fetch presence to send to destinations
i = self.presence_destinations.bisect_right(from_token)
j = self.presence_destinations.bisect_right(to_token) + 1
Expand Down Expand Up @@ -427,22 +383,6 @@ def add_to_buffer(self, buff):
raise NotImplementedError()


class PresenceRow(
BaseFederationRow, namedtuple("PresenceRow", ("state",)) # UserPresenceState
):
TypeId = "p"

@staticmethod
def from_data(data):
return PresenceRow(state=UserPresenceState.from_dict(data))

def to_data(self):
return self.state.as_dict()

def add_to_buffer(self, buff):
buff.presence.append(self.state)


class PresenceDestinationsRow(
BaseFederationRow,
namedtuple(
Expand Down Expand Up @@ -506,7 +446,6 @@ def add_to_buffer(self, buff):


_rowtypes = (
PresenceRow,
PresenceDestinationsRow,
KeyedEduRow,
EduRow,
Expand All @@ -518,7 +457,6 @@ def add_to_buffer(self, buff):
ParsedFederationStreamData = namedtuple(
"ParsedFederationStreamData",
(
"presence", # list(UserPresenceState)
"presence_destinations", # list of tuples of UserPresenceState and destinations
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
Expand All @@ -543,7 +481,6 @@ def process_rows_for_federation(
# them into the appropriate collection and then send them off.

buff = ParsedFederationStreamData(
presence=[],
presence_destinations=[],
keyed_edus={},
edus={},
Expand All @@ -559,9 +496,6 @@ def process_rows_for_federation(
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)

if buff.presence:
transaction_queue.send_presence(buff.presence)

for state, destinations in buff.presence_destinations:
transaction_queue.send_presence_to_destinations(
states=[state], destinations=destinations
Expand Down
96 changes: 1 addition & 95 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
from synapse.handlers.presence import get_interested_remotes
from synapse.logging.context import preserve_fn
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
Expand All @@ -34,7 +32,7 @@
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
from synapse.util.metrics import Measure

if TYPE_CHECKING:
from synapse.events.presence_router import PresenceRouter
Expand Down Expand Up @@ -79,15 +77,6 @@ async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""
raise NotImplementedError()

@abc.abstractmethod
def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.

This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
"""
raise NotImplementedError()

@abc.abstractmethod
def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
Expand Down Expand Up @@ -176,11 +165,6 @@ def __init__(self, hs: "HomeServer"):
),
)

# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {} # type: Dict[str, UserPresenceState]

LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
Expand All @@ -201,8 +185,6 @@ def __init__(self, hs: "HomeServer"):
self._is_processing = False
self._last_poked_id = -1

self._processing_pending_presence = False

# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
Expand Down Expand Up @@ -546,48 +528,6 @@ def _flush_rrs_for_room(self, room_id: str) -> None:
for queue in queues:
queue.flush_read_receipts_for_room(room_id)

@preserve_fn # the caller should not yield on this
async def send_presence(self, states: List[UserPresenceState]) -> None:
"""Send the new presence states to the appropriate destinations.

This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return

# First we queue up the new presence by user ID, so multiple presence
# updates in quick succession are correctly handled.
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update(
{state.user_id: state for state in states if self.is_mine_id(state.user_id)}
)

# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return

self._processing_pending_presence = True
try:
while True:
states_map = self.pending_presence
self.pending_presence = {}

if not states_map:
break

await self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
self._processing_pending_presence = False

def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str]
) -> None:
Expand All @@ -608,40 +548,6 @@ def send_presence_to_destinations(
continue
self._get_per_destination_queue(destination).send_presence(states)

@measure_func("txnqueue._process_presence")
async def _process_presence_inner(self, states: List[UserPresenceState]) -> None:
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
# We pull the presence router here instead of __init__
# to prevent a dependency cycle:
#
# AuthHandler -> Notifier -> FederationSender
# -> PresenceRouter -> ModuleApi -> AuthHandler
if self._presence_router is None:
self._presence_router = self.hs.get_presence_router()

assert self._presence_router is not None

hosts_and_states = await get_interested_remotes(
self.store,
self._presence_router,
states,
self.state,
)

for destinations, states in hosts_and_states:
for destination in destinations:
if destination == self.server_name:
continue

if not self._federation_shard_config.should_handle(
self._instance_name, destination
):
continue

self._get_per_destination_queue(destination).send_presence(states)

def build_and_send_edu(
self,
destination: str,
Expand Down
Loading