diff --git a/changelog.d/9465.bugfix b/changelog.d/9465.bugfix new file mode 100644 index 000000000000..2ab4f315c11f --- /dev/null +++ b/changelog.d/9465.bugfix @@ -0,0 +1 @@ +Fix deleting pushers when using sharded pushers. diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md index ad145439b4f8..15df949debc8 100644 --- a/docs/tcp_replication.md +++ b/docs/tcp_replication.md @@ -220,10 +220,6 @@ Asks the server for the current position of all streams. Acknowledge receipt of some federation data -#### REMOVE_PUSHER (C) - - Inform the server a pusher should be removed - ### REMOTE_SERVER_UP (S, C) Inform other processes that a remote server may have come back online. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 6526acb2f285..6ed405e66b1f 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -645,9 +645,6 @@ def start_listening(self, listeners: Iterable[ListenerConfig]): self.get_tcp_replication().start_replication(self) - async def remove_pusher(self, app_id, push_key, user_id): - self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id) - @cache_in_self def get_replication_data_handler(self): return GenericWorkerReplicationHandler(self) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index b9d3da2e0a56..f4d7e199e980 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -74,6 +74,7 @@ def __init__(self, hs: "HomeServer", pusher_config: PusherConfig): self.timed_call = None self._is_processing = False self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room + self._pusherpool = hs.get_pusherpool() self.data = pusher_config.data if self.data is None: @@ -299,7 +300,7 @@ async def _process_one(self, push_action: dict) -> bool: ) else: logger.info("Pushkey %s was rejected: removing", pk) - await self.hs.remove_pusher(self.app_id, pk, self.user_id) + await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id) return True async def _build_notification_dict( diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index ae1145be0e42..3936bf878479 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -25,6 +25,7 @@ ) from synapse.push import Pusher, PusherConfig, PusherConfigException from synapse.push.pusher import PusherFactory +from synapse.replication.http.push import ReplicationRemovePusherRestServlet from synapse.types import JsonDict, RoomStreamToken from synapse.util.async_helpers import concurrently_execute @@ -68,6 +69,13 @@ def __init__(self, hs: "HomeServer"): self._pusher_shard_config = hs.config.push.pusher_shard_config self._instance_name = hs.get_instance_name() + # We can only delete pushers on master. + self._remove_pusher_client = None + if hs.config.worker.worker_app: + self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client( + hs + ) + # Record the last stream ID that we were poked about so we can get # changes since then. We set this to the current max stream ID on # startup as every individual pusher will have checked for changes on @@ -175,9 +183,6 @@ async def remove_pushers_by_access_token( user_id: user to remove pushers for access_tokens: access token *ids* to remove pushers for """ - if not self._pusher_shard_config.should_handle(self._instance_name, user_id): - return - tokens = set(access_tokens) for p in await self.store.get_pushers_by_user_id(user_id): if p.access_token in tokens: @@ -380,6 +385,12 @@ async def remove_pusher(self, app_id: str, pushkey: str, user_id: str) -> None: synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec() - await self.store.delete_pusher_by_app_id_pushkey_user_id( - app_id, pushkey, user_id - ) + # We can only delete pushers on master. + if self._remove_pusher_client: + await self._remove_pusher_client( + app_id=app_id, pushkey=pushkey, user_id=user_id + ) + else: + await self.store.delete_pusher_by_app_id_pushkey_user_id( + app_id, pushkey, user_id + ) diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index dd527e807f62..cb4a52dbe9b4 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -21,6 +21,7 @@ login, membership, presence, + push, register, send_event, streams, @@ -42,6 +43,7 @@ def register_servlets(self, hs): membership.register_servlets(hs, self) streams.register_servlets(hs, self) account_data.register_servlets(hs, self) + push.register_servlets(hs, self) # The following can't currently be instantiated on workers. if hs.config.worker.worker_app is None: diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py new file mode 100644 index 000000000000..054ed64d34dd --- /dev/null +++ b/synapse/replication/http/push.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING + +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ReplicationRemovePusherRestServlet(ReplicationEndpoint): + """Deletes the given pusher. + + Request format: + + POST /_synapse/replication/remove_pusher/:user_id + + { + "app_id": "", + "pushkey": "" + } + + """ + + NAME = "add_user_account_data" + PATH_ARGS = ("user_id",) + CACHE = False + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.pusher_pool = hs.get_pusherpool() + + @staticmethod + async def _serialize_payload(app_id, pushkey, user_id): + payload = { + "app_id": app_id, + "pushkey": pushkey, + } + + return payload + + async def _handle_request(self, request, user_id): + content = parse_json_object_from_request(request) + + app_id = content["app_id"] + pushkey = content["pushkey"] + + await self.pusher_pool.remove_pusher(app_id, pushkey, user_id) + + return 200, {} + + +def register_servlets(hs, http_server): + ReplicationRemovePusherRestServlet(hs).register(http_server) diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 0a9da79c32a3..bb447f75b4d2 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -325,31 +325,6 @@ def to_line(self): return "%s %s" % (self.instance_name, self.token) -class RemovePusherCommand(Command): - """Sent by the client to request the master remove the given pusher. - - Format:: - - REMOVE_PUSHER - """ - - NAME = "REMOVE_PUSHER" - - def __init__(self, app_id, push_key, user_id): - self.user_id = user_id - self.app_id = app_id - self.push_key = push_key - - @classmethod - def from_line(cls, line): - app_id, push_key, user_id = line.split(" ", 2) - - return cls(app_id, push_key, user_id) - - def to_line(self): - return " ".join((self.app_id, self.push_key, self.user_id)) - - class UserIpCommand(Command): """Sent periodically when a worker sees activity from a client. @@ -416,7 +391,6 @@ class RemoteServerUpCommand(_SimpleCommand): ReplicateCommand, UserSyncCommand, FederationAckCommand, - RemovePusherCommand, UserIpCommand, RemoteServerUpCommand, ClearUserSyncsCommand, @@ -443,7 +417,6 @@ class RemoteServerUpCommand(_SimpleCommand): UserSyncCommand.NAME, ClearUserSyncsCommand.NAME, FederationAckCommand.NAME, - RemovePusherCommand.NAME, UserIpCommand.NAME, ErrorCommand.NAME, RemoteServerUpCommand.NAME, diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index d1d00c371769..a7245da15232 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -44,7 +44,6 @@ PositionCommand, RdataCommand, RemoteServerUpCommand, - RemovePusherCommand, ReplicateCommand, UserIpCommand, UserSyncCommand, @@ -373,23 +372,6 @@ def on_FEDERATION_ACK(self, conn: AbstractConnection, cmd: FederationAckCommand) if self._federation_sender: self._federation_sender.federation_ack(cmd.instance_name, cmd.token) - def on_REMOVE_PUSHER( - self, conn: AbstractConnection, cmd: RemovePusherCommand - ) -> Optional[Awaitable[None]]: - remove_pusher_counter.inc() - - if self._is_master: - return self._handle_remove_pusher(cmd) - else: - return None - - async def _handle_remove_pusher(self, cmd: RemovePusherCommand): - await self._store.delete_pusher_by_app_id_pushkey_user_id( - app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id - ) - - self._notifier.on_new_replication_data() - def on_USER_IP( self, conn: AbstractConnection, cmd: UserIpCommand ) -> Optional[Awaitable[None]]: @@ -684,11 +666,6 @@ def send_user_sync( UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms) ) - def send_remove_pusher(self, app_id: str, push_key: str, user_id: str): - """Poke the master to remove a pusher for a user""" - cmd = RemovePusherCommand(app_id, push_key, user_id) - self.send_command(cmd) - def send_user_ip( self, user_id: str, diff --git a/synapse/server.py b/synapse/server.py index 6b3892e3cdff..5de87820002b 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -758,9 +758,6 @@ def get_outbound_redis_connection(self) -> Optional["RedisProtocol"]: reconnect=True, ) - async def remove_pusher(self, app_id: str, push_key: str, user_id: str): - return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id) - def should_send_federation(self) -> bool: "Should this server be sending federation traffic directly?" return self.config.send_federation and (