Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add Admin API to Fetch Messages Within a Particular Window #13672

Merged
merged 18 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/13672.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add Admin APIs to Fetch Messages Within a Particular Window.
46 changes: 30 additions & 16 deletions synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from synapse.handlers.room import ShutdownRoomResponse
from synapse.logging.opentracing import trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, Requester, StreamKeyType
Expand Down Expand Up @@ -423,6 +424,7 @@ async def get_messages(
pagin_config: PaginationConfig,
as_client_event: bool = True,
event_filter: Optional[Filter] = None,
use_admin_priviledge: bool = False,
) -> JsonDict:
"""Get messages in a room.

Expand All @@ -432,10 +434,16 @@ async def get_messages(
pagin_config: The pagination config rules to apply, if any.
as_client_event: True to get events in client-server format.
event_filter: Filter to apply to results or None
use_admin_priviledge: if `True`, return all events, regardless
of whether `user` has access to them. To be used **ONLY**
from the admin API.

Returns:
Pagination API results
"""
if use_admin_priviledge:
await assert_user_is_admin(self.auth, requester)

user_id = requester.user.to_string()

if pagin_config.from_token:
Expand All @@ -458,12 +466,14 @@ async def get_messages(
room_token = from_token.room_key

async with self.pagination_lock.read(room_id):
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
)
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
(
membership,
member_event_id,
) = await self.auth.check_user_in_room_or_world_readable(
room_id, requester, allow_departed_users=True
)

if pagin_config.direction == "b":
# if we're going backwards, we might need to backfill. This
Expand All @@ -475,7 +485,7 @@ async def get_messages(
room_id, room_token.stream
)

if membership == Membership.LEAVE:
if not use_admin_priviledge and membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
# they left the room, to save the effort of loading from the
# database.
Expand Down Expand Up @@ -528,12 +538,13 @@ async def get_messages(
if event_filter:
events = await event_filter.filter(events)

events = await filter_events_for_client(
self._storage_controllers,
user_id,
events,
is_peeking=(member_event_id is None),
)
if not use_admin_priviledge:
events = await filter_events_for_client(
self._storage_controllers,
user_id,
events,
is_peeking=(member_event_id is None),
)

# if after the filter applied there are no more events
# return immediately - but there might be more in next_token batch
Expand Down Expand Up @@ -561,9 +572,12 @@ async def get_messages(
state_dict = await self.store.get_events(list(state_ids.values()))
state = state_dict.values()

aggregations = await self._relations_handler.get_bundled_aggregations(
events, user_id
)
if not use_admin_priviledge:
aggregations = await self._relations_handler.get_bundled_aggregations(
events, user_id
)
else:
aggregations = None # TODO: an admin might want aggregations
dav-is marked this conversation as resolved.
Show resolved Hide resolved

time_now = self.clock.time_msec()

Expand Down
4 changes: 4 additions & 0 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@
MakeRoomAdminRestServlet,
RoomEventContextServlet,
RoomMembersRestServlet,
RoomMessagesRestServlet,
RoomRestServlet,
RoomRestV2Servlet,
RoomStateRestServlet,
RoomTimestampToEventRestServlet,
)
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
Expand Down Expand Up @@ -271,6 +273,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
DestinationResetConnectionRestServlet(hs).register(http_server)
DestinationRestServlet(hs).register(http_server)
ListDestinationsRestServlet(hs).register(http_server)
RoomMessagesRestServlet(hs).register(http_server)
RoomTimestampToEventRestServlet(hs).register(http_server)

# Some servlets only get registered for the main process.
if hs.config.worker.worker_app is None:
Expand Down
105 changes: 105 additions & 0 deletions synapse/rest/admin/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
)
from synapse.storage.databases.main.room import RoomSortOrder
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import JsonDict, RoomID, UserID, create_requester
from synapse.util import json_decoder

Expand Down Expand Up @@ -858,3 +859,107 @@ async def on_PUT(
await self._store.unblock_room(room_id)

return HTTPStatus.OK, {"block": block}


class RoomMessagesRestServlet(RestServlet):
"""
Get messages list of a room.
"""

PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/messages$")

def __init__(self, hs: "HomeServer"):
self._hs = hs
self._clock = hs.get_clock()
self._pagination_handler = hs.get_pagination_handler()
self._auth = hs.get_auth()
self._store = hs.get_datastores().main

async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester)

pagination_config = await PaginationConfig.from_request(
self._store, request, default_limit=10
)
# Twisted will have processed the args by now.
assert request.args is not None
as_client_event = b"raw" not in request.args
filter_str = parse_string(request, "filter", encoding="utf-8")
if filter_str:
filter_json = urlparse.unquote(filter_str)
event_filter: Optional[Filter] = Filter(
self._hs, json_decoder.decode(filter_json)
)
if (
event_filter
and event_filter.filter_json.get("event_format", "client")
== "federation"
):
as_client_event = False
else:
event_filter = None

msgs = await self._pagination_handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
as_client_event=as_client_event,
event_filter=event_filter,
use_admin_priviledge=True,
)

return HTTPStatus.OK, msgs


class RoomTimestampToEventRestServlet(RestServlet):
"""
API endpoint to fetch the `event_id` of the closest event to the given
timestamp (`ts` query parameter) in the given direction (`dir` query
parameter).

Useful for cases like jump to date so you can start paginating messages from
a given date in the archive.

`ts` is a timestamp in milliseconds where we will find the closest event in
the given direction.

`dir` can be `f` or `b` to indicate forwards and backwards in time from the
given timestamp.

GET /_synapse/admin/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
{
"event_id": ...
}
"""

PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/timestamp_to_event$")

def __init__(self, hs: "HomeServer"):
super().__init__()
dav-is marked this conversation as resolved.
Show resolved Hide resolved
self._auth = hs.get_auth()
self._store = hs.get_datastores().main
self._timestamp_lookup_handler = hs.get_timestamp_lookup_handler()

async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self._auth.get_user_by_req(request)
await assert_user_is_admin(self._auth, requester)

timestamp = parse_integer(request, "ts", required=True)
direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])

(
event_id,
origin_server_ts,
) = await self._timestamp_lookup_handler.get_event_for_timestamp(
requester, room_id, timestamp, direction
)

return HTTPStatus.OK, {
"event_id": event_id,
"origin_server_ts": origin_server_ts,
}
Loading