Fix ratelimiting for federation /send requests. #8342

Merged 7 commits on Sep 18, 2020
1 change: 1 addition & 0 deletions changelog.d/8342.bugfix
@@ -0,0 +1 @@
+Fix ratelimiting of federation `/send` requests.
52 changes: 40 additions & 12 deletions synapse/federation/federation_server.py
@@ -97,10 +97,16 @@ def __init__(self, hs):
         self.state = hs.get_state_handler()

         self.device_handler = hs.get_device_handler()
+        self._federation_ratelimiter = hs.get_federation_ratelimiter()

         self._server_linearizer = Linearizer("fed_server")
         self._transaction_linearizer = Linearizer("fed_txn_handler")

+        # We cache results for transaction with the same ID
+        self._transaction_resp_cache = ResponseCache(
+            hs, "fed_txn_handler", timeout_ms=30000

Member:
Curious about the reasoning about the timeout here. 30 seconds seems a bit short?

Member Author:
We tend to default to 30s for these I think. Note that this is 30s after the response is sent, though given the response won't generally be large we could probably up it some more.

@clokep (Member), Sep 17, 2020:
Ah, I thought it was 30s from when the request was sent. That makes more sense now!

+        )
+
         self.transaction_actions = TransactionActions(self.store)

         self.registry = hs.get_federation_registry()
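
For context on the review thread above: the cache is keyed on (origin, transaction_id), so a remote server retrying the same `/send` transaction gets the previously computed response instead of having the transaction processed a second time, and the entry is only dropped `timeout_ms` after the response completes. Below is a minimal sketch of that de-duplication pattern, assuming plain asyncio and hypothetical names; Synapse's real ResponseCache is Twisted-based and more involved.

    import asyncio
    from typing import Any, Awaitable, Callable, Dict

    class SimpleResponseCache:
        """De-duplicate retried requests that share a key (illustrative only)."""

        def __init__(self, timeout_ms: int = 30_000):
            self._timeout_s = timeout_ms / 1000
            self._entries: Dict[Any, asyncio.Future] = {}

        async def wrap(self, key, callback: Callable[..., Awaitable[Any]], *args):
            if key in self._entries:
                # Same key already in flight (or finished recently): reuse it.
                return await asyncio.shield(self._entries[key])

            loop = asyncio.get_running_loop()
            future = loop.create_future()
            self._entries[key] = future
            try:
                future.set_result(await callback(*args))
            except Exception as exc:
                self._entries.pop(key, None)  # don't cache failures
                future.set_exception(exc)
                future.exception()  # mark as retrieved to avoid warnings
                raise
            # Keep the entry for `timeout_ms` *after* the response completes,
            # which is the behaviour discussed in the thread above.
            loop.call_later(self._timeout_s, self._entries.pop, key, None)
            return future.result()

The diff then uses the same shape: wrap((origin, transaction_id), handler, *args) either runs the handler or attaches the retry to the in-flight or cached result.
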
@@ -135,22 +141,44 @@ async def on_incoming_transaction(
         request_time = self._clock.time_msec()

         transaction = Transaction(**transaction_data)
+        transaction_id = transaction.transaction_id  # type: ignore

-        if not transaction.transaction_id:  # type: ignore
+        if not transaction_id:
             raise Exception("Transaction missing transaction_id")

-        logger.debug("[%s] Got transaction", transaction.transaction_id)  # type: ignore
+        logger.debug("[%s] Got transaction", transaction_id)

-        # use a linearizer to ensure that we don't process the same transaction
-        # multiple times in parallel.
-        with (
-            await self._transaction_linearizer.queue(
-                (origin, transaction.transaction_id)  # type: ignore
-            )
-        ):
-            result = await self._handle_incoming_transaction(
-                origin, transaction, request_time
-            )
+        # We wrap in a ResponseCache so that we de-duplicate retried
+        # transactions.
+        return await self._transaction_resp_cache.wrap(
+            (origin, transaction_id),
+            self._on_incoming_transaction_inner,
+            origin,
+            transaction,
+            request_time,
+        )
+
+    async def _on_incoming_transaction_inner(
+        self, origin: str, transaction: Transaction, request_time: int
+    ) -> Tuple[int, Dict[str, Any]]:
+        # Use a linearizer to ensure that transactions from a remote are
+        # processed in order.
+        with await self._transaction_linearizer.queue(origin):
+            # We rate limit here *after* we've queued up the incoming requests,
+            # so that we don't fill up the ratelimiter with blocked requests.

Member (comment on lines +167 to +168):
I'm not sure I'm following what this change is. Could you describe it a bit more? I see how the rate-limiter is moved deeper into the stack, but I'm not sure how a "blocked request" would fill it up previously that won't now? Does blocked mean a 429 is returned or that it got stuck in the linearizer?

+            #
+            # This is important as the ratelimiter allows N concurrent requests
+            # at a time, and only starts ratelimiting if there are more requests
+            # than that being processed at a time. If we queued up requests in
+            # the linearizer/response cache *after* the ratelimiting then those
+            # queued up requests would count as part of the allowed limit of N
+            # concurrent requests.
+            with self._federation_ratelimiter.ratelimit(origin) as d:
+                await d

+                result = await self._handle_incoming_transaction(
+                    origin, transaction, request_time
+                )

         return result

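To make the ordering argument in the new comment concrete: the federation ratelimiter allows a fixed number of requests from an origin to proceed concurrently and only delays the overflow, so de-duplication (response cache) and serialisation (linearizer) must happen before a request claims one of those slots; otherwise idle, queued retries would eat into the allowance. A rough sketch of that behaviour, using asyncio primitives and hypothetical names rather than Synapse's Twisted-based FederationRateLimiter:

    import asyncio
    from contextlib import asynccontextmanager
    from typing import Dict

    class ConcurrentSlotLimiter:
        """Allow up to `limit` in-flight requests per origin; extras wait."""

        def __init__(self, limit: int = 3):
            self._limit = limit
            self._semaphores: Dict[str, asyncio.Semaphore] = {}

        @asynccontextmanager
        async def ratelimit(self, origin: str):
            sem = self._semaphores.setdefault(origin, asyncio.Semaphore(self._limit))
            async with sem:  # blocks once `limit` requests already hold slots
                yield

    async def on_transaction(limiter, linearizer_lock, origin, handle):
        # The PR's ordering: de-duplicate/serialise first, take a slot last,
        # so a request parked behind the lock does not occupy a limiter slot.
        # `linearizer_lock` stands in for a per-origin asyncio.Lock.
        async with linearizer_lock:
            async with limiter.ratelimit(origin):
                return await handle()
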
13 changes: 8 additions & 5 deletions synapse/federation/transport/server.py
@@ -45,7 +45,6 @@
 )
 from synapse.server import HomeServer
 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
-from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string

 logger = logging.getLogger(__name__)
@@ -72,9 +71,7 @@ def __init__(self, hs, servlet_groups=None):
         super(TransportLayerServer, self).__init__(hs, canonical_json=False)

         self.authenticator = Authenticator(hs)
-        self.ratelimiter = FederationRateLimiter(
-            self.clock, config=hs.config.rc_federation
-        )
+        self.ratelimiter = hs.get_federation_ratelimiter()

         self.register_servlets()

@@ -272,6 +269,8 @@ class BaseFederationServlet:

     PREFIX = FEDERATION_V1_PREFIX  # Allows specifying the API version

+    RATELIMIT = True  # Whether to rate limit requests or not
+
     def __init__(self, handler, authenticator, ratelimiter, server_name):
         self.handler = handler
         self.authenticator = authenticator
@@ -335,7 +334,7 @@ async def new_func(request, *args, **kwargs):
             )

             with scope:
-                if origin:
+                if origin and self.RATELIMIT:
                     with ratelimiter.ratelimit(origin) as d:
                         await d
                         if request._disconnected:
@@ -372,6 +371,10 @@ def register(self, server):
 class FederationSendServlet(BaseFederationServlet):
     PATH = "/send/(?P<transaction_id>[^/]*)/?"

+    # We ratelimit manually in the handler as we queue up the requests and we
+    # don't want to fill up the ratelimiter with blocked requests.
+    RATELIMIT = False
+
     def __init__(self, handler, server_name, **kwargs):
         super(FederationSendServlet, self).__init__(
             handler, server_name=server_name, **kwargs
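
The servlet change follows from the same reasoning: rate limiting stays on by default for every federation servlet, and only /send opts out so it can apply the shared limiter later, after the response cache and linearizer have run. A small sketch of the opt-out pattern, with hypothetical names rather than Synapse's actual servlet machinery:

    class BaseServlet:
        RATELIMIT = True  # subclasses may override to skip the up-front limiter

        def __init__(self, ratelimiter):
            # `ratelimiter` is anything exposing an async ratelimit(origin)
            # context manager, e.g. the ConcurrentSlotLimiter sketched above.
            self.ratelimiter = ratelimiter

        async def handle(self, origin: str) -> dict:
            if origin and self.RATELIMIT:
                async with self.ratelimiter.ratelimit(origin):
                    return await self.on_request(origin)
            # /send-style servlets fall through here and rate limit inside
            # the handler instead, after de-duplication and linearization.
            return await self.on_request(origin)

        async def on_request(self, origin: str) -> dict:
            raise NotImplementedError

    class SendServlet(BaseServlet):
        RATELIMIT = False  # rate limited manually by the federation server
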
5 changes: 5 additions & 0 deletions synapse/server.py
@@ -114,6 +114,7 @@
 from synapse.types import DomainSpecificString
 from synapse.util import Clock
 from synapse.util.distributor import Distributor
+from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.stringutils import random_string

 logger = logging.getLogger(__name__)
@@ -642,6 +643,10 @@ def get_replication_data_handler(self) -> ReplicationDataHandler:
     def get_replication_streams(self) -> Dict[str, Stream]:
         return {stream.NAME: stream(self) for stream in STREAMS_MAP.values()}

+    @cache_in_self
+    def get_federation_ratelimiter(self) -> FederationRateLimiter:
+        return FederationRateLimiter(self.clock, config=self.config.rc_federation)
+
     async def remove_pusher(self, app_id: str, push_key: str, user_id: str):
         return await self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

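Moving construction of the FederationRateLimiter into HomeServer is what lets the transport layer and the federation server share one per-origin limiter: both now call hs.get_federation_ratelimiter() and receive the same object. A simplified sketch of the cached-getter idea behind @cache_in_self (the real decorator in synapse/server.py does more bookkeeping; this is purely illustrative):

    import functools

    def cache_in_self(builder):
        """Memoise a zero-argument builder method on the instance, so every
        caller of hs.get_foo() receives the same shared object."""
        attr = "_cached_" + builder.__name__

        @functools.wraps(builder)
        def getter(self):
            if not hasattr(self, attr):
                setattr(self, attr, builder(self))
            return getattr(self, attr)

        return getter

With that shared instance, the manual rate limiting in _on_incoming_transaction_inner counts against the same per-origin state as the up-front limiting applied to every other federation servlet.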