Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Track per-room join rates actioned by this worker
Browse files Browse the repository at this point in the history
and consult it when actioning joins.
Only bump rate limit if we will persist the event; otherwise we'll see
it over replication
  • Loading branch information
David Robertson committed Jul 4, 2022
1 parent 36b3ee2 commit f272919
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 0 deletions.
16 changes: 16 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def __init__(self, hs: "HomeServer"):
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()
self._room_member_handler = hs.get_room_member_handler()

self._state_storage_controller = hs.get_storage_controllers().state

Expand Down Expand Up @@ -620,6 +621,15 @@ async def on_make_join_request(
)
raise IncompatibleRoomVersionError(room_version=room_version)

# Refuse the request if that room has seen too many joins recently.
# This is in addition to the HS-level rate limiting applied by
# BaseFederationServlet.
# type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
update=False,
)
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}

Expand Down Expand Up @@ -654,6 +664,12 @@ async def on_send_join_request(
room_id: str,
caller_supports_partial_state: bool = False,
) -> Dict[str, Any]:
await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type]
requester=None,
key=room_id,
update=self.hs.persists_events_for_room(room_id),
)

event, context = await self._on_send_membership_event(
origin, content, Membership.JOIN, room_id
)
Expand Down
35 changes: 35 additions & 0 deletions synapse/handlers/room_member.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ def __init__(self, hs: "HomeServer"):
rate_hz=hs.config.ratelimiting.rc_joins_remote.per_second,
burst_count=hs.config.ratelimiting.rc_joins_remote.burst_count,
)
# TODO: find a better place to keep this Ratelimiter.
# It needs to be
# - written to by something which can snoop on replication streams
# - read by the RoomMemberHandler to rate limit joins from local users
# - read by the FederationServer to rate limit make_joins and send_joins from
# other homeservers
# I wonder if a homeserver-wide collection of rate limiters might be cleaner?
self._join_rate_per_room_limiter = Ratelimiter(
store=self.store,
clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_per_room.per_second,
burst_count=hs.config.ratelimiting.rc_joins_per_room.burst_count,
)

self._invites_per_room_limiter = Ratelimiter(
store=self.store,
Expand All @@ -125,6 +138,18 @@ def __init__(self, hs: "HomeServer"):
)

self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)

def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
Use this to inform the RoomMemberHandler about joins that have either
- taken place on another homeserver, or
- on another worker in this homeserver.
Joins actioned by this worker should use the usual `ratelimit` method, which
checks the limit and increments the counter in one go.
"""
self._join_rate_per_room_limiter.record_action(requester=None, key=room_id)

@abc.abstractmethod
async def _remote_join(
Expand Down Expand Up @@ -378,6 +403,11 @@ async def _local_membership_update(
# up blocking profile updates.
if newly_joined and ratelimit:
await self._join_rate_limiter_local.ratelimit(requester)
await self._join_rate_per_room_limiter.ratelimit(
requester,
key=room_id,
update=self.hs.persists_events_for_room(room_id),
)

result_event = await self.event_creation_handler.handle_new_client_event(
requester,
Expand Down Expand Up @@ -826,6 +856,11 @@ async def update_membership_locked(
await self._join_rate_limiter_remote.ratelimit(
requester,
)
await self._join_rate_per_room_limiter.ratelimit(
requester,
key=room_id,
update=self.hs.persists_events_for_room(room_id),
)

inviter = await self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
Expand Down
12 changes: 12 additions & 0 deletions synapse/replication/http/send_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from twisted.web.server import Request

from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
Expand Down Expand Up @@ -72,6 +73,7 @@ def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self._notifier = hs.get_notifier()

@staticmethod
async def _serialize_payload( # type: ignore[override]
Expand Down Expand Up @@ -138,6 +140,16 @@ async def _handle_request( # type: ignore[override]
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id
)

if event.type == EventTypes.Member and event.membership == Membership.JOIN:
(
current_membership,
_,
) = await self.store.get_local_current_membership_for_user_in_room(
event.state_key, event.room_id
)
if current_membership != Membership.JOIN:
self._notifier.notify_user_joined_room(event.event_id, event.room_id)

event = await self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
Expand Down

0 comments on commit f272919

Please sign in to comment.