Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add @cancellable decorator, for use on endpoint methods that can be cancelled when clients disconnect #12583

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2780bed
Add `@cancellable` decorator, for use on request handlers
Apr 28, 2022
1ce7dbf
Improve logging for cancelled requests
Apr 28, 2022
0dc4178
Add ability to cancel disconnected requests to `SynapseRequest`
Apr 28, 2022
5a9991c
Capture the `Deferred` for request cancellation in `_AsyncResource`
Apr 28, 2022
46cdb4b
Respect the `@cancellable` flag for `DirectServe{Html,Json}Resource`s
Apr 28, 2022
2326bbf
Respect the `@cancellable` flag for `RestServlet`s and `BaseFederatio…
Apr 28, 2022
c3eb1e3
Complain if a federation endpoint has the `@cancellable` flag
Apr 28, 2022
62d3b91
Respect the `@cancellable` flag for `ReplicationEndpoint`s
Apr 28, 2022
3d89472
Expose the `SynapseRequest` from `FakeChannel` for testing disconnection
Apr 28, 2022
2bbad29
Add helper class for testing request cancellation
Apr 28, 2022
6720b87
Test the `@cancellable` flag on `RestServlet` methods
Apr 28, 2022
92b7b17
Test the `@cancellable` flag on `DirectServe{Html,Json}Resource` methods
Apr 28, 2022
d3f75f3
Test the `@cancellable` flag on `ReplicationEndpoint._handle_request`
Apr 28, 2022
3544cfd
Fix `make_signed_federation_request` turning empty dicts into `b""`
Apr 28, 2022
89cb0f1
Test the `@cancellable` flag on `BaseFederationServlet` methods
Apr 28, 2022
342a502
Disable tests for the `@cancellable` flag on `BaseFederationServlet` …
Apr 28, 2022
a89fc72
Add newsfile
Apr 28, 2022
3f8a59f
Don't trash the logging context when cancelling request processing
May 6, 2022
08acc0c
Rename to `EndpointCancellationTestCase` to `EndpointCancellationTest…
May 6, 2022
4976ae5
Add missing docstring for `expected_body` parameter
May 6, 2022
4e644ea
Improve assertion message when `await_result=False` is forgotten
May 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/12583.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `@cancellable` decorator, for use on endpoint methods that can be cancelled when clients disconnect.
13 changes: 12 additions & 1 deletion synapse/federation/transport/server/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.urls import FEDERATION_V1_PREFIX
from synapse.http.server import HttpServer, ServletCallback
from synapse.http.server import HttpServer, ServletCallback, is_method_cancellable
from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.context import run_in_background
Expand Down Expand Up @@ -373,6 +373,17 @@ def register(self, server: HttpServer) -> None:
if code is None:
continue

if is_method_cancellable(code):
# The wrapper added by `self._wrap` will inherit the cancellable flag,
# but the wrapper itself does not support cancellation yet.
# Once resolved, the cancellation tests in
# `tests/federation/transport/server/test__base.py` can be re-enabled.
raise Exception(
f"{self.__class__.__name__}.on_{method} has been marked as "
"cancellable, but federation servlets do not support cancellation "
"yet."
)

server.register_paths(
method,
(pattern,),
Expand Down
89 changes: 88 additions & 1 deletion synapse/http/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
Optional,
Pattern,
Tuple,
TypeVar,
Union,
)

Expand All @@ -43,6 +44,7 @@
from zope.interface import implementer

from twisted.internet import defer, interfaces
from twisted.internet.defer import CancelledError
from twisted.python import failure
from twisted.web import resource
from twisted.web.server import NOT_DONE_YET, Request
Expand Down Expand Up @@ -82,6 +84,61 @@
</html>
"""

# A fictional HTTP status code for requests where the client has disconnected and we
# successfully cancelled the request. Used only for logging purposes. Clients will never
# observe this code unless cancellations leak across requests or we raise a
# `CancelledError` ourselves.
# Analogous to nginx's 499 status code:
# https://github.com/nginx/nginx/blob/release-1.21.6/src/http/ngx_http_request.h#L128-L134
HTTP_STATUS_REQUEST_CANCELLED = 499


F = TypeVar("F", bound=Callable[..., Any])


_cancellable_method_names = frozenset(
{
# `RestServlet`, `BaseFederationServlet` and `BaseFederationServerServlet`
# methods
"on_GET",
"on_PUT",
"on_POST",
"on_DELETE",
# `_AsyncResource`, `DirectServeHtmlResource` and `DirectServeJsonResource`
# methods
"_async_render_GET",
"_async_render_PUT",
"_async_render_POST",
"_async_render_DELETE",
"_async_render_OPTIONS",
# `ReplicationEndpoint` methods
"_handle_request",
}
)


def cancellable(method: F) -> F:
"""Marks a servlet method as cancellable.

