Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Remove configuration options for direct TCP replication. #13647

Merged
merged 13 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ jobs:
POSTGRES: ${{ matrix.postgres && 1}}
MULTI_POSTGRES: ${{ (matrix.postgres == 'multi-postgres') && 1}}
WORKERS: ${{ matrix.workers && 1 }}
REDIS: ${{ matrix.redis && 1 }}
BLACKLIST: ${{ matrix.workers && 'synapse-blacklist-with-workers' }}
TOP: ${{ github.workspace }}

Expand All @@ -233,11 +232,6 @@ jobs:
postgres: multi-postgres
workers: workers

- sytest-tag: buster
postgres: postgres
workers: workers
redis: redis

steps:
- uses: actions/checkout@v2
- name: Prepare test blacklist
Expand Down
1 change: 1 addition & 0 deletions changelog.d/13647.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Remove the ability to use direct TCP replication with workers. Direct TCP replication was deprecated in Synapse v1.18.0. Workers now require using Redis.
17 changes: 17 additions & 0 deletions docs/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ process, for example:
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
```

# Upgrading to v1.67.0

## Direct TCP replication is no longer supported: migrate to Redis

Redis support was added in v1.13.0 with it becoming the recommended method in
v1.18.0. It replaced the old direct TCP connections (which was deprecated as of
v1.18.0) to the main process. With Redis, rather than all the workers connecting
to the main process, all the workers and the main process connect to Redis,
which relays replication commands between processes. This can give a significant
CPU saving on the main process and is a prerequisite for upcoming
performance improvements.

To migrate to Redis add the [`redis` config](./workers.md#shared-configuration),
and remove the TCP `replication` listener from config of the master and
`worker_replication_port` from worker config. Note that a HTTP listener with a
`replication` resource is still required.

# Upgrading to v1.66.0

## Delegation of email validation no longer supported
Expand Down
2 changes: 0 additions & 2 deletions docs/usage/configuration/config_documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,6 @@ Sub-options for each listener include:

* `metrics`: (see the docs [here](../../metrics-howto.md)),

* `replication`: (deprecated as of Synapse 1.18, see the docs [here](../../workers.md)).

* `tls`: set to true to enable TLS for this listener. Will use the TLS key/cert specified in tls_private_key_path / tls_certificate_path.

* `x_forwarded`: Only valid for an 'http' listener. Set to true to use the X-Forwarded-For header as the client IP. Useful when Synapse is
Expand Down
22 changes: 5 additions & 17 deletions docs/workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,8 @@ stream between all configured Synapse processes. Additionally, processes may
make HTTP requests to each other, primarily for operations which need to wait
for a reply ─ such as sending an event.

Redis support was added in v1.13.0 with it becoming the recommended method in
v1.18.0. It replaced the old direct TCP connections (which is deprecated as of
v1.18.0) to the main process. With Redis, rather than all the workers connecting
to the main process, all the workers and the main process connect to Redis,
which relays replication commands between processes. This can give a significant
cpu saving on the main process and will be a prerequisite for upcoming
performance improvements.
All the workers and the main process connect to Redis, which relays replication
commands between processes.

If Redis support is enabled Synapse will use it as a shared cache, as well as a
pub/sub mechanism.
Expand Down Expand Up @@ -330,7 +325,6 @@ effects of bursts of events from that bridge on events sent by normal users.

Additionally, the writing of specific streams (such as events) can be moved off
of the main process to a particular worker.
(This is only supported with Redis-based replication.)

To enable this, the worker must have a HTTP replication listener configured,
have a `worker_name` and be listed in the `instance_map` config. The same worker
Expand Down Expand Up @@ -600,15 +594,9 @@ equivalent to `synapse.app.generic_worker`:

## Migration from old config

There are two main independent changes that have been made: introducing Redis
support and merging apps into `synapse.app.generic_worker`. Both these changes
are backwards compatible and so no changes to the config are required, however
server admins are encouraged to plan to migrate to Redis as the old style direct
TCP replication config is deprecated.

To migrate to Redis add the `redis` config as above, and optionally remove the
TCP `replication` listener from master and `worker_replication_port` from worker
config.
A main change that has occurred is the merging of worker apps into
`synapse.app.generic_worker`. This change is backwards compatible and so no
changes to the config are required.

To migrate apps to use `synapse.app.generic_worker` simply update the
`worker_app` option in the worker configs, and where worker are started (e.g.
Expand Down
11 changes: 0 additions & 11 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.admin import AdminRestResource
from synapse.rest.health import HealthResource
Expand Down Expand Up @@ -290,16 +289,6 @@ def start_listening(self) -> None:
manhole_settings=self.config.server.manhole_settings,
manhole_globals={"hs": self},
)
elif listener.type == "replication":
services = listen_tcp(
listener.bind_addresses,
listener.port,
ReplicationStreamProtocolFactory(self),
reivilibre marked this conversation as resolved.
Show resolved Hide resolved
)
for s in services:
self.get_reactor().addSystemEventTrigger(
"before", "shutdown", s.stopListening
)
elif listener.type == "metrics":
if not self.config.metrics.enable_metrics:
logger.warning(
Expand Down
16 changes: 13 additions & 3 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@

logger = logging.Logger(__name__)

DIRECT_TCP_ERROR = """
Using direct TCP replication for workers is no longer supported.

Please see https://matrix-org.github.io/synapse/latest/upgrade.html#direct-tcp-replication-is-no-longer-supported-migrate-to-redis
"""

# by default, we attempt to listen on both '::' *and* '0.0.0.0' because some OSes
# (Windows, macOS, other BSD/Linux where net.ipv6.bindv6only is set) will only listen
# on IPv6 when '::' is set.
Expand Down Expand Up @@ -165,7 +171,6 @@ def generate_ip_set(
"http",
"metrics",
"manhole",
"replication",
clokep marked this conversation as resolved.
Show resolved Hide resolved
}

KNOWN_RESOURCES = {
Expand Down Expand Up @@ -515,7 +520,9 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
):
raise ConfigError("allowed_avatar_mimetypes must be a list")

self.listeners = [parse_listener_def(x) for x in config.get("listeners", [])]
self.listeners = [
parse_listener_def(i, x) for i, x in enumerate(config.get("listeners", []))
]

# no_tls is not really supported any more, but let's grandfather it in
# here.
Expand Down Expand Up @@ -880,9 +887,12 @@ def read_gc_thresholds(
)


def parse_listener_def(listener: Any) -> ListenerConfig:
def parse_listener_def(num: int, listener: Any) -> ListenerConfig:
"""parse a listener config from the config file"""
listener_type = listener["type"]
# Raise a helpful error if direct TCP replication is still configured.
if listener_type == "replication":
raise ConfigError(DIRECT_TCP_ERROR, ("listeners", str(num), "type"))

port = listener.get("port")
if not isinstance(port, int):
Expand Down
8 changes: 5 additions & 3 deletions synapse/config/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
RoutableShardedWorkerHandlingConfig,
ShardedWorkerHandlingConfig,
)
from .server import ListenerConfig, parse_listener_def
from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def

_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
The send_federation config option must be disabled in the main
Expand Down Expand Up @@ -128,7 +128,8 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.worker_app = None

self.worker_listeners = [
parse_listener_def(x) for x in config.get("worker_listeners", [])
parse_listener_def(i, x)
for i, x in enumerate(config.get("worker_listeners", []))
]
self.worker_daemonize = bool(config.get("worker_daemonize"))
self.worker_pid_file = config.get("worker_pid_file")
Expand All @@ -142,7 +143,8 @@ def read_config(self, config: JsonDict, **kwargs: Any) -> None:
self.worker_replication_host = config.get("worker_replication_host", None)

# The port on the main synapse for TCP replication
self.worker_replication_port = config.get("worker_replication_port", None)
if "worker_replication_port" in config:
raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",))

# The port on the main synapse for HTTP replication endpoint
self.worker_replication_http_port = config.get("worker_replication_http_port")
Expand Down
58 changes: 21 additions & 37 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@

from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
Expand Down Expand Up @@ -332,46 +331,31 @@ async def _process_command(

def start_replication(self, hs: "HomeServer") -> None:
"""Helper method to start replication."""
if hs.config.redis.redis_enabled:
from synapse.replication.tcp.redis import (
RedisDirectTcpReplicationClientFactory,
)
from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory

# First let's ensure that we have a ReplicationStreamer started.
hs.get_replication_streamer()
# First let's ensure that we have a ReplicationStreamer started.
hs.get_replication_streamer()

# We need two connections to redis, one for the subscription stream and
# one to send commands to (as you can't send further redis commands to a
# connection after SUBSCRIBE is called).
# We need two connections to redis, one for the subscription stream and
# one to send commands to (as you can't send further redis commands to a
# connection after SUBSCRIBE is called).

# First create the connection for sending commands.
outbound_redis_connection = hs.get_outbound_redis_connection()
# First create the connection for sending commands.
outbound_redis_connection = hs.get_outbound_redis_connection()

# Now create the factory/connection for the subscription stream.
self._factory = RedisDirectTcpReplicationClientFactory(
hs,
outbound_redis_connection,
channel_names=self._channels_to_subscribe_to,
)
hs.get_reactor().connectTCP(
hs.config.redis.redis_host,
hs.config.redis.redis_port,
self._factory,
timeout=30,
bindAddress=None,
)
else:
client_name = hs.get_instance_name()
self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
host = hs.config.worker.worker_replication_host
port = hs.config.worker.worker_replication_port
hs.get_reactor().connectTCP(
host,
port,
self._factory,
timeout=30,
bindAddress=None,
)
# Now create the factory/connection for the subscription stream.
self._factory = RedisDirectTcpReplicationClientFactory(
hs,
outbound_redis_connection,
channel_names=self._channels_to_subscribe_to,
)
hs.get_reactor().connectTCP(
hs.config.redis.redis_host,
hs.config.redis.redis_port,
self._factory,
timeout=30,
bindAddress=None,
)

def get_streams(self) -> Dict[str, Stream]:
"""Get a map from stream name to all streams."""
Expand Down
4 changes: 2 additions & 2 deletions tests/app/test_openid_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_openid_listener(self, names, expectation):
}

# Listen with the config
self.hs._listen_http(parse_listener_def(config))
self.hs._listen_http(parse_listener_def(0, config))

# Grab the resource from the site that was told to listen
site = self.reactor.tcpServers[0][1]
Expand Down Expand Up @@ -109,7 +109,7 @@ def test_openid_listener(self, names, expectation):
}

# Listen with the config
self.hs._listener_http(self.hs.config, parse_listener_def(config))
self.hs._listener_http(self.hs.config, parse_listener_def(0, config))

# Grab the resource from the site that was told to listen
site = self.reactor.tcpServers[0][1]
Expand Down
2 changes: 1 addition & 1 deletion tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ def _make_request(self, method: bytes, path: bytes) -> FakeChannel:
site = SynapseSite(
"test",
"site_tag",
parse_listener_def({"type": "http", "port": 0}),
parse_listener_def(0, {"type": "http", "port": 0}),
self.resource,
"1.0",
max_request_body_size=4096,
Expand Down
1 change: 0 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ def default_config(
"enable_registration_captcha": False,
"macaroon_secret_key": "not even a little secret",
"password_providers": [],
"worker_replication_url": "",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not really related, but it was removed in 2017 in #2117.

"worker_app": None,
"block_non_admin_invites": False,
"federation_domain_whitelist": None,
Expand Down