Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Trace how long it takes for the send trasaction to complete, including retrys #5986

Merged
merged 7 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/5986.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Trace replication send times.
87 changes: 51 additions & 36 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
redact_uri,
)
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR

Expand Down Expand Up @@ -269,42 +270,56 @@ def request(self, method, uri, data=None, headers=None):
# log request but strip `access_token` (AS requests for example include this)
logger.info("Sending request %s %s", method, redact_uri(uri))

try:
body_producer = None
if data is not None:
body_producer = QuieterFileBodyProducer(BytesIO(data))

request_deferred = treq.request(
method,
uri,
agent=self.agent,
data=body_producer,
headers=headers,
**self._extra_treq_args
)
request_deferred = timeout_deferred(
request_deferred,
60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)

incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s", method, redact_uri(uri), response.code
)
return response
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method,
redact_uri(uri),
type(e).__name__,
e.args[0],
)
raise
with start_active_span(
"outgoing-client-request",
tags={
tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
tags.HTTP_METHOD: method,
tags.HTTP_URL: uri,
},
finish_on_close=True,
):
try:
body_producer = None
if data is not None:
body_producer = QuieterFileBodyProducer(BytesIO(data))

request_deferred = treq.request(
method,
uri,
agent=self.agent,
data=body_producer,
headers=headers,
**self._extra_treq_args
)
request_deferred = timeout_deferred(
request_deferred,
60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
response = yield make_deferred_yieldable(request_deferred)

incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
method,
redact_uri(uri),
response.code,
)
return response
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method,
redact_uri(uri),
type(e).__name__,
e.args[0],
)
set_tag(tags.ERROR, True)
set_tag("error_reason", e.args[0])
raise

@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}, headers=None):
Expand Down
1 change: 0 additions & 1 deletion synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ def _send_request(
else:
query_bytes = b""

# Retreive current span
scope = start_active_span(
"outgoing-federation-request",
tags={
Expand Down
7 changes: 6 additions & 1 deletion synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
RequestSendFailed,
SynapseError,
)
from synapse.logging.opentracing import inject_active_span_byte_dict, trace_servlet
from synapse.logging.opentracing import (
inject_active_span_byte_dict,
trace,
trace_servlet,
)
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string

Expand Down Expand Up @@ -129,6 +133,7 @@ def make_client(cls, hs):

client = hs.get_simple_http_client()

@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(**kwargs):
data = yield cls._serialize_payload(**kwargs)
Expand Down