diff --git a/changelog.d/5986.feature b/changelog.d/5986.feature new file mode 100644 index 000000000000..f56aec1b32a0 --- /dev/null +++ b/changelog.d/5986.feature @@ -0,0 +1 @@ +Trace replication send times. diff --git a/synapse/http/client.py b/synapse/http/client.py index 0ae6db8ea777..51765ae3c0ad 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -46,6 +46,7 @@ redact_uri, ) from synapse.logging.context import make_deferred_yieldable +from synapse.logging.opentracing import set_tag, start_active_span, tags from synapse.util.async_helpers import timeout_deferred from synapse.util.caches import CACHE_SIZE_FACTOR @@ -269,42 +270,56 @@ def request(self, method, uri, data=None, headers=None): # log request but strip `access_token` (AS requests for example include this) logger.info("Sending request %s %s", method, redact_uri(uri)) - try: - body_producer = None - if data is not None: - body_producer = QuieterFileBodyProducer(BytesIO(data)) - - request_deferred = treq.request( - method, - uri, - agent=self.agent, - data=body_producer, - headers=headers, - **self._extra_treq_args - ) - request_deferred = timeout_deferred( - request_deferred, - 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, - ) - response = yield make_deferred_yieldable(request_deferred) - - incoming_responses_counter.labels(method, response.code).inc() - logger.info( - "Received response to %s %s: %s", method, redact_uri(uri), response.code - ) - return response - except Exception as e: - incoming_responses_counter.labels(method, "ERR").inc() - logger.info( - "Error sending request to %s %s: %s %s", - method, - redact_uri(uri), - type(e).__name__, - e.args[0], - ) - raise + with start_active_span( + "outgoing-client-request", + tags={ + tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, + tags.HTTP_METHOD: method, + tags.HTTP_URL: uri, + }, + finish_on_close=True, + ): + try: + body_producer = None + if data is not None: + body_producer = QuieterFileBodyProducer(BytesIO(data)) + + request_deferred = treq.request( + method, + uri, + agent=self.agent, + data=body_producer, + headers=headers, + **self._extra_treq_args + ) + request_deferred = timeout_deferred( + request_deferred, + 60, + self.hs.get_reactor(), + cancelled_to_request_timed_out_error, + ) + response = yield make_deferred_yieldable(request_deferred) + + incoming_responses_counter.labels(method, response.code).inc() + logger.info( + "Received response to %s %s: %s", + method, + redact_uri(uri), + response.code, + ) + return response + except Exception as e: + incoming_responses_counter.labels(method, "ERR").inc() + logger.info( + "Error sending request to %s %s: %s %s", + method, + redact_uri(uri), + type(e).__name__, + e.args[0], + ) + set_tag(tags.ERROR, True) + set_tag("error_reason", e.args[0]) + raise @defer.inlineCallbacks def post_urlencoded_get_json(self, uri, args={}, headers=None): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 4326e98a28f1..3f7c93ffcbfc 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -345,7 +345,6 @@ def _send_request( else: query_bytes = b"" - # Retreive current span scope = start_active_span( "outgoing-federation-request", tags={ diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index afc9a8ff2996..03560c1f0e43 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -28,7 +28,11 @@ RequestSendFailed, SynapseError, ) -from synapse.logging.opentracing import inject_active_span_byte_dict, trace_servlet +from synapse.logging.opentracing import ( + inject_active_span_byte_dict, + trace, + trace_servlet, +) from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import random_string @@ -129,6 +133,7 @@ def make_client(cls, hs): client = hs.get_simple_http_client() + @trace(opname="outgoing_replication_request") @defer.inlineCallbacks def send_request(**kwargs): data = yield cls._serialize_payload(**kwargs)