Skip to content

Commit

Permalink
Add instrumentor and auto instrumentation support for aiohttp (#1075)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariojonke authored Oct 9, 2020
1 parent 2b37136 commit a7144ba
Show file tree
Hide file tree
Showing 4 changed files with 378 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Released 2020-09-17

- Updating span name to match semantic conventions
([#972](https://github.com/open-telemetry/opentelemetry-python/pull/972))
- Add instrumentor and auto instrumentation support for aiohttp
([#1075](https://github.com/open-telemetry/opentelemetry-python/pull/1075))

## Version 0.12b0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,17 @@ package_dir=
=src
packages=find_namespace:
install_requires =
opentelemetry-api >= 0.12.dev0
opentelemetry-api == 0.14.dev0
opentelemetry-instrumentation == 0.14.dev0
aiohttp ~= 3.0
wrapt >= 1.0.0, < 2.0.0

[options.packages.find]
where = src

[options.extras_require]
test =

[options.entry_points]
opentelemetry_instrumentor =
aiohttp-client = opentelemetry.instrumentation.aiohttp_client:AioHttpClientInstrumentor
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,73 @@
Usage
-----
Explicitly instrumenting a single client session:
.. code:: python
.. code:: python
import aiohttp
from opentelemetry.instrumentation.aiohttp_client import (
create_trace_config,
url_path_span_name
)
import yarl
import aiohttp
from opentelemetry.instrumentation.aiohttp_client import (
create_trace_config,
url_path_span_name
)
import yarl
def strip_query_params(url: yarl.URL) -> str:
return str(url.with_query(None))
def strip_query_params(url: yarl.URL) -> str:
return str(url.with_query(None))
async with aiohttp.ClientSession(trace_configs=[create_trace_config(
# Remove all query params from the URL attribute on the span.
url_filter=strip_query_params,
# Use the URL's path as the span name.
span_name=url_path_span_name
)]) as session:
async with session.get(url) as response:
await response.text()
async with aiohttp.ClientSession(trace_configs=[create_trace_config(
# Remove all query params from the URL attribute on the span.
url_filter=strip_query_params,
# Use the URL's path as the span name.
span_name=url_path_span_name
)]) as session:
async with session.get(url) as response:
await response.text()
Instrumenting all client sessions:
.. code:: python
import aiohttp
from opentelemetry.instrumentation.aiohttp_client import (
AioHttpClientInstrumentor
)
# Enable instrumentation
AioHttpClientInstrumentor().instrument()
# Create a session and make an HTTP get request
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
await response.text()
API
---
"""

import contextlib
import socket
import types
import typing

import aiohttp
import wrapt

from opentelemetry import context as context_api
from opentelemetry import propagators, trace
from opentelemetry.instrumentation.aiohttp_client.version import __version__
from opentelemetry.instrumentation.utils import http_status_to_canonical_code
from opentelemetry.trace import SpanKind
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
http_status_to_canonical_code,
unwrap,
)
from opentelemetry.trace import SpanKind, TracerProvider, get_tracer
from opentelemetry.trace.status import Status, StatusCanonicalCode

_UrlFilterT = typing.Optional[typing.Callable[[str], str]]
_SpanNameT = typing.Optional[
typing.Union[typing.Callable[[aiohttp.TraceRequestStartParams], str], str]
]


def url_path_span_name(params: aiohttp.TraceRequestStartParams) -> str:
"""Extract a span name from the request URL path.
Expand All @@ -73,12 +102,9 @@ def url_path_span_name(params: aiohttp.TraceRequestStartParams) -> str:


def create_trace_config(
url_filter: typing.Optional[typing.Callable[[str], str]] = None,
span_name: typing.Optional[
typing.Union[
typing.Callable[[aiohttp.TraceRequestStartParams], str], str
]
] = None,
url_filter: _UrlFilterT = None,
span_name: _SpanNameT = None,
tracer_provider: TracerProvider = None,
) -> aiohttp.TraceConfig:
"""Create an aiohttp-compatible trace configuration.
Expand All @@ -104,6 +130,7 @@ def create_trace_config(
such as API keys or user personal information.
:param str span_name: Override the default span name.
:param tracer_provider: optional TracerProvider from which to get a Tracer
:return: An object suitable for use with :py:class:`aiohttp.ClientSession`.
:rtype: :py:class:`aiohttp.TraceConfig`
Expand All @@ -113,7 +140,7 @@ def create_trace_config(
# Explicitly specify the type for the `span_name` param and rtype to work
# around this issue.

tracer = trace.get_tracer_provider().get_tracer(__name__, __version__)
tracer = get_tracer(__name__, __version__, tracer_provider)

def _end_trace(trace_config_ctx: types.SimpleNamespace):
context_api.detach(trace_config_ctx.token)
Expand All @@ -124,6 +151,10 @@ async def on_request_start(
trace_config_ctx: types.SimpleNamespace,
params: aiohttp.TraceRequestStartParams,
):
if context_api.get_value("suppress_instrumentation"):
trace_config_ctx.span = None
return

http_method = params.method.upper()
if trace_config_ctx.span_name is None:
request_span_name = "HTTP {}".format(http_method)
Expand Down Expand Up @@ -158,6 +189,9 @@ async def on_request_end(
trace_config_ctx: types.SimpleNamespace,
params: aiohttp.TraceRequestEndParams,
):
if trace_config_ctx.span is None:
return

if trace_config_ctx.span.is_recording():
trace_config_ctx.span.set_status(
Status(
Expand All @@ -177,6 +211,9 @@ async def on_request_exception(
trace_config_ctx: types.SimpleNamespace,
params: aiohttp.TraceRequestExceptionParams,
):
if trace_config_ctx.span is None:
return

if trace_config_ctx.span.is_recording():
if isinstance(
params.exception,
Expand All @@ -193,6 +230,7 @@ async def on_request_exception(
status = StatusCanonicalCode.UNAVAILABLE

trace_config_ctx.span.set_status(Status(status))
trace_config_ctx.span.record_exception(params.exception)
_end_trace(trace_config_ctx)

def _trace_config_ctx_factory(**kwargs):
Expand All @@ -210,3 +248,84 @@ def _trace_config_ctx_factory(**kwargs):
trace_config.on_request_exception.append(on_request_exception)

return trace_config


def _instrument(
tracer_provider: TracerProvider = None,
url_filter: _UrlFilterT = None,
span_name: _SpanNameT = None,
):
"""Enables tracing of all ClientSessions
When a ClientSession gets created a TraceConfig is automatically added to
the session's trace_configs.
"""
# pylint:disable=unused-argument
def instrumented_init(wrapped, instance, args, kwargs):
if context_api.get_value("suppress_instrumentation"):
return wrapped(*args, **kwargs)

trace_configs = list(kwargs.get("trace_configs") or ())

trace_config = create_trace_config(
url_filter=url_filter,
span_name=span_name,
tracer_provider=tracer_provider,
)
trace_config.opentelemetry_aiohttp_instrumented = True
trace_configs.append(trace_config)

kwargs["trace_configs"] = trace_configs
return wrapped(*args, **kwargs)

wrapt.wrap_function_wrapper(
aiohttp.ClientSession, "__init__", instrumented_init
)


def _uninstrument():
"""Disables instrumenting for all newly created ClientSessions"""
unwrap(aiohttp.ClientSession, "__init__")


def _uninstrument_session(client_session: aiohttp.ClientSession):
"""Disables instrumentation for the given ClientSession"""
# pylint: disable=protected-access
trace_configs = client_session._trace_configs
client_session._trace_configs = [
trace_config
for trace_config in trace_configs
if not hasattr(trace_config, "opentelemetry_aiohttp_instrumented")
]


class AioHttpClientInstrumentor(BaseInstrumentor):
"""An instrumentor for aiohttp client sessions
See `BaseInstrumentor`
"""

def _instrument(self, **kwargs):
"""Instruments aiohttp ClientSession
Args:
**kwargs: Optional arguments
``tracer_provider``: a TracerProvider, defaults to global
``url_filter``: A callback to process the requested URL prior to adding
it as a span attribute. This can be useful to remove sensitive data
such as API keys or user personal information.
``span_name``: Override the default span name.
"""
_instrument(
tracer_provider=kwargs.get("tracer_provider"),
url_filter=kwargs.get("url_filter"),
span_name=kwargs.get("span_name"),
)

def _uninstrument(self, **kwargs):
_uninstrument()

@staticmethod
def uninstrument_session(client_session: aiohttp.ClientSession):
"""Disables instrumentation for the given session"""
_uninstrument_session(client_session)
Loading

0 comments on commit a7144ba

Please sign in to comment.