Usage:
class SomeServlet(RestServlet):
@cancellable
async def on_GET(self, request: SynapseRequest) -> ...:
...
"""
if method.__name__ not in _cancellable_method_names:
raise ValueError(
"@cancellable decorator can only be applied to servlet methods."
)

method.cancellable = True # type: ignore[attr-defined]
return method


def is_method_cancellable(method: Callable[..., Any]) -> bool:
"""Checks whether a servlet method is cancellable."""
return getattr(method, "cancellable", False)


def return_json_error(f: failure.Failure, request: SynapseRequest) -> None:
"""Sends a JSON error response to clients."""
Expand All @@ -93,6 +150,17 @@ def return_json_error(f: failure.Failure, request: SynapseRequest) -> None:
error_dict = exc.error_dict()

logger.info("%s SynapseError: %s - %s", request, error_code, exc.msg)
elif f.check(CancelledError):
error_code = HTTP_STATUS_REQUEST_CANCELLED
error_dict = {"error": "Request cancelled", "errcode": Codes.UNKNOWN}

if not request._disconnected:
logger.error(
"Got cancellation before client disconnection from %r: %r",
request.request_metrics.name,
request,
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore[arg-type]
)
else:
error_code = 500
error_dict = {"error": "Internal server error", "errcode": Codes.UNKNOWN}
Expand Down Expand Up @@ -155,6 +223,16 @@ def return_html_error(
request,
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore[arg-type]
)
elif f.check(CancelledError):
code = HTTP_STATUS_REQUEST_CANCELLED
msg = "Request cancelled"

if not request._disconnected:
logger.error(
"Got cancellation before client disconnection when handling request %r",
request,
exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore[arg-type]
)
else:
code = HTTPStatus.INTERNAL_SERVER_ERROR
msg = "Internal server error"
Expand Down Expand Up @@ -223,6 +301,9 @@ def register_paths(
If the regex contains groups these gets passed to the callback via
an unpacked tuple.

The callback may be marked with the `@cancellable` decorator, which will
cause request processing to be cancelled when clients disconnect early.

Comment on lines +304 to +306
Copy link
Contributor Author

@squahtx squahtx May 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should add an explicit cancellable parameter to register_paths instead.
There are a few places where we call register_paths manually, with callbacks that don't match the ^on_(GET|PUT|POST|DELETE)$ pattern, eg.

http_server.register_paths(
"GET",
client_patterns(no_state_key, v1=True),
self.on_GET_no_state_key,
self.__class__.__name__,
)

There we have the option of either relaxing the validation in the @cancellable decorator or adding an explicit cancellable parameter to register_paths.

The benefit of relaxing the decorator validation is that it's more obvious that on_GET_no_state_key has cancellation enabled:

    @cancellable
    def on_GET_no_state_key(
        self, request: SynapseRequest, room_id: str, event_type: str
    ) -> Awaitable[Tuple[int, JsonDict]]:
        return self.on_GET(request, room_id, event_type, "")

    @cancellable
    async def on_GET(

Args:
method: The HTTP method to listen to.
path_patterns: The regex used to match requests.
Expand Down Expand Up @@ -253,7 +334,9 @@ def __init__(self, extract_context: bool = False):

def render(self, request: SynapseRequest) -> int:
"""This gets called by twisted every time someone sends us a request."""
defer.ensureDeferred(self._async_render_wrapper(request))
request.render_deferred = defer.ensureDeferred(
self._async_render_wrapper(request)
)
return NOT_DONE_YET

@wrap_async_request_handler
Expand Down Expand Up @@ -289,6 +372,8 @@ async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, An

method_handler = getattr(self, "_async_render_%s" % (request_method,), None)
if method_handler:
request.is_render_cancellable = is_method_cancellable(method_handler)

raw_callback_return = method_handler(request)

# Is it synchronous? We'll allow this for now.
Expand Down Expand Up @@ -449,6 +534,8 @@ def _get_handler_for_request(
async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]:
callback, servlet_classname, group_dict = self._get_handler_for_request(request)

request.is_render_cancellable = is_method_cancellable(callback)

# Make sure we have an appropriate name for this handler in prometheus
# (rather than the default of JsonResource).
request.request_metrics.name = servlet_classname
Expand Down
23 changes: 22 additions & 1 deletion synapse/http/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import attr
from zope.interface import implementer

from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IAddress, IReactorTime
from twisted.python.failure import Failure
from twisted.web.http import HTTPChannel
Expand Down Expand Up @@ -91,6 +92,13 @@ def __init__(
# we can't yet create the logcontext, as we don't know the method.
self.logcontext: Optional[LoggingContext] = None

# The `Deferred` to cancel if the client disconnects early. Expected to be set
# by `Resource.render`.
self.render_deferred: Optional["Deferred[None]"] = None
# A boolean indicating whether `_render_deferred` should be cancelled if the
# client disconnects early. Expected to be set during `Resource.render`.
self.is_render_cancellable = False

global _next_request_seq
self.request_seq = _next_request_seq
_next_request_seq += 1
Expand Down Expand Up @@ -357,7 +365,20 @@ def connectionLost(self, reason: Union[Failure, Exception]) -> None:
{"event": "client connection lost", "reason": str(reason.value)}
)

if not self._is_processing:
if self._is_processing:
if self.is_render_cancellable:
if self.render_deferred is not None:
# Throw a cancellation into the request processing, in the hope
# that it will finish up sooner than it normally would.
# The `self.processing()` context manager will call
# `_finished_processing()` when done.
self.render_deferred.cancel()
else:
logger.error(
"Connection from client lost, but have no Deferred to "
"cancel even though the request is marked as cancellable."
)
else:
self._finished_processing()

def _started_processing(self, servlet_name: str) -> None:
Expand Down
21 changes: 19 additions & 2 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@

from synapse.api.errors import HttpResponseException, SynapseError
from synapse.http import RequestTimedOutError
from synapse.http.server import HttpServer
from synapse.http.server import HttpServer, is_method_cancellable
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing
from synapse.logging.opentracing import trace
from synapse.types import JsonDict
Expand Down Expand Up @@ -310,6 +311,12 @@ def register(self, http_server: HttpServer) -> None:
url_args = list(self.PATH_ARGS)
method = self.METHOD

if self.CACHE and is_method_cancellable(self._handle_request):
raise Exception(
f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
"is set. The cancellable flag would have no effect."
)

if self.CACHE:
url_args.append("txn_id")

Expand All @@ -324,7 +331,7 @@ def register(self, http_server: HttpServer) -> None:
)

async def _check_auth_and_handle(
self, request: Request, **kwargs: Any
self, request: SynapseRequest, **kwargs: Any
) -> Tuple[int, JsonDict]:
"""Called on new incoming requests when caching is enabled. Checks
if there is a cached response for the request and returns that,
Expand All @@ -340,8 +347,18 @@ async def _check_auth_and_handle(
if self.CACHE:
txn_id = kwargs.pop("txn_id")

# We ignore the `@cancellable` flag, since cancellation wouldn't interupt
# `_handle_request` and `ResponseCache` does not handle cancellation
# correctly yet. In particular, there may be issues to do with logging
# context lifetimes.

return await self.response_cache.wrap(
txn_id, self._handle_request, request, **kwargs
)

# The `@cancellable` decorator may be applied to `_handle_request`. But we
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
# so we have to set up the cancellable flag ourselves.
request.is_render_cancellable = is_method_cancellable(self._handle_request)

return await self._handle_request(request, **kwargs)
13 changes: 13 additions & 0 deletions tests/federation/transport/server/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Loading