Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Integrate presence from hotfixes #3694

Merged
merged 28 commits into from
Aug 17, 2018
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/3694.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Synapse's presence functionality can now be disabled with the "use_presence" configuration option.
6 changes: 4 additions & 2 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port):
logger.info("Metrics now reporting on %s:%d", host, port)


def listen_tcp(bind_addresses, port, factory, backlog=50):
def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
"""
Create a TCP socket for a port and several addresses
"""
Expand All @@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50):
check_bind_error(e, address, bind_addresses)


def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
def listen_ssl(
bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
):
"""
Create an SSL socket for a port and several addresses
"""
Expand Down
39 changes: 38 additions & 1 deletion synapse/app/frontend_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
Expand All @@ -49,6 +50,35 @@
logger = logging.getLogger("synapse.app.frontend_proxy")


class PresenceStatusStubServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")

def __init__(self, hs):
super(PresenceStatusStubServlet, self).__init__(hs)
self.http_client = hs.get_simple_http_client()
self.auth = hs.get_auth()
self.main_uri = hs.config.worker_main_http_uri

@defer.inlineCallbacks
def on_GET(self, request, user_id):
# Pass through the auth headers, if any, in case the access token
# is there.
auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
headers = {
"Authorization": auth_headers,
}
result = yield self.http_client.get_json(
self.main_uri + request.uri,
headers=headers,
)
defer.returnValue((200, result))

@defer.inlineCallbacks
def on_PUT(self, request, user_id):
yield self.auth.get_user_by_req(request)
defer.returnValue((200, {}))


class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")

Expand Down Expand Up @@ -135,6 +165,12 @@ def _listen_http(self, listener_config):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)

# If presence is disabled, use the stub servlet that does
# not allow sending presence
if not self.config.use_presence:
PresenceStatusStubServlet(self).register(resource)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth mentioning in docs/workers.rst that you can point presence endpoints at frontend worker when presence is disabled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
Expand All @@ -153,7 +189,8 @@ def _listen_http(self, listener_config):
listener_config,
root_resource,
self.version_string,
)
),
reactor=self.get_reactor()
)

logger.info("Synapse client reader now listening on port %d", port)
Expand Down
16 changes: 11 additions & 5 deletions synapse/app/synchrotron.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ def __init__(self, hs):
logger.info("Presence process_id is %r", self.process_id)

def send_user_sync(self, user_id, is_syncing, last_sync_ms):
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
if self.hs.config.use_presence:
self.hs.get_tcp_replication().send_user_sync(
user_id, is_syncing, last_sync_ms
)

def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the master, unless they
Expand Down Expand Up @@ -211,10 +214,13 @@ def process_replication_rows(self, token, rows):
yield self.notify_from_replication(states, stream_id)

def get_currently_syncing_users(self):
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
if self.hs.config.use_presence:
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
else:
return set()


class SynchrotronTyping(object):
Expand Down
6 changes: 6 additions & 0 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ def read_config(self, config):
# "disable" federation
self.send_federation = config.get("send_federation", True)

# Whether to enable user presence.
self.use_presence = config.get("use_presence", True)

# Whether to update the user directory or not. This should be set to
# false only if we are updating the user directory in a worker
self.update_user_directory = config.get("update_user_directory", True)
Expand Down Expand Up @@ -249,6 +252,9 @@ def default_config(self, server_name, **kwargs):
# hard limit.
soft_file_limit: 0

# Set to false to disable presence tracking on this homeserver.
use_presence: true

# The GC threshold parameters to pass to `gc.set_threshold`, if defined
# gc_thresholds: [700, 10, 10]

Expand Down
4 changes: 4 additions & 0 deletions synapse/federation/transaction_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class TransactionQueue(object):
"""

def __init__(self, hs):
self.hs = hs
self.server_name = hs.hostname

self.store = hs.get_datastore()
Expand Down Expand Up @@ -308,6 +309,9 @@ def send_presence(self, states):
Args:
states (list(UserPresenceState))
"""
if not self.hs.config.use_presence:
# No-op if presence is disabled.
return

# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
Expand Down
4 changes: 4 additions & 0 deletions synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ def _room_initial_sync_joined(self, user_id, room_id, pagin_config,

@defer.inlineCallbacks
def get_presence():
# If presence is disabled, return an empty list
if not self.hs.config.use_presence:
defer.returnValue([])

states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,
Expand Down
26 changes: 19 additions & 7 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ def bump_presence_active_time(self, user):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
# If presence is disabled, no-op
if not self.hs.config.use_presence:
return

user_id = user.to_string()

bump_active_time_counter.inc()
Expand Down Expand Up @@ -424,6 +428,11 @@ def user_syncing(self, user_id, affect_presence=True):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
# Override if it should affect the user's presence, if presence is
# disabled.
if not self.hs.config.use_presence:
affect_presence = False

if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
Expand Down Expand Up @@ -469,13 +478,16 @@ def get_currently_syncing_users(self):
Returns:
set(str): A set of user_id strings.
"""
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
if self.hs.config.use_presence:
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
syncing_user_ids.update(user_ids)
return syncing_user_ids
else:
return set()

@defer.inlineCallbacks
def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
Expand Down
3 changes: 2 additions & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ def __nonzero__(self):
class SyncHandler(object):

def __init__(self, hs):
self.hs = hs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We try and avoid storing the hs object itself, to avoid the temptation to do hs.get_foo() in the code. Can this be self.config = hs.config please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made it hs_config (seemed clearer to me)

self.store = hs.get_datastore()
self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
Expand Down Expand Up @@ -860,7 +861,7 @@ def generate_sync_result(self, sync_config, since_token=None, full_state=False):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
if not block_all_presence_data:
if self.hs.config.use_presence and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)
Expand Down
3 changes: 2 additions & 1 deletion synapse/rest/client/v1/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def on_PUT(self, request, user_id):
except Exception:
raise SynapseError(400, "Unable to parse state")

yield self.presence_handler.set_state(user, state)
if self.hs.config.use_presence:
yield self.presence_handler.set_state(user, state)

defer.returnValue((200, {}))

Expand Down
Empty file added tests/app/__init__.py
Empty file.
88 changes: 88 additions & 0 deletions tests/app/test_frontend_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.app.frontend_proxy import FrontendProxyServer

from tests.unittest import HomeserverTestCase


class FrontendProxyTests(HomeserverTestCase):
def make_homeserver(self, reactor, clock):

hs = self.setup_test_homeserver(
http_client=None, homeserverToUse=FrontendProxyServer
)

return hs

def test_listen_http_with_presence_enabled(self):
"""
When presence is on, the stub servlet will not register.
"""
# Presence is on
self.hs.config.use_presence = True

config = {
"port": 8080,
"bind_addresses": ["0.0.0.0"],
"resources": [{"names": ["client"]}],
}

# Listen with the config
self.hs._listen_http(config)

# Grab the resource from the site that was told to listen
self.assertEqual(len(self.reactor.tcpServers), 1)
site = self.reactor.tcpServers[0][1]
self.resource = (
site.resource.children["_matrix"].children["client"].children["r0"]
)

request, channel = self.make_request("PUT", "presence/a/status")
self.render(request)

# 400 + unrecognised, because nothing is registered
self.assertEqual(channel.code, 400)
self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED")

def test_listen_http_with_presence_disabled(self):
"""
When presence is on, the stub servlet will register.
"""
# Presence is off
self.hs.config.use_presence = False

config = {
"port": 8080,
"bind_addresses": ["0.0.0.0"],
"resources": [{"names": ["client"]}],
}

# Listen with the config
self.hs._listen_http(config)

# Grab the resource from the site that was told to listen
self.assertEqual(len(self.reactor.tcpServers), 1)
site = self.reactor.tcpServers[0][1]
self.resource = (
site.resource.children["_matrix"].children["client"].children["r0"]
)

request, channel = self.make_request("PUT", "presence/a/status")
self.render(request)

# 401, because the stub servlet still checks authentication
self.assertEqual(channel.code, 401)
self.assertEqual(channel.json_body["errcode"], "M_MISSING_TOKEN")
Loading