Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve opentracing for incoming HTTP requests #11618

Merged
merged 5 commits into from
Dec 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11618.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve opentracing support for incoming HTTP requests.
39 changes: 13 additions & 26 deletions synapse/federation/transport/server/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
from synapse.http.server import HttpServer, ServletCallback
from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging import opentracing
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
SynapseTags,
start_active_span,
start_active_span_from_request,
tags,
set_tag,
span_context_from_request,
start_active_span_follows_from,
whitelisted_homeserver,
)
from synapse.server import HomeServer
Expand Down Expand Up @@ -279,30 +277,19 @@ async def new_func(
logger.warning("authenticate_request failed: %s", e)
raise

request_tags = {
SynapseTags.REQUEST_ID: request.get_request_id(),
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
tags.PEER_HOST_IPV6: request.getClientIP(),
"authenticated_entity": origin,
"servlet_name": request.request_metrics.name,
}

# Only accept the span context if the origin is authenticated
# and whitelisted
# update the active opentracing span with the authenticated entity
set_tag("authenticated_entity", origin)

# if the origin is authenticated and whitelisted, link to its span context
context = None
if origin and whitelisted_homeserver(origin):
scope = start_active_span_from_request(
request, "incoming-federation-request", tags=request_tags
)
else:
scope = start_active_span(
"incoming-federation-request", tags=request_tags
)
context = span_context_from_request(request)

with scope:
opentracing.inject_response_headers(request.responseHeaders)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own reference, this is also called via trace_servlet which is used by _AsyncResource which (I think) is an eventual super-class of all federation servlets, so we were doing work twice when we didn't need to?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's right. Sorry I didn't make it clear!

scope = start_active_span_follows_from(
"incoming-federation-request", contexts=(context,) if context else ()
)

with scope:
if origin and self.RATELIMIT:
with ratelimiter.ratelimit(origin) as d:
await d
Expand Down
30 changes: 29 additions & 1 deletion synapse/http/site.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import contextlib
import logging
import time
from typing import Any, Generator, Optional, Tuple, Union
from typing import TYPE_CHECKING, Any, Generator, Optional, Tuple, Union

import attr
from zope.interface import implementer
Expand All @@ -35,6 +35,9 @@
)
from synapse.types import Requester

if TYPE_CHECKING:
import opentracing

logger = logging.getLogger(__name__)

_next_request_seq = 0
Expand Down Expand Up @@ -81,6 +84,10 @@ def __init__(
# server name, for client requests this is the Requester object.
self._requester: Optional[Union[Requester, str]] = None

# An opentracing span for this request. Will be closed when the request is
# completely processed.
self._opentracing_span: "Optional[opentracing.Span]" = None

# we can't yet create the logcontext, as we don't know the method.
self.logcontext: Optional[LoggingContext] = None

Expand Down Expand Up @@ -148,6 +155,13 @@ def requester(self, value: Union[Requester, str]) -> None:
# If there's no authenticated entity, it was the requester.
self.logcontext.request.authenticated_entity = authenticated_entity or requester

def set_opentracing_span(self, span: "opentracing.Span") -> None:
"""attach an opentracing span to this request

Doing so will cause the span to be closed when we finish processing the request
"""
self._opentracing_span = span

def get_request_id(self) -> str:
return "%s-%i" % (self.get_method(), self.request_seq)

Expand Down Expand Up @@ -286,6 +300,9 @@ async def handle_request(request):
self._processing_finished_time = time.time()
self._is_processing = False

if self._opentracing_span:
self._opentracing_span.log_kv({"event": "finished processing"})

# if we've already sent the response, log it now; otherwise, we wait for the
# response to be sent.
if self.finish_time is not None:
Expand All @@ -299,6 +316,8 @@ def finish(self) -> None:
"""
self.finish_time = time.time()
Request.finish(self)
if self._opentracing_span:
self._opentracing_span.log_kv({"event": "response sent"})
if not self._is_processing:
assert self.logcontext is not None
with PreserveLoggingContext(self.logcontext):
Expand Down Expand Up @@ -333,6 +352,11 @@ def connectionLost(self, reason: Union[Failure, Exception]) -> None:
with PreserveLoggingContext(self.logcontext):
logger.info("Connection from client lost before response was sent")

if self._opentracing_span:
self._opentracing_span.log_kv(
{"event": "client connection lost", "reason": str(reason.value)}
)

if not self._is_processing:
self._finished_processing()

Expand Down Expand Up @@ -421,6 +445,10 @@ def _finished_processing(self) -> None:
usage.evt_db_fetch_count,
)

# complete the opentracing span, if any.
if self._opentracing_span:
self._opentracing_span.finish()

try:
self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
except Exception as e:
Expand Down
68 changes: 22 additions & 46 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
import attr

from twisted.internet import defer
from twisted.web.http import Request
from twisted.web.http_headers import Headers

from synapse.config import ConfigError
Expand Down Expand Up @@ -490,48 +491,6 @@ def start_active_span_follows_from(
return scope


def start_active_span_from_request(
request,
operation_name,
references=None,
tags=None,
start_time=None,
ignore_active_span=False,
finish_on_close=True,
):
"""
Extracts a span context from a Twisted Request.
args:
headers (twisted.web.http.Request)

For the other args see opentracing.tracer

returns:
span_context (opentracing.span.SpanContext)
"""
# Twisted encodes the values as lists whereas opentracing doesn't.
# So, we take the first item in the list.
# Also, twisted uses byte arrays while opentracing expects strings.

if opentracing is None:
return noop_context_manager() # type: ignore[unreachable]

header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
}
context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)

return opentracing.tracer.start_active_span(
operation_name,
child_of=context,
references=references,
tags=tags,
start_time=start_time,
ignore_active_span=ignore_active_span,
finish_on_close=finish_on_close,
)


def start_active_span_from_edu(
edu_content,
operation_name,
Expand Down Expand Up @@ -743,6 +702,20 @@ def active_span_context_as_string():
return json_encoder.encode(carrier)


def span_context_from_request(request: Request) -> "Optional[opentracing.SpanContext]":
"""Extract an opentracing context from the headers on an HTTP request

This is useful when we have received an HTTP request from another part of our
system, and want to link our spans to those of the remote system.
"""
if not opentracing:
return None
header_dict = {
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
}
return opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)


@only_if_tracing
def span_context_from_string(carrier):
"""
Expand Down Expand Up @@ -882,10 +855,13 @@ def trace_servlet(request: "SynapseRequest", extract_context: bool = False):
}

request_name = request.request_metrics.name
if extract_context:
scope = start_active_span_from_request(request, request_name)
else:
scope = start_active_span(request_name)
context = span_context_from_request(request) if extract_context else None

# we configure the scope not to finish the span immediately on exit, and instead
# pass the span into the SynapseRequest, which will finish it once we've finished
# sending the response to the client.
scope = start_active_span(request_name, child_of=context, finish_on_close=False)
request.set_opentracing_span(scope.span)

with scope:
inject_response_headers(request.responseHeaders)
Expand Down