From 33c2e918605235a3c802b4d9c5a1a119c7010464 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 20 Jun 2024 14:49:09 -0700 Subject: [PATCH 1/9] add an unstable federation /download endpoint --- .../federation/transport/server/__init__.py | 8 ++++ synapse/federation/transport/server/_base.py | 24 +++++++++-- .../federation/transport/server/federation.py | 41 +++++++++++++++++++ tests/server.py | 3 +- 4 files changed, 71 insertions(+), 5 deletions(-) diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py index bac569e9770..edaf0196d67 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py @@ -33,6 +33,7 @@ FEDERATION_SERVLET_CLASSES, FederationAccountStatusServlet, FederationUnstableClientKeysClaimServlet, + FederationUnstableMediaDownloadServlet, ) from synapse.http.server import HttpServer, JsonResource from synapse.http.servlet import ( @@ -315,6 +316,13 @@ def register_servlets( ): continue + if servletclass == FederationUnstableMediaDownloadServlet: + if ( + not hs.config.server.enable_media_repo + or not hs.config.experimental.msc3916_authenticated_media_enabled + ): + continue + servletclass( hs=hs, authenticator=authenticator, diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index db0f5076a9e..4e2717b5655 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -360,13 +360,29 @@ async def new_func( "request" ) return None + if ( + func.__self__.__class__.__name__ # type: ignore + == "FederationUnstableMediaDownloadServlet" + ): + response = await func( + origin, content, request, *args, **kwargs + ) + else: + response = await func( + origin, content, request.args, *args, **kwargs + ) + else: + if ( + func.__self__.__class__.__name__ # type: ignore + == "FederationUnstableMediaDownloadServlet" + ): + response = await func( + origin, 
content, request, *args, **kwargs + ) + else: response = await func( origin, content, request.args, *args, **kwargs ) - else: - response = await func( - origin, content, request.args, *args, **kwargs - ) finally: # if we used the origin's context as the parent, add a new span using # the servlet span as a parent, so that we have a link diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index a59734785fa..67bb907050d 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -44,10 +44,13 @@ ) from synapse.http.servlet import ( parse_boolean_from_args, + parse_integer, parse_integer_from_args, parse_string_from_args, parse_strings_from_args, ) +from synapse.http.site import SynapseRequest +from synapse.media._base import DEFAULT_MAX_TIMEOUT_MS, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS from synapse.types import JsonDict from synapse.util import SYNAPSE_VERSION from synapse.util.ratelimitutils import FederationRateLimiter @@ -787,6 +790,43 @@ async def on_POST( return 200, {"account_statuses": statuses, "failures": failures} +class FederationUnstableMediaDownloadServlet(BaseFederationServerServlet): + """ + Implementation of new federation media `/download` endpoint outlined in MSC3916. Returns + a multipart/mixed response consisting of a JSON object and the requested media + item. This endpoint only returns local media. 
PATH = "/media/download/(?P<media_id>[^/]*)"
Shay" Date: Thu, 20 Jun 2024 14:50:07 -0700 Subject: [PATCH 2/9] add functionality to stream multipart response from federation /download endpoint --- synapse/media/_base.py | 79 +++++++++- synapse/media/media_repository.py | 14 +- synapse/media/media_storage.py | 234 +++++++++++++++++++++++++++++- 3 files changed, 320 insertions(+), 7 deletions(-) diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 3fbed6062f8..27ad89ffe12 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -25,7 +25,16 @@ import urllib from abc import ABC, abstractmethod from types import TracebackType -from typing import Awaitable, Dict, Generator, List, Optional, Tuple, Type +from typing import ( + TYPE_CHECKING, + Awaitable, + Dict, + Generator, + List, + Optional, + Tuple, + Type, +) import attr @@ -37,8 +46,13 @@ from synapse.http.server import finish_request, respond_with_json from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable +from synapse.util import Clock from synapse.util.stringutils import is_ascii +if TYPE_CHECKING: + from synapse.storage.databases.main.media_repository import LocalMedia + + logger = logging.getLogger(__name__) # list all text content types that will have the charset default to UTF-8 when @@ -260,6 +274,69 @@ def _can_encode_filename_as_token(x: str) -> bool: return True +async def respond_with_multipart_responder( + clock: Clock, + request: SynapseRequest, + responder: "Optional[Responder]", + media_info: "LocalMedia", +) -> None: + """ + Responds to requests originating from the federation media `/download` endpoint by + streaming a multipart/mixed response + + Args: + request: the federation request to respond to + responder: the responder which will send the response + media_info: metadata about the media item + """ + if not responder: + respond_404(request) + return + + # If we have a responder we *must* use it as a context manager. 
+ with responder: + if request._disconnected: + logger.warning( + "Not sending response to request %s, already disconnected.", request + ) + return + + from synapse.media.media_storage import MultipartFileConsumer + + # note that currently the json_object is just {}, this will change when linked media + # is implemented + multipart_consumer = MultipartFileConsumer( + clock, request, media_info.media_type, {} + ) + + logger.debug("Responding to media request with responder %s", responder) + # ensure that the response length takes into account the multipart boundary and headers, + # which is currently 180 bytes (note that this will need to be determined dynamically when + # the json object passed into the multipart file consumer isn't just {}) + if media_info.media_length is not None: + request.setHeader( + b"Content-Length", b"%d" % (media_info.media_length + 180,) + ) + request.setHeader( + b"Content-Type", + b"multipart/mixed; boundary=%s" % multipart_consumer.boundary, + ) + + try: + await responder.write_to_consumer(multipart_consumer) + except Exception as e: + # The majority of the time this will be due to the client having gone + # away. Unfortunately, Twisted simply throws a generic exception at us + # in that case. 
+ logger.warning("Failed to write to consumer: %s %s", type(e), e) + + # Unregister the producer, if it has one, so Twisted doesn't complain + if request.producer: + request.unregisterProducer() + + finish_request(request) + + async def respond_with_responder( request: SynapseRequest, responder: "Optional[Responder]", diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 6ed56099ca7..1436329fad1 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -54,6 +54,7 @@ ThumbnailInfo, get_filename_from_headers, respond_404, + respond_with_multipart_responder, respond_with_responder, ) from synapse.media.filepath import MediaFilePaths @@ -429,6 +430,7 @@ async def get_local_media( media_id: str, name: Optional[str], max_timeout_ms: int, + federation: bool = False, ) -> None: """Responds to requests for local media, if exists, or returns 404. @@ -440,6 +442,7 @@ async def get_local_media( the filename in the Content-Disposition header of the response. max_timeout_ms: the maximum number of milliseconds to wait for the media to be uploaded. 
+ federation: whether the local media being fetched is for a federation request Returns: Resolves once a response has successfully been written to request @@ -460,9 +463,14 @@ async def get_local_media( file_info = FileInfo(None, media_id, url_cache=bool(url_cache)) responder = await self.media_storage.fetch_media(file_info) - await respond_with_responder( - request, responder, media_type, media_length, upload_name - ) + if federation: + await respond_with_multipart_responder( + self.clock, request, responder, media_info + ) + else: + await respond_with_responder( + request, responder, media_type, media_length, upload_name + ) async def get_remote_media( self, diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index b3cd3fd8f48..a56721b2495 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -19,9 +19,12 @@ # # import contextlib +import json import logging import os import shutil +from contextlib import closing +from io import BytesIO from types import TracebackType from typing import ( IO, @@ -30,24 +33,35 @@ AsyncIterator, BinaryIO, Callable, + List, Optional, Sequence, Tuple, Type, + Union, + cast, ) +from uuid import uuid4 import attr +from zope.interface import implementer +from twisted.internet import interfaces from twisted.internet.defer import Deferred from twisted.internet.interfaces import IConsumer from twisted.protocols.basic import FileSender from synapse.api.errors import NotFoundError -from synapse.logging.context import defer_to_thread, make_deferred_yieldable +from synapse.logging.context import ( + defer_to_thread, + make_deferred_yieldable, + run_in_background, +) from synapse.logging.opentracing import start_active_span, trace, trace_with_opname from synapse.util import Clock from synapse.util.file_consumer import BackgroundFileConsumer +from ..types import JsonDict from ._base import FileInfo, Responder from .filepath import MediaFilePaths @@ -57,6 +71,8 @@ logger = 
logging.getLogger(__name__) +CRLF = b"\r\n" + class MediaStorage: """Responsible for storing/fetching files from local sources. @@ -174,7 +190,7 @@ async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]: and configured storage providers. Args: - file_info + file_info: Metadata about the media file Returns: Returns a Responder if the file was found, otherwise None. @@ -316,7 +332,7 @@ class FileResponder(Responder): """Wraps an open file that can be sent to a request. Args: - open_file: A file like object to be streamed ot the client, + open_file: A file like object to be streamed to the client, is closed when finished streaming. """ @@ -370,3 +386,215 @@ async def write_chunks_to(self, callback: Callable[[bytes], object]) -> None: # We yield to the reactor by sleeping for 0 seconds. await self.clock.sleep(0) + + +@implementer(interfaces.IConsumer) +@implementer(interfaces.IPushProducer) +class MultipartFileConsumer: + """Wraps a given consumer so that any data that gets written to it gets + converted to a multipart format. + """ + + def __init__( + self, + clock: Clock, + wrapped_consumer: interfaces.IConsumer, + file_content_type: str, + json_object: JsonDict, + ) -> None: + self.clock = clock + self.wrapped_consumer = wrapped_consumer + self.json_field = json_object + self.json_field_written = False + self.content_type_written = False + self.file_content_type = file_content_type + self.boundary = uuid4().hex.encode("ascii") + + # The producer that registered with us, and if its a push or pull + # producer. + self.producer: Optional["interfaces.IProducer"] = None + self.streaming: Optional[bool] = None + + # Whether the wrapped consumer has asked us to pause. + self.paused = False + + ### IConsumer APIs ### + + def registerProducer( + self, producer: "interfaces.IProducer", streaming: bool + ) -> None: + """ + Register to receive data from a producer. + + This sets self to be a consumer for a producer. 
When this object runs + out of data (as when a send(2) call on a socket succeeds in moving the + last data from a userspace buffer into a kernelspace buffer), it will + ask the producer to resumeProducing(). + + For L{IPullProducer} providers, C{resumeProducing} will be called once + each time data is required. + + For L{IPushProducer} providers, C{pauseProducing} will be called + whenever the write buffer fills up and C{resumeProducing} will only be + called when it empties. The consumer will only call C{resumeProducing} + to balance a previous C{pauseProducing} call; the producer is assumed + to start in an un-paused state. + + @param streaming: C{True} if C{producer} provides L{IPushProducer}, + C{False} if C{producer} provides L{IPullProducer}. + + @raise RuntimeError: If a producer is already registered. + """ + self.producer = producer + self.streaming = streaming + + self.wrapped_consumer.registerProducer(self, True) + + # kick off producing if producer is not streaming producer + if not streaming: + self.resumeProducing() + + def unregisterProducer(self) -> None: + """ + Stop consuming data from a producer, without disconnecting. + """ + self.wrapped_consumer.write(CRLF + b"--" + self.boundary + b"--" + CRLF) + self.wrapped_consumer.unregisterProducer() + self.paused = True + + def write(self, data: bytes) -> None: + """ + The producer will write data by calling this method. + + The implementation must be non-blocking and perform whatever + buffering is necessary. If the producer has provided enough data + for now and it is a L{IPushProducer}, the consumer may call its + C{pauseProducing} method. 
+ """ + if not self.json_field_written: + self.wrapped_consumer.write(CRLF + b"--" + self.boundary + CRLF) + + content_type = Header(b"Content-Type", b"application/json") + self.wrapped_consumer.write(bytes(content_type) + CRLF) + + json_field = json.dumps(self.json_field) + json_bytes = json_field.encode("utf-8") + self.wrapped_consumer.write(CRLF + json_bytes) + self.wrapped_consumer.write(CRLF + b"--" + self.boundary + CRLF) + + self.json_field_written = True + + # if we haven't written the content type yet, do so + if not self.content_type_written: + type = self.file_content_type.encode("utf-8") + content_type = Header(b"Content-Type", type) + self.wrapped_consumer.write(bytes(content_type) + CRLF + CRLF) + self.content_type_written = True + + self.wrapped_consumer.write(data) + + ### IPushProducer APIs ### + + def stopProducing(self) -> None: + """ + Stop producing data. + + This tells a producer that its consumer has died, so it must stop + producing data for good. + """ + assert self.producer is not None + + self.paused = True + self.producer.stopProducing() + + def pauseProducing(self) -> None: + """ + Pause producing data. + + Tells a producer that it has produced too much data to process for + the time being, and to stop until C{resumeProducing()} is called. + """ + assert self.producer is not None + + self.paused = True + + if self.streaming: + cast("interfaces.IPushProducer", self.producer).pauseProducing() + else: + self.paused = True + + def resumeProducing(self) -> None: + """ + Resume producing data. + + This tells a producer to re-add itself to the main loop and produce + more data for its consumer. + """ + assert self.producer is not None + + if self.streaming: + cast("interfaces.IPushProducer", self.producer).resumeProducing() + else: + # If the producer is not a streaming producer we need to start + # repeatedly calling `resumeProducing` in a loop. + run_in_background(self._resumeProducingRepeatedly) + + ### Internal APIs. 
### + + async def _resumeProducingRepeatedly(self) -> None: + assert self.producer is not None + assert not self.streaming + + producer = cast("interfaces.IPullProducer", self.producer) + + self.paused = False + while not self.paused: + producer.resumeProducing() + await self.clock.sleep(0) + + +class Header: + """ + `Header` This class is a tiny wrapper that produces + request headers. We can't use standard python header + class because it encodes unicode fields using =? bla bla ?= + encoding, which is correct, but no one in HTTP world expects + that, everyone wants utf-8 raw bytes. (stolen from treq.multipart) + + """ + + def __init__( + self, + name: bytes, + value: Any, + params: Optional[List[Tuple[Any, Any]]] = None, + ): + self.name = name + self.value = value + self.params = params or [] + + def add_param(self, name: Any, value: Any) -> None: + self.params.append((name, value)) + + def __bytes__(self) -> bytes: + with closing(BytesIO()) as h: + h.write(self.name + b": " + escape(self.value).encode("us-ascii")) + if self.params: + for name, val in self.params: + h.write(b"; ") + h.write(escape(name).encode("us-ascii")) + h.write(b"=") + h.write(b'"' + escape(val).encode("utf-8") + b'"') + h.seek(0) + return h.read() + + +def escape(value: Union[str, bytes]) -> str: + """ + This function prevents header values from corrupting the request, + a newline in the file name parameter makes form-data request unreadable + for a majority of parsers. (stolen from treq.multipart) + """ + if isinstance(value, bytes): + value = value.decode("utf-8") + return value.replace("\r", "").replace("\n", "").replace('"', '\\"') From c2663f5c37fbfe585c8d12b88b35c9be44c6f8d3 Mon Sep 17 00:00:00 2001 From: "H. 
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+# +# [This file includes modifications made by New Vector Limited] +# +# +import io +import os +import shutil +import tempfile + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.media.filepath import MediaFilePaths +from synapse.media.media_storage import MediaStorage +from synapse.media.storage_provider import ( + FileStorageProviderBackend, + StorageProviderWrapper, +) +from synapse.server import HomeServer +from synapse.types import UserID +from synapse.util import Clock + +from tests import unittest +from tests.test_utils import SMALL_PNG +from tests.unittest import override_config + + +class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase): + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + super().prepare(reactor, clock, hs) + self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-") + self.addCleanup(shutil.rmtree, self.test_dir) + self.primary_base_path = os.path.join(self.test_dir, "primary") + self.secondary_base_path = os.path.join(self.test_dir, "secondary") + + hs.config.media.media_store_path = self.primary_base_path + + storage_providers = [ + StorageProviderWrapper( + FileStorageProviderBackend(hs, self.secondary_base_path), + store_local=True, + store_remote=False, + store_synchronous=True, + ) + ] + + self.filepaths = MediaFilePaths(self.primary_base_path) + self.media_storage = MediaStorage( + hs, self.primary_base_path, self.filepaths, storage_providers + ) + self.media_repo = hs.get_media_repository() + + @override_config( + {"experimental_features": {"msc3916_authenticated_media_enabled": True}} + ) + def test_file_download(self) -> None: + content = io.BytesIO(b"file_to_stream") + content_uri = self.get_success( + self.media_repo.create_content( + "text/plain", + "test_upload", + content, + 46, + UserID.from_string("@user_id:whatever.org"), + ) + ) + # test with a text file + channel = self.make_signed_federation_request( + "GET", + 
+        # {} is returned for all requests as a placeholder (per MSC3916)
application/json\r\n\r\n{}" in field + for field in stripped_bytes + ) + self.assertTrue(found_json) + + # check that png file exists and matches what was uploaded + found_file = any(SMALL_PNG in field for field in stripped_bytes) + self.assertTrue(found_file) + + @override_config( + {"experimental_features": {"msc3916_authenticated_media_enabled": False}} + ) + def test_disable_config(self) -> None: + content = io.BytesIO(b"file_to_stream") + content_uri = self.get_success( + self.media_repo.create_content( + "text/plain", + "test_upload", + content, + 46, + UserID.from_string("@user_id:whatever.org"), + ) + ) + channel = self.make_signed_federation_request( + "GET", + f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}", + ) + self.pump() + self.assertEqual(404, channel.code) + self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED") From 89c866fad196d46b9ca1cdf5b04ac00866f01e43 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Thu, 20 Jun 2024 21:29:09 -0700 Subject: [PATCH 4/9] actually it's 177 --- synapse/media/_base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 27ad89ffe12..d060b29cd15 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -309,14 +309,16 @@ async def respond_with_multipart_responder( clock, request, media_info.media_type, {} ) + logger.debug("Responding to media request with responder %s", responder) # ensure that the response length takes into account the multipart boundary and headers, - # which is currently 180 bytes (note that this will need to be determined dynamically when + # which is currently 177 bytes (note that this will need to be determined dynamically when # the json object passed into the multipart file consumer isn't just {}) if media_info.media_length is not None: request.setHeader( - b"Content-Length", b"%d" % (media_info.media_length + 180,) + b"Content-Length", b"%d" % 
(media_info.media_length + 177,) ) + request.setHeader( b"Content-Type", b"multipart/mixed; boundary=%s" % multipart_consumer.boundary, From 6dac5fb9e19ec70bf5b249e85ad5ded2cdff8ea6 Mon Sep 17 00:00:00 2001 From: "H. Shay" Date: Fri, 21 Jun 2024 13:46:06 -0700 Subject: [PATCH 5/9] newsfragment + lint --- changelog.d/17350.feature | 2 ++ synapse/media/_base.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/17350.feature diff --git a/changelog.d/17350.feature b/changelog.d/17350.feature new file mode 100644 index 00000000000..c8436aa898e --- /dev/null +++ b/changelog.d/17350.feature @@ -0,0 +1,2 @@ +Support [MSC3916](https://github.com/matrix-org/matrix-spec-proposals/blob/rav/authentication-for-media/proposals/3916-authentication-for-media.md) +by adding a federation /download endpoint (#17350). \ No newline at end of file diff --git a/synapse/media/_base.py b/synapse/media/_base.py index d060b29cd15..0d510916553 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -309,7 +309,6 @@ async def respond_with_multipart_responder( clock, request, media_info.media_type, {} ) - logger.debug("Responding to media request with responder %s", responder) # ensure that the response length takes into account the multipart boundary and headers, # which is currently 177 bytes (note that this will need to be determined dynamically when From c48f39dd9ae3b0530482a0c10936a9e0131c5e28 Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Mon, 24 Jun 2024 14:41:07 -0700 Subject: [PATCH 6/9] add a function to dynamically calculate multipart content-length --- synapse/media/_base.py | 11 ++++------- synapse/media/media_storage.py | 25 +++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 0d510916553..c023f21e4b2 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -306,17 +306,14 @@ async def respond_with_multipart_responder( # note that currently the json_object is just {}, this will change when linked media # is implemented multipart_consumer = MultipartFileConsumer( - clock, request, media_info.media_type, {} + clock, request, media_info.media_type, {}, media_info.media_length ) logger.debug("Responding to media request with responder %s", responder) - # ensure that the response length takes into account the multipart boundary and headers, - # which is currently 177 bytes (note that this will need to be determined dynamically when - # the json object passed into the multipart file consumer isn't just {}) if media_info.media_length is not None: - request.setHeader( - b"Content-Length", b"%d" % (media_info.media_length + 177,) - ) + content_length = multipart_consumer.content_length() + assert content_length is not None + request.setHeader(b"Content-Length", b"%d" % (content_length,)) request.setHeader( b"Content-Type", diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index a56721b2495..a6c80f57f17 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -401,6 +401,7 @@ def __init__( wrapped_consumer: interfaces.IConsumer, file_content_type: str, json_object: JsonDict, + content_length: Optional[int] = None, ) -> None: self.clock = clock self.wrapped_consumer = wrapped_consumer @@ -418,6 +419,8 @@ def __init__( # Whether the wrapped consumer has asked us to pause. 
self.paused = False + self.length = content_length + ### IConsumer APIs ### def registerProducer( @@ -539,6 +542,28 @@ def resumeProducing(self) -> None: # repeatedly calling `resumeProducing` in a loop. run_in_background(self._resumeProducingRepeatedly) + def content_length(self) -> Optional[int]: + """ + Calculate the content length of the multipart response + """ + if not self.length: + return None + else: + # calculate length of json field and content-type header + json_field = json.dumps(self.json_field) + json_bytes = json_field.encode("utf-8") + json_length = len(json_bytes) + + type = self.file_content_type.encode("utf-8") + content_type = Header(b"Content-Type", type) + type_length = len(bytes(content_type)) + + # 154 is the length of the elements that aren't variable, ie + # CRLFs and boundary strings, etc + self.length += json_length + type_length + 154 + + return self.length + ### Internal APIs. ### async def _resumeProducingRepeatedly(self) -> None: From 3851206063432b6f4ca0b118958748289f72054e Mon Sep 17 00:00:00 2001 From: "H. 
Shay" Date: Mon, 24 Jun 2024 14:41:21 -0700 Subject: [PATCH 7/9] remove test change --- tests/server.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/server.py b/tests/server.py index ef50c5e7471..f3a917f835b 100644 --- a/tests/server.py +++ b/tests/server.py @@ -291,8 +291,7 @@ def await_result(self, timeout_ms: int = 1000) -> None: while not self.is_finished(): # If there's a producer, tell it to resume producing so we get content if self._producer: - # self._producer.resumeProducing() - pass + self._producer.resumeProducing() if self._reactor.seconds() > end_time: raise TimedOutException("Timed out waiting for request to finish.") From 86208eb4d6da2032c837e5200b39c12821afb89a Mon Sep 17 00:00:00 2001 From: Shay Date: Mon, 24 Jun 2024 14:50:20 -0700 Subject: [PATCH 8/9] Apply suggestions from code review Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> --- synapse/media/_base.py | 1 + synapse/media/media_storage.py | 4 ++-- tests/federation/test_federation_media.py | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/synapse/media/_base.py b/synapse/media/_base.py index c023f21e4b2..7ad0b7c3cf8 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -285,6 +285,7 @@ async def respond_with_multipart_responder( streaming a multipart/mixed response Args: + clock: request: the federation request to respond to responder: the responder which will send the response media_info: metadata about the media item diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index a6c80f57f17..4d4a31ec150 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -411,7 +411,7 @@ def __init__( self.file_content_type = file_content_type self.boundary = uuid4().hex.encode("ascii") - # The producer that registered with us, and if its a push or pull + # The producer that registered with us, and if it's a push or pull # producer. 
self.producer: Optional["interfaces.IProducer"] = None self.streaming: Optional[bool] = None @@ -453,7 +453,7 @@ def registerProducer( self.wrapped_consumer.registerProducer(self, True) - # kick off producing if producer is not streaming producer + # kick off producing if `self.producer` is not a streaming producer if not streaming: self.resumeProducing() diff --git a/tests/federation/test_federation_media.py b/tests/federation/test_federation_media.py index af2639e8a4a..2c396adbe36 100644 --- a/tests/federation/test_federation_media.py +++ b/tests/federation/test_federation_media.py @@ -104,7 +104,7 @@ def test_file_download(self) -> None: ) self.assertTrue(found_json) - # check that text file and expected value exist + # check that the text file and expected value exist found_file = any( "\r\nContent-Type: text/plain\r\n\r\nfile_to_stream" in field for field in stripped @@ -146,7 +146,7 @@ def test_file_download(self) -> None: ) self.assertTrue(found_json) - # check that png file exists and matches what was uploaded + # check that the png file exists and matches what was uploaded found_file = any(SMALL_PNG in field for field in stripped_bytes) self.assertTrue(found_file) From 9e859433f500d163039cb76a2ec8c53087b8ca40 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 25 Jun 2024 15:09:42 +0100 Subject: [PATCH 9/9] Apply suggestions from code review --- changelog.d/17350.feature | 2 +- synapse/media/media_storage.py | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/changelog.d/17350.feature b/changelog.d/17350.feature index c8436aa898e..709366f5b88 100644 --- a/changelog.d/17350.feature +++ b/changelog.d/17350.feature @@ -1,2 +1,2 @@ Support [MSC3916](https://github.com/matrix-org/matrix-spec-proposals/blob/rav/authentication-for-media/proposals/3916-authentication-for-media.md) -by adding a federation /download endpoint (#17350). 
\ No newline at end of file +by adding a federation /download endpoint. \ No newline at end of file diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index 4d4a31ec150..1be2c9b5f58 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -545,24 +545,24 @@ def resumeProducing(self) -> None: def content_length(self) -> Optional[int]: """ Calculate the content length of the multipart response + in bytes. """ if not self.length: return None - else: - # calculate length of json field and content-type header - json_field = json.dumps(self.json_field) - json_bytes = json_field.encode("utf-8") - json_length = len(json_bytes) + # calculate length of json field and content-type header + json_field = json.dumps(self.json_field) + json_bytes = json_field.encode("utf-8") + json_length = len(json_bytes) - type = self.file_content_type.encode("utf-8") - content_type = Header(b"Content-Type", type) - type_length = len(bytes(content_type)) + type = self.file_content_type.encode("utf-8") + content_type = Header(b"Content-Type", type) + type_length = len(bytes(content_type)) - # 154 is the length of the elements that aren't variable, ie - # CRLFs and boundary strings, etc - self.length += json_length + type_length + 154 + # 154 is the length of the elements that aren't variable, ie + # CRLFs and boundary strings, etc + self.length += json_length + type_length + 154 - return self.length + return self.length ### Internal APIs. ###