Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Integrate presence from hotfixes (#3694)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkowl committed Aug 17, 2018
1 parent 04f5d2d commit c334ca6
Show file tree
Hide file tree
Showing 17 changed files with 303 additions and 67 deletions.
1 change: 1 addition & 0 deletions changelog.d/3694.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Synapse's presence functionality can now be disabled with the "use_presence" configuration option.
8 changes: 8 additions & 0 deletions docs/workers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,14 @@ regular expressions::

^/_matrix/client/(api/v1|r0|unstable)/keys/upload

If ``use_presence`` is False in the homeserver config, it can also handle REST
endpoints matching the following regular expressions::

^/_matrix/client/(api/v1|r0|unstable)/presence/[^/]+/status

This "stub" presence handler will pass through ``GET`` request but make the
``PUT`` effectively a no-op.

It will proxy any requests it cannot handle to the main synapse instance. It
must therefore be configured with the location of the main instance, via
the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
Expand Down
6 changes: 4 additions & 2 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port):
logger.info("Metrics now reporting on %s:%d", host, port)


def listen_tcp(bind_addresses, port, factory, backlog=50):
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
"""
Create a TCP socket for a port and several addresses
"""
Expand All @@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50):
check_bind_error(e, address, bind_addresses)


def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
def listen_ssl(
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
"""
Create an SSL socket for a port and several addresses
"""
Expand Down
39 changes: 38 additions & 1 deletion synapse/app/frontend_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
Expand All @@ -49,6 +50,35 @@
logger = logging.getLogger("synapse.app.frontend_proxy")


class PresenceStatusStubServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")

def __init__(self, hs):
super(PresenceStatusStubServlet, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.auth = hs.get_auth()
self.main_uri = hs.config.worker_main_http_uri

@defer.inlineCallbacks
def on_GET(self, request, user_id):
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
headers = {
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri,
headers=headers,
)
defer.returnValue((200, result))

@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
defer.returnValue((200, {}))


class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")

Expand Down Expand Up @@ -135,6 +165,12 @@ def _listen_http(self, listener_config):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)

# If presence is disabled, use the stub servlet that does
# not allow sending presence
if not self.config.use_presence:
PresenceStatusStubServlet(self).register(resource)

resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
Expand All @@ -153,7 +189,8 @@ def _listen_http(self, listener_config):
listener_config,
root_resource,
self.version_string,
)
),
reactor=self.get_reactor()
)

logger.info("Synapse client reader now listening on port %d", port)
Expand Down
16 changes: 11 additions & 5 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ def __init__(self, hs):
logger.info("Presence process_id is %r", self.process_id)

def send_user_sync(self, user_id, is_syncing, last_sync_ms):
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
if self.hs.config.use_presence:
self.hs.get_tcp_replication().send_user_sync(
user_id, is_syncing, last_sync_ms
)

def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the master, unless they
Expand Down Expand Up @@ -211,10 +214,13 @@ def process_replication_rows(self, token, rows):
yield self.notify_from_replication(states, stream_id)

def get_currently_syncing_users(self):
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
if self.hs.config.use_presence:
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
else:
return set()


class SynchrotronTyping(object):
Expand Down
6 changes: 6 additions & 0 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def read_config(self, config):
# "disable" federation
self.send_federation = config.get("send_federation", True)

# Whether to enable user presence.
self.use_presence = config.get("use_presence", True)

# Whether to update the user directory or not. This should be set to
# false only if we are updating the user directory in a worker
self.update_user_directory = config.get("update_user_directory", True)
Expand Down Expand Up @@ -250,6 +253,9 @@ def default_config(self, server_name, **kwargs):
# hard limit.
soft_file_limit: 0
# Set to false to disable presence tracking on this homeserver.
use_presence: true
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
# gc_thresholds: [700, 10, 10]
Expand Down
4 changes: 4 additions & 0 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class TransactionQueue(object):
"""

def __init__(self, hs):
self.hs = hs
self.server_name = hs.hostname

self.store = hs.get_datastore()
Expand Down Expand Up @@ -308,6 +309,9 @@ def send_presence(self, states):
Args:
states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return

# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
Expand Down
4 changes: 4 additions & 0 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,

@defer.inlineCallbacks
def get_presence():
# If presence is disabled, return an empty list
if not self.hs.config.use_presence:
defer.returnValue([])

states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,
Expand Down
26 changes: 19 additions & 7 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ def bump_presence_active_time(self, user):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
# If presence is disabled, no-op
if not self.hs.config.use_presence:
return

user_id = user.to_string()

bump_active_time_counter.inc()
Expand Down Expand Up @@ -424,6 +428,11 @@ def user_syncing(self, user_id, affect_presence=True):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
# Override if it should affect the user's presence, if presence is
# disabled.
if not self.hs.config.use_presence:
affect_presence = False

if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
Expand Down Expand Up @@ -469,13 +478,16 @@ def get_currently_syncing_users(self):
Returns:
set(str): A set of user_id strings.
"""
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
if self.hs.config.use_presence:
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
else:
return set()

@defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def __nonzero__(self):
class SyncHandler(object):

def __init__(self, hs):
self.hs_config = hs.config
self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
Expand Down Expand Up @@ -860,7 +861,7 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
if not block_all_presence_data:
if self.hs_config.use_presence and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/client/v1/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def on_PUT(self, request, user_id):
except Exception:
raise SynapseError(400, "Unable to parse state")

yield self.presence_handler.set_state(user, state)
if self.hs.config.use_presence:
yield self.presence_handler.set_state(user, state)

defer.returnValue((200, {}))

Expand Down
Empty file added tests/app/__init__.py
Empty file.
88 changes: 88 additions & 0 deletions tests/app/test_frontend_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.app.frontend_proxy import FrontendProxyServer

from tests.unittest import HomeserverTestCase


class FrontendProxyTests(HomeserverTestCase):
def make_homeserver(self, reactor, clock):

hs = self.setup_test_homeserver(
http_client=None, homeserverToUse=FrontendProxyServer
)

return hs

def test_listen_http_with_presence_enabled(self):
"""
When presence is on, the stub servlet will not register.
"""
# Presence is on
self.hs.config.use_presence = True

config = {
"port": 8080,
"bind_addresses": ["0.0.0.0"],
"resources": [{"names": ["client"]}],
}

# Listen with the config
self.hs._listen_http(config)

# Grab the resource from the site that was told to listen
self.assertEqual(len(self.reactor.tcpServers), 1)
site = self.reactor.tcpServers[0][1]
self.resource = (
site.resource.children["_matrix"].children["client"].children["r0"]
)

request, channel = self.make_request("PUT", "presence/a/status")
self.render(request)

# 400 + unrecognised, because nothing is registered
self.assertEqual(channel.code, 400)
self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED")

def test_listen_http_with_presence_disabled(self):
"""
When presence is on, the stub servlet will register.
"""
# Presence is off
self.hs.config.use_presence = False

config = {
"port": 8080,
"bind_addresses": ["0.0.0.0"],
"resources": [{"names": ["client"]}],
}

# Listen with the config
self.hs._listen_http(config)

# Grab the resource from the site that was told to listen
self.assertEqual(len(self.reactor.tcpServers), 1)
site = self.reactor.tcpServers[0][1]
self.resource = (
site.resource.children["_matrix"].children["client"].children["r0"]
)

request, channel = self.make_request("PUT", "presence/a/status")
self.render(request)

# 401, because the stub servlet still checks authentication
self.assertEqual(channel.code, 401)
self.assertEqual(channel.json_body["errcode"], "M_MISSING_TOKEN")
Loading

0 comments on commit c334ca6

Please sign in to comment.