-
Notifications
You must be signed in to change notification settings - Fork 626
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add traceresponse headers for asgi apps (FastAPI, Starlette) #817
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -103,11 +103,14 @@ def client_response_hook(span: Span, message: dict): | |
|
||
from opentelemetry import context, trace | ||
from opentelemetry.instrumentation.asgi.version import __version__ # noqa | ||
from opentelemetry.instrumentation.propagators import ( | ||
get_global_response_propagator, | ||
) | ||
from opentelemetry.instrumentation.utils import http_status_to_status_code | ||
from opentelemetry.propagate import extract | ||
from opentelemetry.propagators.textmap import Getter | ||
from opentelemetry.propagators.textmap import Getter, Setter | ||
from opentelemetry.semconv.trace import SpanAttributes | ||
from opentelemetry.trace import Span | ||
from opentelemetry.trace import Span, set_span_in_context | ||
from opentelemetry.trace.status import Status, StatusCode | ||
from opentelemetry.util.http import remove_url_credentials | ||
|
||
|
@@ -152,6 +155,30 @@ def keys(self, carrier: dict) -> typing.List[str]: | |
asgi_getter = ASGIGetter() | ||
|
||
|
||
class ASGISetter(Setter): | ||
def set( | ||
self, carrier: dict, key: str, value: str | ||
) -> None: # pylint: disable=no-self-use | ||
"""Sets response header values on an ASGI scope according to `the spec <https://asgi.readthedocs.io/en/latest/specs/www.html#response-start-send-event>`_. | ||
Args: | ||
carrier: ASGI scope object | ||
key: response header name to set | ||
value: response header value | ||
Returns: | ||
None | ||
""" | ||
headers = carrier.get("headers") | ||
if not headers: | ||
headers = [] | ||
carrier["headers"] = headers | ||
|
||
headers.append([key.lower().encode(), value.encode()]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be a stupid question but why do we encode the key and values in the header? Is this because asgi requires them to be byte strings? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @lzchen good question, it's non-obvious. From the spec:
Important parts:
|
||
|
||
|
||
asgi_setter = ASGISetter() | ||
|
||
|
||
def collect_request_attributes(scope): | ||
"""Collects HTTP request attributes from the ASGI scope and returns a | ||
dictionary to be used as span creation attributes.""" | ||
|
@@ -295,54 +322,84 @@ async def __call__(self, scope, receive, send): | |
return await self.app(scope, receive, send) | ||
|
||
token = context.attach(extract(scope, getter=asgi_getter)) | ||
span_name, additional_attributes = self.default_span_details(scope) | ||
server_span_name, additional_attributes = self.default_span_details( | ||
scope | ||
) | ||
|
||
try: | ||
with self.tracer.start_as_current_span( | ||
span_name, | ||
server_span_name, | ||
kind=trace.SpanKind.SERVER, | ||
) as span: | ||
if span.is_recording(): | ||
) as server_span: | ||
if server_span.is_recording(): | ||
attributes = collect_request_attributes(scope) | ||
attributes.update(additional_attributes) | ||
for key, value in attributes.items(): | ||
span.set_attribute(key, value) | ||
server_span.set_attribute(key, value) | ||
|
||
if callable(self.server_request_hook): | ||
self.server_request_hook(span, scope) | ||
|
||
@wraps(receive) | ||
async def wrapped_receive(): | ||
with self.tracer.start_as_current_span( | ||
" ".join((span_name, scope["type"], "receive")) | ||
) as receive_span: | ||
if callable(self.client_request_hook): | ||
self.client_request_hook(receive_span, scope) | ||
message = await receive() | ||
if receive_span.is_recording(): | ||
if message["type"] == "websocket.receive": | ||
set_status_code(receive_span, 200) | ||
receive_span.set_attribute("type", message["type"]) | ||
return message | ||
|
||
@wraps(send) | ||
async def wrapped_send(message): | ||
with self.tracer.start_as_current_span( | ||
" ".join((span_name, scope["type"], "send")) | ||
) as send_span: | ||
if callable(self.client_response_hook): | ||
self.client_response_hook(send_span, message) | ||
if send_span.is_recording(): | ||
if message["type"] == "http.response.start": | ||
status_code = message["status"] | ||
set_status_code(span, status_code) | ||
set_status_code(send_span, status_code) | ||
elif message["type"] == "websocket.send": | ||
set_status_code(span, 200) | ||
set_status_code(send_span, 200) | ||
send_span.set_attribute("type", message["type"]) | ||
await send(message) | ||
|
||
await self.app(scope, wrapped_receive, wrapped_send) | ||
self.server_request_hook(server_span, scope) | ||
|
||
otel_receive = self._get_otel_receive( | ||
server_span_name, scope, receive | ||
) | ||
|
||
otel_send = self._get_otel_send( | ||
server_span, | ||
server_span_name, | ||
scope, | ||
send, | ||
) | ||
|
||
await self.app(scope, otel_receive, otel_send) | ||
finally: | ||
context.detach(token) | ||
|
||
def _get_otel_receive(self, server_span_name, scope, receive): | ||
@wraps(receive) | ||
async def otel_receive(): | ||
with self.tracer.start_as_current_span( | ||
" ".join((server_span_name, scope["type"], "receive")) | ||
) as receive_span: | ||
if callable(self.client_request_hook): | ||
self.client_request_hook(receive_span, scope) | ||
message = await receive() | ||
if receive_span.is_recording(): | ||
if message["type"] == "websocket.receive": | ||
set_status_code(receive_span, 200) | ||
receive_span.set_attribute("type", message["type"]) | ||
return message | ||
|
||
return otel_receive | ||
|
||
def _get_otel_send(self, server_span, server_span_name, scope, send): | ||
@wraps(send) | ||
async def otel_send(message): | ||
with self.tracer.start_as_current_span( | ||
" ".join((server_span_name, scope["type"], "send")) | ||
) as send_span: | ||
if callable(self.client_response_hook): | ||
self.client_response_hook(send_span, message) | ||
if send_span.is_recording(): | ||
if message["type"] == "http.response.start": | ||
status_code = message["status"] | ||
set_status_code(server_span, status_code) | ||
set_status_code(send_span, status_code) | ||
elif message["type"] == "websocket.send": | ||
set_status_code(server_span, 200) | ||
set_status_code(send_span, 200) | ||
send_span.set_attribute("type", message["type"]) | ||
|
||
propagator = get_global_response_propagator() | ||
if propagator: | ||
propagator.inject( | ||
message, | ||
context=set_span_in_context( | ||
server_span, trace.context_api.Context() | ||
), | ||
Comment on lines
+397
to
+399
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO this is a bit obtuse but I'm sure there is a good extensibility reason for it. What I really wanted to do was: propagator.inject(
message,
span=server_span,
setter=asgi_setter,
) But for now I will consider this yak shaved! |
||
setter=asgi_setter, | ||
) | ||
|
||
await send(message) | ||
|
||
return otel_send |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This wasn't included in the last release but seemed like the right format