Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Stop the master relaying USER_SYNC for other workers #7318

Merged
merged 8 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions synapse/api/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class EventTypes(object):

Retention = "m.room.retention"

Presence = "m.presence"


class RejectedReason(object):
AUTH_ERROR = "auth_error"
Expand Down
8 changes: 6 additions & 2 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@
from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.handlers.presence import (
AbstractPresenceHandler,
PresenceHandler,
get_interested_parties,
)
from synapse.http.server import JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
Expand Down Expand Up @@ -224,7 +228,7 @@ async def on_POST(self, request, device_id):
UPDATE_SYNCING_USERS_MS = 10 * 1000


class GenericWorkerPresence(object):
class GenericWorkerPresence(AbstractPresenceHandler):
def __init__(self, hs):
self.hs = hs
self.is_mine_id = hs.is_mine_id
Expand Down
20 changes: 12 additions & 8 deletions synapse/handlers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging.utils import log_function
from synapse.types import UserID
from synapse.visibility import filter_events_for_client
Expand Down Expand Up @@ -97,6 +98,8 @@ async def get_stream(
explicit_room_id=room_id,
)

time_now = self.clock.time_msec()

# When the user joins a new room, or another user joins a currently
# joined room, we need to send down presence for those users.
to_add = []
Expand All @@ -112,19 +115,20 @@ async def get_stream(
users = await self.state.get_current_users_in_room(
event.room_id
)
states = await presence_handler.get_states(users, as_event=True)
to_add.extend(states)
else:
users = [event.state_key]

ev = await presence_handler.get_state(
UserID.from_string(event.state_key), as_event=True
)
to_add.append(ev)
states = await presence_handler.get_states(users)
to_add.extend(
{
"type": EventTypes.Presence,
"content": format_user_presence_state(state, time_now),
}
for state in states
)

events.extend(to_add)

time_now = self.clock.time_msec()

chunks = await self._event_serializer.serialize_events(
events,
time_now,
Expand Down
10 changes: 8 additions & 2 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,10 +381,16 @@ async def get_presence():
return []

states = await presence_handler.get_states(
[m.user_id for m in room_members], as_event=True
[m.user_id for m in room_members]
)

return states
return [
{
"type": EventTypes.Presence,
"content": format_user_presence_state(s, time_now),
}
for s in states
]

async def get_receipts():
receipts = await self.store.get_linearized_receipts_for_room(
Expand Down
102 changes: 74 additions & 28 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -21,10 +22,10 @@
- PresenceHandler._handle_timeouts
- should_notify
"""

import abc
import logging
from contextlib import contextmanager
from typing import Dict, List, Set
from typing import Dict, Iterable, List, Set

from six import iteritems, itervalues

Expand All @@ -41,7 +42,7 @@
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.presence import UserPresenceState
from synapse.types import UserID, get_domain_from_id
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.descriptors import cached
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -99,7 +100,69 @@
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER


class PresenceHandler(object):
class AbstractPresenceHandler(abc.ABC):
"""The base interface for things implementing PresenceHandler"""

@abc.abstractmethod
async def user_syncing(
self, user_id: str, affect_presence: bool = True
) -> ContextManager[None]:
"""Returns a context manager that should surround any stream requests
from the user.

This allows us to keep track of who is currently streaming and who isn't
without having to have timers outside of this module to avoid flickering
when users disconnect/reconnect.

Args:
user_id (str)
affect_presence (bool): If false this function will be a no-op.
richvdh marked this conversation as resolved.
Show resolved Hide resolved
Useful for streams that are not associated with an actual
client that is being used by a user.
"""

@abc.abstractmethod
def get_currently_syncing_users(self) -> Set[str]:
"""Get the set of user ids that are currently syncing on this HS.
Returns:
set(str): A set of user_id strings.
"""

@abc.abstractmethod
async def current_state_for_users(
self, user_ids: Iterable[str]
) -> Dict[str, UserPresenceState]:
"""Get the current presence state for multiple users.

Returns:
dict: `user_id` -> `UserPresenceState`
"""

@abc.abstractmethod
async def get_state(self, target_user: UserID) -> UserPresenceState:
...

@abc.abstractmethod
async def get_states(
self, target_user_ids: Iterable[str]
) -> List[UserPresenceState]:
"""Get the presence state for users.

Args:
target_user_ids (list)

Returns:
list
"""

@abc.abstractmethod
async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
) -> None:
"""Set the presence state of the user. """


class PresenceHandler(AbstractPresenceHandler):
def __init__(self, hs: "synapse.server.HomeServer"):
self.hs = hs
self.is_mine_id = hs.is_mine_id
Expand Down Expand Up @@ -669,39 +732,22 @@ async def incoming_presence(self, origin, content):
federation_presence_counter.inc(len(updates))
await self._update_states(updates)

async def get_state(self, target_user, as_event=False):
results = await self.get_states([target_user.to_string()], as_event=as_event)

async def get_state(self, target_user: UserID) -> UserPresenceState:
results = await self.get_states([target_user.to_string()])
return results[0]

async def get_states(self, target_user_ids, as_event=False):
"""Get the presence state for users.

Args:
target_user_ids (list)
as_event (bool): Whether to format it as a client event or not.

Returns:
list
"""
async def get_states(
self, target_user_ids: Iterable[str]
) -> List[UserPresenceState]:
"""Get the presence state for users."""

updates = await self.current_state_for_users(target_user_ids)
updates = list(updates.values())

for user_id in set(target_user_ids) - {u.user_id for u in updates}:
updates.append(UserPresenceState.default(user_id))

now = self.clock.time_msec()
if as_event:
return [
{
"type": "m.presence",
"content": format_user_presence_state(state, now),
}
for state in updates
]
else:
return updates
return updates

async def set_state(self, target_user, state, ignore_status_msg=False):
"""Set the presence state of the user.
Expand Down
4 changes: 3 additions & 1 deletion synapse/server.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ class HomeServer(object):
pass
def get_notifier(self) -> synapse.notifier.Notifier:
pass
def get_presence_handler(self) -> synapse.handlers.presence.PresenceHandler:
def get_presence_handler(
self,
) -> synapse.handlers.presence.AbstractPresenceHandler:
pass
def get_clock(self) -> synapse.util.Clock:
pass
Expand Down