Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add MSC3030 experimental client and federation API endpoints to get t…
Browse files Browse the repository at this point in the history
…he closest event to a given timestamp (#9445)

MSC3030: matrix-org/matrix-spec-proposals#3030

Client API endpoint. This will also go and fetch from the federation API endpoint if unable to find an event locally or we found an extremity with possibly a closer event we don't know about.
```
GET /_matrix/client/unstable/org.matrix.msc3030/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
{
    "event_id": ...
    "origin_server_ts": ...
}
```

Federation API endpoint:
```
GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
{
    "event_id": ...
    "origin_server_ts": ...
}
```

Co-authored-by: Erik Johnston <erik@matrix.org>
  • Loading branch information
MadLittleMods and erikjohnston authored Dec 2, 2021
1 parent 84dc50e commit a6f1a3a
Show file tree
Hide file tree
Showing 13 changed files with 674 additions and 31 deletions.
1 change: 1 addition & 0 deletions changelog.d/9445.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add [MSC3030](https://github.com/matrix-org/matrix-doc/pull/3030) experimental client and federation API endpoints to get the closest event to a given timestamp.
3 changes: 3 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@ def read_config(self, config: JsonDict, **kwargs):

# MSC3266 (room summary api)
self.msc3266_enabled: bool = experimental.get("msc3266_enabled", False)

# MSC3030 (Jump to date API endpoint)
self.msc3030_enabled: bool = experimental.get("msc3030_enabled", False)
77 changes: 77 additions & 0 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1517,6 +1517,83 @@ async def send_request(
self._get_room_hierarchy_cache[(room_id, suggested_only)] = result
return result

async def timestamp_to_event(
self, destination: str, room_id: str, timestamp: int, direction: str
) -> "TimestampToEventResponse":
"""
Calls a remote federating server at `destination` asking for their
closest event to the given timestamp in the given direction. Also
validates the response to always return the expected keys or raises an
error.
Args:
destination: Domain name of the remote homeserver
room_id: Room to fetch the event from
timestamp: The point in time (inclusive) we should navigate from in
the given direction to find the closest event.
direction: ["f"|"b"] to indicate whether we should navigate forward
or backward from the given timestamp to find the closest event.
Returns:
A parsed TimestampToEventResponse including the closest event_id
and origin_server_ts
Raises:
Various exceptions when the request fails
InvalidResponseError when the response does not have the correct
keys or wrong types
"""
remote_response = await self.transport_layer.timestamp_to_event(
destination, room_id, timestamp, direction
)

if not isinstance(remote_response, dict):
raise InvalidResponseError(
"Response must be a JSON dictionary but received %r" % remote_response
)

try:
return TimestampToEventResponse.from_json_dict(remote_response)
except ValueError as e:
raise InvalidResponseError(str(e))


@attr.s(frozen=True, slots=True, auto_attribs=True)
class TimestampToEventResponse:
"""Typed response dictionary for the federation /timestamp_to_event endpoint"""

event_id: str
origin_server_ts: int

# the raw data, including the above keys
data: JsonDict

@classmethod
def from_json_dict(cls, d: JsonDict) -> "TimestampToEventResponse":
"""Parsed response from the federation /timestamp_to_event endpoint
Args:
d: JSON object response to be parsed
Raises:
ValueError if d does not the correct keys or they are the wrong types
"""

event_id = d.get("event_id")
if not isinstance(event_id, str):
raise ValueError(
"Invalid response: 'event_id' must be a str but received %r" % event_id
)

origin_server_ts = d.get("origin_server_ts")
if not isinstance(origin_server_ts, int):
raise ValueError(
"Invalid response: 'origin_server_ts' must be a int but received %r"
% origin_server_ts
)

return cls(event_id, origin_server_ts, d)


@attr.s(frozen=True, slots=True, auto_attribs=True)
class FederationSpaceSummaryEventResult:
Expand Down
43 changes: 43 additions & 0 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.handler = hs.get_federation_handler()
self.storage = hs.get_storage()
self._federation_event_handler = hs.get_federation_event_handler()
self.state = hs.get_state_handler()
self._event_auth_handler = hs.get_event_auth_handler()
Expand Down Expand Up @@ -200,6 +201,48 @@ async def on_backfill_request(

return 200, res

async def on_timestamp_to_event_request(
self, origin: str, room_id: str, timestamp: int, direction: str
) -> Tuple[int, Dict[str, Any]]:
"""When we receive a federated `/timestamp_to_event` request,
handle all of the logic for validating and fetching the event.
Args:
origin: The server we received the event from
room_id: Room to fetch the event from
timestamp: The point in time (inclusive) we should navigate from in
the given direction to find the closest event.
direction: ["f"|"b"] to indicate whether we should navigate forward
or backward from the given timestamp to find the closest event.
Returns:
Tuple indicating the response status code and dictionary response
body including `event_id`.
"""
with (await self._server_linearizer.queue((origin, room_id))):
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)

# We only try to fetch data from the local database
event_id = await self.store.get_event_id_for_timestamp(
room_id, timestamp, direction
)
if event_id:
event = await self.store.get_event(
event_id, allow_none=False, allow_rejected=False
)

return 200, {
"event_id": event_id,
"origin_server_ts": event.origin_server_ts,
}

raise SynapseError(
404,
"Unable to find event from %s in direction %s" % (timestamp, direction),
errcode=Codes.NOT_FOUND,
)

async def on_incoming_transaction(
self,
origin: str,
Expand Down
36 changes: 36 additions & 0 deletions synapse/federation/transport/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,42 @@ async def backfill(
destination, path=path, args=args, try_trailing_slash_on_400=True
)

@log_function
async def timestamp_to_event(
self, destination: str, room_id: str, timestamp: int, direction: str
) -> Union[JsonDict, List]:
"""
Calls a remote federating server at `destination` asking for their
closest event to the given timestamp in the given direction.
Args:
destination: Domain name of the remote homeserver
room_id: Room to fetch the event from
timestamp: The point in time (inclusive) we should navigate from in
the given direction to find the closest event.
direction: ["f"|"b"] to indicate whether we should navigate forward
or backward from the given timestamp to find the closest event.
Returns:
Response dict received from the remote homeserver.
Raises:
Various exceptions when the request fails
"""
path = _create_path(
FEDERATION_UNSTABLE_PREFIX,
"/org.matrix.msc3030/timestamp_to_event/%s",
room_id,
)

args = {"ts": [str(timestamp)], "dir": [direction]}

remote_response = await self.client.get_json(
destination, path=path, args=args, try_trailing_slash_on_400=True
)

return remote_response

@log_function
async def send_transaction(
self,
Expand Down
12 changes: 11 additions & 1 deletion synapse/federation/transport/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
Authenticator,
BaseFederationServlet,
)
from synapse.federation.transport.server.federation import FEDERATION_SERVLET_CLASSES
from synapse.federation.transport.server.federation import (
FEDERATION_SERVLET_CLASSES,
FederationTimestampLookupServlet,
)
from synapse.federation.transport.server.groups_local import GROUP_LOCAL_SERVLET_CLASSES
from synapse.federation.transport.server.groups_server import (
GROUP_SERVER_SERVLET_CLASSES,
Expand Down Expand Up @@ -324,6 +327,13 @@ def register_servlets(
)

for servletclass in DEFAULT_SERVLET_GROUPS[servlet_group]:
# Only allow the `/timestamp_to_event` servlet if msc3030 is enabled
if (
servletclass == FederationTimestampLookupServlet
and not hs.config.experimental.msc3030_enabled
):
continue

servletclass(
hs=hs,
authenticator=authenticator,
Expand Down
41 changes: 41 additions & 0 deletions synapse/federation/transport/server/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,46 @@ async def on_GET(
return await self.handler.on_backfill_request(origin, room_id, versions, limit)


class FederationTimestampLookupServlet(BaseFederationServerServlet):
"""
API endpoint to fetch the `event_id` of the closest event to the given
timestamp (`ts` query parameter) in the given direction (`dir` query
parameter).
Useful for other homeservers when they're unable to find an event locally.
`ts` is a timestamp in milliseconds where we will find the closest event in
the given direction.
`dir` can be `f` or `b` to indicate forwards and backwards in time from the
given timestamp.
GET /_matrix/federation/unstable/org.matrix.msc3030/timestamp_to_event/<roomID>?ts=<timestamp>&dir=<direction>
{
"event_id": ...
}
"""

PATH = "/timestamp_to_event/(?P<room_id>[^/]*)/?"
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3030"

async def on_GET(
self,
origin: str,
content: Literal[None],
query: Dict[bytes, List[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
timestamp = parse_integer_from_args(query, "ts", required=True)
direction = parse_string_from_args(
query, "dir", default="f", allowed_values=["f", "b"], required=True
)

return await self.handler.on_timestamp_to_event_request(
origin, room_id, timestamp, direction
)


class FederationQueryServlet(BaseFederationServerServlet):
PATH = "/query/(?P<query_type>[^/]*)"

Expand Down Expand Up @@ -683,6 +723,7 @@ async def on_GET(
FederationStateV1Servlet,
FederationStateIdsServlet,
FederationBackfillServlet,
FederationTimestampLookupServlet,
FederationQueryServlet,
FederationMakeJoinServlet,
FederationMakeLeaveServlet,
Expand Down
61 changes: 31 additions & 30 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,37 @@
logger = logging.getLogger(__name__)


def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
"""Get joined domains from state
Args:
state: State map from type/state key to event.
Returns:
Returns a list of servers with the lowest depth of their joins.
Sorted by lowest depth first.
"""
joined_users = [
(state_key, int(event.depth))
for (e_type, state_key), event in state.items()
if e_type == EventTypes.Member and event.membership == Membership.JOIN
]

joined_domains: Dict[str, int] = {}
for u, d in joined_users:
try:
dom = get_domain_from_id(u)
old_d = joined_domains.get(dom)
if old_d:
joined_domains[dom] = min(d, old_d)
else:
joined_domains[dom] = d
except Exception:
pass

return sorted(joined_domains.items(), key=lambda d: d[1])


class FederationHandler:
"""Handles general incoming federation requests
Expand Down Expand Up @@ -268,36 +299,6 @@ async def _maybe_backfill_inner(

curr_state = await self.state_handler.get_current_state(room_id)

def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
"""Get joined domains from state
Args:
state: State map from type/state key to event.
Returns:
Returns a list of servers with the lowest depth of their joins.
Sorted by lowest depth first.
"""
joined_users = [
(state_key, int(event.depth))
for (e_type, state_key), event in state.items()
if e_type == EventTypes.Member and event.membership == Membership.JOIN
]

joined_domains: Dict[str, int] = {}
for u, d in joined_users:
try:
dom = get_domain_from_id(u)
old_d = joined_domains.get(dom)
if old_d:
joined_domains[dom] = min(d, old_d)
else:
joined_domains[dom] = d
except Exception:
pass

return sorted(joined_domains.items(), key=lambda d: d[1])

curr_domains = get_domains_from_state(curr_state)

likely_domains = [
Expand Down
Loading

0 comments on commit a6f1a3a

Please sign in to comment.