Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Retry some http replication failures #12182

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12182.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Retry HTTP replication failures, this should prevent 502's when restarting stateful workers (main, event persisters, stream writers). Contributed by Nick @ Beeper.
47 changes: 36 additions & 11 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

from prometheus_client import Counter, Gauge

from twisted.internet.error import ConnectError, DNSLookupError
from twisted.web.server import Request

from synapse.api.errors import HttpResponseException, SynapseError
Expand Down Expand Up @@ -87,13 +88,19 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
`_handle_request` must return a Deferred.
RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
is received.
RETRY_ON_CONNECT_ERROR (bool): Whether or not to retry the request when
a connection error is received.
RETRY_ON_CONNECT_ERROR_ATTEMPTS (int): Number of attempts to retry when
receiving connection errors, each will backoff exponentially longer.
"""

NAME: str = abc.abstractproperty() # type: ignore
PATH_ARGS: Tuple[str, ...] = abc.abstractproperty() # type: ignore
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
RETRY_ON_CONNECT_ERROR = True
RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5 # =63s (2^6-1)
richvdh marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, hs: "HomeServer"):
if self.CACHE:
Expand Down Expand Up @@ -236,30 +243,48 @@ async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
"/".join(url_args),
)

headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [b"Bearer " + replication_secret]
opentracing.inject_header_dict(headers, check_destination=False)

try:
# Keep track of attempts made so we can bail if we don't manage to
# connect to the target after N tries.
attempts = 0
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed
# on the master, and so whether we should clean up or not.
while True:
headers: Dict[bytes, List[bytes]] = {}
# Add an authorization header, if configured.
if replication_secret:
headers[b"Authorization"] = [
b"Bearer " + replication_secret
]
opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
except RequestTimedOutError:
if not cls.RETRY_ON_TIMEOUT:
raise

logger.warning("%s request timed out; retrying", cls.NAME)
logger.warning("%s request timed out; retrying", cls.NAME)

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
except (ConnectError, DNSLookupError) as e:
if not cls.RETRY_ON_CONNECT_ERROR:
raise
if attempts > cls.RETRY_ON_CONNECT_ERROR_ATTEMPTS:
raise

delay = 2 ** attempts
logger.warning(
"%s request connection failed; retrying in %ds: %r",
cls.NAME,
delay,
e,
)

# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
await clock.sleep(1)
await clock.sleep(delay)
attempts += 1
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
Expand Down