From b1bf8d4a5470facb6e63f33ff57a144b20e2a319 Mon Sep 17 00:00:00 2001 From: Sanket Mehta Date: Tue, 29 Mar 2022 06:35:08 +0530 Subject: [PATCH] =?UTF-8?q?code=20change=20to=20add=20custom=20http=20and?= =?UTF-8?q?=20websocket=20request=20and=20response=20hea=E2=80=A6=20(#1004?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * code change to add custom http and websocket request and response headers as span attributes. Issue: https://github.com/open-telemetry/opentelemetry-python-contrib/issues/919 * adding entry to changelog * changes after running "tox -e generate" locally * - added server_span.is_recording() in _get_otel_send() just to make sure the span is recording before adding the attributes to span. - changed span to current_span to make sure attributes are being added to proper span. * removed commented code Co-authored-by: Leighton Chen Co-authored-by: Srikanth Chekuri --- CHANGELOG.md | 3 + .../instrumentation/asgi/__init__.py | 63 +++- .../tests/test_asgi_middleware.py | 277 ++++++++++++++++++ 3 files changed, 342 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6784dc9e25..b62122d051 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [1.10.0-0.29b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.10.0-0.29b0) - 2022-03-10 +- `opentelemetry-instrumentation-wsgi` Capture custom request/response headers in span attributes + ([#1004])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1004) + - `opentelemetry-instrumentation-wsgi` Capture custom request/response headers in span attributes ([#925])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/925) - `opentelemetry-instrumentation-flask` Flask: Capture custom request/response headers in span attributes diff --git a/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py b/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py index 713a6ace7c..d8932da996 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py @@ -115,7 +115,14 @@ def client_response_hook(span: Span, message: dict): from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace import Span, set_span_in_context from opentelemetry.trace.status import Status, StatusCode -from opentelemetry.util.http import remove_url_credentials +from opentelemetry.util.http import ( + OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST, + OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE, + get_custom_headers, + normalise_request_header_name, + normalise_response_header_name, + remove_url_credentials, +) _ServerRequestHookT = typing.Optional[typing.Callable[[Span, dict], None]] _ClientRequestHookT = typing.Optional[typing.Callable[[Span, dict], None]] @@ -223,6 +230,41 @@ def collect_request_attributes(scope): return result +def collect_custom_request_headers_attributes(scope): + """returns custom HTTP request headers to be added into SERVER span as span attributes + Refer specification https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-request-and-response-headers""" + + attributes = {} + custom_request_headers = get_custom_headers( + OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST + ) + + for header in custom_request_headers: + values = asgi_getter.get(scope, header) + if values: + key = normalise_request_header_name(header) + attributes.setdefault(key, []).extend(values) + + return attributes + + +def collect_custom_response_headers_attributes(message): + """returns custom HTTP response headers to be added into SERVER span as span attributes + Refer specification https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-request-and-response-headers""" + attributes = {} + custom_response_headers = get_custom_headers( + OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE + ) + + for header in custom_response_headers: + values = asgi_getter.get(message, header) + if values: + key = normalise_response_header_name(header) + attributes.setdefault(key, []).extend(values) + + return attributes + + def get_host_port_url_tuple(scope): """Returns (host, port, full_url) tuple.""" server = scope.get("server") or ["0.0.0.0", 80] @@ -342,6 +384,13 @@ async def __call__(self, scope, receive, send): for key, value in attributes.items(): current_span.set_attribute(key, value) + if current_span.kind == trace.SpanKind.SERVER: + custom_attributes = ( + collect_custom_request_headers_attributes(scope) + ) + if len(custom_attributes) > 0: + current_span.set_attributes(custom_attributes) + if callable(self.server_request_hook): self.server_request_hook(current_span, scope) @@ -395,6 +444,18 @@ async def otel_send(message): set_status_code(server_span, 200) set_status_code(send_span, 200) send_span.set_attribute("type", message["type"]) + if ( + server_span.is_recording() + and server_span.kind == trace.SpanKind.SERVER + and "headers" in message + ): + custom_response_attributes = ( + collect_custom_response_headers_attributes(message) + ) + if len(custom_response_attributes) > 0: + server_span.set_attributes( + custom_response_attributes + ) propagator = get_global_response_propagator() if propagator: diff --git a/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py b/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py index 4396dd224f..3a1e8424a8 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py @@ -31,6 +31,10 @@ ) from opentelemetry.test.test_base import TestBase from opentelemetry.trace import SpanKind, format_span_id, format_trace_id +from opentelemetry.util.http import ( + OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST, + OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE, +) async def http_app(scope, receive, send): @@ -62,6 +66,47 @@ async def websocket_app(scope, receive, send): break +async def http_app_with_custom_headers(scope, receive, send): + message = await receive() + assert scope["type"] == "http" + if message.get("type") == "http.request": + await send( + { + "type": "http.response.start", + "status": 200, + "headers": [ + (b"Content-Type", b"text/plain"), + (b"custom-test-header-1", b"test-header-value-1"), + (b"custom-test-header-2", b"test-header-value-2"), + ], + } + ) + await send({"type": "http.response.body", "body": b"*"}) + + +async def websocket_app_with_custom_headers(scope, receive, send): + assert scope["type"] == "websocket" + while True: + message = await receive() + if message.get("type") == "websocket.connect": + await send( + { + "type": "websocket.accept", + "headers": [ + (b"custom-test-header-1", b"test-header-value-1"), + (b"custom-test-header-2", b"test-header-value-2"), + ], + } + ) + + if message.get("type") == "websocket.receive": + if message.get("text") == "ping": + await send({"type": "websocket.send", "text": "pong"}) + + if message.get("type") == "websocket.disconnect": + break + + async def simple_asgi(scope, receive, send): assert isinstance(scope, dict) if scope["type"] == "http": @@ -583,5 +628,237 @@ async def wrapped_app(scope, receive, send): ) +@mock.patch.dict( + "os.environ", + { + OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST: "Custom-Test-Header-1,Custom-Test-Header-2,Custom-Test-Header-3", + OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE: "Custom-Test-Header-1,Custom-Test-Header-2,Custom-Test-Header-3", + }, +) +class TestCustomHeaders(AsgiTestBase, TestBase): + def setUp(self): + super().setUp() + self.tracer_provider, self.exporter = TestBase.create_tracer_provider() + self.tracer = self.tracer_provider.get_tracer(__name__) + self.app = otel_asgi.OpenTelemetryMiddleware( + simple_asgi, tracer_provider=self.tracer_provider + ) + + def test_http_custom_request_headers_in_span_attributes(self): + self.scope["headers"].extend( + [ + (b"custom-test-header-1", b"test-header-value-1"), + (b"custom-test-header-2", b"test-header-value-2"), + ] + ) + self.seed_app(self.app) + self.send_default_request() + self.get_all_output() + span_list = self.exporter.get_finished_spans() + expected = { + "http.request.header.custom_test_header_1": ( + "test-header-value-1", + ), + "http.request.header.custom_test_header_2": ( + "test-header-value-2", + ), + } + for span in span_list: + if span.kind == SpanKind.SERVER: + self.assertSpanHasAttributes(span, expected) + + def test_http_custom_request_headers_not_in_span_attributes(self): + self.scope["headers"].extend( + [ + (b"custom-test-header-1", b"test-header-value-1"), + ] + ) + self.seed_app(self.app) + self.send_default_request() + self.get_all_output() + span_list = self.exporter.get_finished_spans() + expected = { + "http.request.header.custom_test_header_1": ( + "test-header-value-1", + ), + } + not_expected = { + "http.request.header.custom_test_header_2": ( + "test-header-value-2", + ), + } + for span in span_list: + if span.kind == SpanKind.SERVER: + self.assertSpanHasAttributes(span, expected) + for key, _ in not_expected.items(): + self.assertNotIn(key, span.attributes) + + def test_http_custom_response_headers_in_span_attributes(self): + self.app = otel_asgi.OpenTelemetryMiddleware( + http_app_with_custom_headers, tracer_provider=self.tracer_provider + ) + self.seed_app(self.app) + self.send_default_request() + self.get_all_output() + span_list = self.exporter.get_finished_spans() + expected = { + "http.response.header.custom_test_header_1": ( + "test-header-value-1", + ), + "http.response.header.custom_test_header_2": ( + "test-header-value-2", + ), + } + for span in span_list: + if span.kind == SpanKind.SERVER: + self.assertSpanHasAttributes(span, expected) + + def test_http_custom_response_headers_not_in_span_attributes(self): + self.app = otel_asgi.OpenTelemetryMiddleware( + http_app_with_custom_headers, tracer_provider=self.tracer_provider + ) + self.seed_app(self.app) + self.send_default_request() + self.get_all_output() + span_list = self.exporter.get_finished_spans() + not_expected = { + "http.response.header.custom_test_header_3": ( + "test-header-value-3", + ), + } + for span in span_list: + if span.kind == SpanKind.SERVER: + for key, _ in not_expected.items(): + self.assertNotIn(key, span.attributes) + + def test_websocket_custom_request_headers_in_span_attributes(self): + self.scope = { + "type": "websocket", + "http_version": "1.1", + "scheme": "ws", + "path": "/", + "query_string": b"", + "headers": [ + (b"custom-test-header-1", b"test-header-value-1"), + (b"custom-test-header-2", b"test-header-value-2"), + ], + "client": ("127.0.0.1", 32767), + "server": ("127.0.0.1", 80), + } + self.seed_app(self.app) + self.send_input({"type": "websocket.connect"}) + self.send_input({"type": "websocket.receive", "text": "ping"}) + self.send_input({"type": "websocket.disconnect"}) + + self.get_all_output() + span_list = self.exporter.get_finished_spans() + expected = { + "http.request.header.custom_test_header_1": ( + "test-header-value-1", + ), + "http.request.header.custom_test_header_2": ( + "test-header-value-2", + ), + } + for span in span_list: + if span.kind == SpanKind.SERVER: + self.assertSpanHasAttributes(span, expected) + + def test_websocket_custom_request_headers_not_in_span_attributes(self): + self.scope = { + "type": "websocket", + "http_version": "1.1", + "scheme": "ws", + "path": "/", + "query_string": b"", + "headers": [ + (b"Custom-Test-Header-1", b"test-header-value-1"), + (b"Custom-Test-Header-2", b"test-header-value-2"), + ], + "client": ("127.0.0.1", 32767), + "server": ("127.0.0.1", 80), + } + self.seed_app(self.app) + self.send_input({"type": "websocket.connect"}) + self.send_input({"type": "websocket.receive", "text": "ping"}) + self.send_input({"type": "websocket.disconnect"}) + + self.get_all_output() + span_list = self.exporter.get_finished_spans() + not_expected = { + "http.request.header.custom_test_header_3": ( + "test-header-value-3", + ), + } + for span in span_list: + if span.kind == SpanKind.SERVER: + for key, _ in not_expected.items(): + self.assertNotIn(key, span.attributes) + + def test_websocket_custom_response_headers_in_span_attributes(self): + self.scope = { + "type": "websocket", + "http_version": "1.1", + "scheme": "ws", + "path": "/", + "query_string": b"", + "headers": [], + "client": ("127.0.0.1", 32767), + "server": ("127.0.0.1", 80), + } + self.app = otel_asgi.OpenTelemetryMiddleware( + websocket_app_with_custom_headers, + tracer_provider=self.tracer_provider, + ) + self.seed_app(self.app) + self.send_input({"type": "websocket.connect"}) + self.send_input({"type": "websocket.receive", "text": "ping"}) + self.send_input({"type": "websocket.disconnect"}) + self.get_all_output() + span_list = self.exporter.get_finished_spans() + expected = { + "http.response.header.custom_test_header_1": ( + "test-header-value-1", + ), + "http.response.header.custom_test_header_2": ( + "test-header-value-2", + ), + } + for span in span_list: + if span.kind == SpanKind.SERVER: + self.assertSpanHasAttributes(span, expected) + + def test_websocket_custom_response_headers_not_in_span_attributes(self): + self.scope = { + "type": "websocket", + "http_version": "1.1", + "scheme": "ws", + "path": "/", + "query_string": b"", + "headers": [], + "client": ("127.0.0.1", 32767), + "server": ("127.0.0.1", 80), + } + self.app = otel_asgi.OpenTelemetryMiddleware( + websocket_app_with_custom_headers, + tracer_provider=self.tracer_provider, + ) + self.seed_app(self.app) + self.send_input({"type": "websocket.connect"}) + self.send_input({"type": "websocket.receive", "text": "ping"}) + self.send_input({"type": "websocket.disconnect"}) + self.get_all_output() + span_list = self.exporter.get_finished_spans() + not_expected = { + "http.response.header.custom_test_header_3": ( + "test-header-value-3", + ), + } + for span in span_list: + if span.kind == SpanKind.SERVER: + for key, _ in not_expected.items(): + self.assertNotIn(key, span.attributes) + + if __name__ == "__main__": unittest.main()