From 6acb1c5c6728210b20771dd2a9392abbf96d3956 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Mon, 13 Sep 2021 11:50:16 +0300 Subject: [PATCH 01/16] Added initial code --- .../README.rst | 23 ++++ .../setup.cfg | 50 ++++++++ .../setup.py | 38 ++++++ .../instrumentation/pika/__init__.py | 2 + .../instrumentation/pika/package.py | 16 +++ .../instrumentation/pika/pika_instrumentor.py | 108 ++++++++++++++++++ .../instrumentation/pika/utils.py | 78 +++++++++++++ .../instrumentation/pika/version.py | 15 +++ 8 files changed, 330 insertions(+) create mode 100644 instrumentation/opentelemetry-instrumentation-pika/README.rst create mode 100644 instrumentation/opentelemetry-instrumentation-pika/setup.cfg create mode 100644 instrumentation/opentelemetry-instrumentation-pika/setup.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py diff --git a/instrumentation/opentelemetry-instrumentation-pika/README.rst b/instrumentation/opentelemetry-instrumentation-pika/README.rst new file mode 100644 index 0000000000..d94cde9629 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/README.rst @@ -0,0 +1,23 @@ +OpenTelemetry Instrumentation +=========================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-pika.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-pika/ + +This library allows tracing requests made by the library. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-pika + + +References +---------- + +* `OpenTelemetry pika/ Tracing `_ +* `OpenTelemetry Project `_ diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg new file mode 100644 index 0000000000..70e4f7e35f --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg @@ -0,0 +1,50 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-instrumentation-pika +description = OpenTelemetry pika instrumentation +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-pika +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + +[options] +python_requires = >=3.6 +package_dir= + =src +packages=find_namespace: + +install_requires = + opentelemetry-api ~= 1.5 + pika >= 1.1.0 + +[options.extras_require] +test = + +[options.packages.find] +where = src + diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.py b/instrumentation/opentelemetry-instrumentation-pika/setup.py new file mode 100644 index 0000000000..a54b3092c0 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.py @@ -0,0 +1,38 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +import setuptools + +BASE_DIR = os.path.dirname(__file__) +VERSION_FILENAME = os.path.join( + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "pika", + "version.py", +) +PACKAGE_INFO = {} +with open(VERSION_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +setuptools.setup( + version=PACKAGE_INFO["__version__"], + entry_points={ + "opentelemetry_instrumentor": [ + "pika = opentelemetry.instrumentation.pika:PikaInstrumentor" + ] + }, +) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py new file mode 100644 index 0000000000..0cb0d945a1 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -0,0 +1,2 @@ +from .version import __version__ +from .pika_instrumentor import PikaInstrumentation diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py new file mode 100644 index 0000000000..b991957291 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py @@ -0,0 +1,16 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +_instruments = ("pika >= 1.1.0",) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py new file mode 100644 index 0000000000..92c3fb16fa --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -0,0 +1,108 @@ +import pika +from logging import getLogger +from opentelemetry import trace +from typing import Dict, Callable +from typing import Collection, Any +from pika.adapters import BaseConnection +from opentelemetry.propagate import inject +from opentelemetry.instrumentation.pika import utils +from opentelemetry.trace import Tracer, TracerProvider +from opentelemetry.semconv.trace import MessagingOperationValues +from opentelemetry.instrumentation.pika.package import _instruments +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor + + +_LOG = getLogger(__name__) +CTX_KEY = "__otel_task_span" + + +class PikaInstrumentation(BaseInstrumentor): + @staticmethod + def _instrument_consumers( + consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer + ) -> Any: + for key, callback in consumers_dict.items(): + + def decorated_callback( + channel: pika.channel.Channel, + method: pika.spec.Basic.Deliver, + properties: pika.spec.BasicProperties, + body: bytes, + ) -> Any: + if not properties: + properties = pika.spec.BasicProperties() + span = utils.get_span( + tracer, + channel, + properties, + task_name=key, + operation=MessagingOperationValues.RECEIVE, + ) + with trace.use_span(span, end_on_exit=True): + inject(properties.headers) + retval = callback(channel, method, properties, body) + return retval + + decorated_callback.__setattr__("_original_callback", callback) + consumers_dict[key] = decorated_callback + + @staticmethod + def _instrument_publish(channel: Any, tracer: Tracer) -> None: + original_basic_publish = channel.basic_publish + + def decorated_basic_publish( + exchange, routing_key, body, properties=None, mandatory=False + ): + if not properties: + properties = pika.spec.BasicProperties() + span = utils.get_span( + tracer, + channel, + properties, + task_name="(temporary)", + operation=None, + ) + with trace.use_span(span, end_on_exit=True): + inject(properties.headers) + retval = original_basic_publish( + exchange, routing_key, body, properties, mandatory + ) + return retval + + decorated_basic_publish.__setattr__( + "_original_function", original_basic_publish + ) + channel.basic_publish = decorated_basic_publish + + @staticmethod + def instrument_channel( + channel: Any, tracer_provider: TracerProvider + ) -> None: + if not hasattr(channel, "_impl") or not isinstance( + channel._impl, pika.channel.Channel + ): + _LOG.error("Could not find implementation for provided channel!") + return + tracer = trace.get_tracer(__name__, pika.__version__, tracer_provider) + if channel._impl._consumers: + PikaInstrumentation._instrument_consumers( + channel._impl._consumers, tracer + ) + PikaInstrumentation._instrument_publish(channel, tracer) + + def _uninstrument(self, connection: Any, **kwargs: Dict[str, Any]) -> None: + if not hasattr(connection, "_impl") or not isinstance( + connection._impl, BaseConnection + ): + _LOG.error("Could not find implementation for provided channel!") + return + for key, callback in connection._impl._consumers: + if hasattr(callback, "_original_callback"): + connection._consumers[key] = callback._original_callback + if hasattr(connection.basic_publish, "_original_function"): + connection.basic_publish = ( + connection.basic_publish._original_function + ) + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py new file mode 100644 index 0000000000..3789da44e2 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -0,0 +1,78 @@ +from typing import Optional +from pika.channel import Channel +from pika.spec import BasicProperties +from opentelemetry.trace import Tracer +from opentelemetry.trace.span import Span +from opentelemetry.propagate import extract +from opentelemetry.propagators.textmap import Getter +from opentelemetry.semconv.trace import ( + SpanAttributes, + MessagingOperationValues, +) + + +class PikaGetter(Getter): + def get(self, carrier, key): + value = carrier.get(key, None) + if value is None: + return None + return (value,) + + def keys(self, carrier): + return [] + + +pika_getter = PikaGetter() + + +def get_span( + tracer: Tracer, + channel: Channel, + properties: BasicProperties, + task_name: str, + operation: Optional[MessagingOperationValues], +) -> Span: + if properties.headers is None: + properties.headers = {} + ctx = extract(properties.headers, getter=pika_getter) + span = tracer.start_span( + context=ctx, name=generate_span_name(task_name, operation) + ) + enrich_span(span, channel, properties, task_name, operation) + return span + + +def generate_span_name( + task_name: str, operation: MessagingOperationValues +) -> str: + return f"{task_name} {operation.value}" + + +def enrich_span( + span: Span, + channel: Channel, + properties: BasicProperties, + task_destination: str, + operation: Optional[MessagingOperationValues], +) -> None: + span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq") + if operation: + span.set_attribute(SpanAttributes.MESSAGING_OPERATION, operation.value) + else: + span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) + span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, task_destination) + span.set_attribute( + SpanAttributes.MESSAGING_DESTINATION_KIND, properties.type + ) + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id + ) + span.set_attribute( + SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id + ) + span.set_attribute( + SpanAttributes.NET_PEER_NAME, channel.connection.params.host + ) + span.set_attribute( + SpanAttributes.NET_PEER_PORT, channel.connection.params.port + ) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py new file mode 100644 index 0000000000..d33bd87ce4 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.24b0" From b561657cad0855b02e6bfc2a37b8b442d4f930f0 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Mon, 13 Sep 2021 17:18:05 +0300 Subject: [PATCH 02/16] Add all needed spans, and add support of instrumentation and uninstrumentation --- .../instrumentation/pika/__init__.py | 55 ++++++++++++ .../instrumentation/pika/package.py | 3 +- .../instrumentation/pika/pika_instrumentor.py | 87 +++++++++++++------ .../instrumentation/pika/utils.py | 52 ++++++----- 4 files changed, 147 insertions(+), 50 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py index 0cb0d945a1..7e66da6642 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -1,2 +1,57 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Instrument `pika` to trace RabbitMQ applications. + +Usage +----- + +* Start broker backend + +.. code-block:: python + + docker run -p 5672:5672 rabbitmq + + +* Run instrumented task + +.. code:: python + import pika + from opentelemetry.instrumentation.pika import PikaInstrumentation + + connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost')) + channel = connection.channel() + channel.queue_declare(queue='hello') + + pika_instrumentation = PikaInstrumentation() + pika_instrumentation.instrument(channel=channel) + + + channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') + + pika_instrumentation.uninstrument(channel=channel) + + +PikaInstrumentation also supports instrumentation without creating an object, and receiving a tracer_provider + +.. code:: Python + PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider) + +API +--- +""" + + from .version import __version__ from .pika_instrumentor import PikaInstrumentation diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py index b991957291..4173ccbd80 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import Collection -_instruments = ("pika >= 1.1.0",) +_instruments: Collection[str] = ("pika >= 1.1.0",) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 92c3fb16fa..81cdf95f9f 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -1,12 +1,13 @@ import pika from logging import getLogger -from opentelemetry import trace -from typing import Dict, Callable -from typing import Collection, Any +from pika.channel import Channel from pika.adapters import BaseConnection +from typing import Dict, Callable, Optional, Collection, Any +from opentelemetry import trace from opentelemetry.propagate import inject from opentelemetry.instrumentation.pika import utils from opentelemetry.trace import Tracer, TracerProvider +from opentelemetry.instrumentation.pika import __version__ from opentelemetry.semconv.trace import MessagingOperationValues from opentelemetry.instrumentation.pika.package import _instruments from opentelemetry.instrumentation.instrumentor import BaseInstrumentor @@ -15,8 +16,10 @@ _LOG = getLogger(__name__) CTX_KEY = "__otel_task_span" +FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"] -class PikaInstrumentation(BaseInstrumentor): + +class PikaInstrumentation(BaseInstrumentor): # type: ignore @staticmethod def _instrument_consumers( consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer @@ -24,7 +27,7 @@ def _instrument_consumers( for key, callback in consumers_dict.items(): def decorated_callback( - channel: pika.channel.Channel, + channel: Channel, method: pika.spec.Basic.Deliver, properties: pika.spec.BasicProperties, body: bytes, @@ -47,12 +50,16 @@ def decorated_callback( consumers_dict[key] = decorated_callback @staticmethod - def _instrument_publish(channel: Any, tracer: Tracer) -> None: - original_basic_publish = channel.basic_publish + def _instrument_basic_publish(channel: Channel, tracer: Tracer) -> None: + original_function = getattr(channel, "basic_publish") - def decorated_basic_publish( - exchange, routing_key, body, properties=None, mandatory=False - ): + def decorated_function( + exchange: str, + routing_key: str, + body: bytes, + properties: pika.spec.BasicProperties = None, + mandatory: bool = False, + ) -> Any: if not properties: properties = pika.spec.BasicProperties() span = utils.get_span( @@ -64,45 +71,69 @@ def decorated_basic_publish( ) with trace.use_span(span, end_on_exit=True): inject(properties.headers) - retval = original_basic_publish( + retval = original_function( exchange, routing_key, body, properties, mandatory ) return retval - decorated_basic_publish.__setattr__( - "_original_function", original_basic_publish - ) - channel.basic_publish = decorated_basic_publish + decorated_function.__setattr__("_original_function", original_function) + channel.__setattr__("basic_publish", decorated_function) + channel.basic_publish = decorated_function + + @staticmethod + def _instrument_channel_functions( + channel: Channel, tracer: Tracer + ) -> None: + if hasattr(channel, "basic_publish"): + PikaInstrumentation._instrument_basic_publish(channel, tracer) + + @staticmethod + def _uninstrument_channel_functions(channel: Channel) -> None: + for function_name in FUNCTIONS_TO_UNINSTRUMENT: + if not hasattr(channel, function_name): + continue + function = getattr(channel, function_name) + if hasattr(function, "_original_function"): + channel.__setattr__(function_name, function._original_function) @staticmethod def instrument_channel( - channel: Any, tracer_provider: TracerProvider + channel: Channel, + tracer_provider: Optional[TracerProvider] = None, ) -> None: if not hasattr(channel, "_impl") or not isinstance( - channel._impl, pika.channel.Channel + channel._impl, Channel ): _LOG.error("Could not find implementation for provided channel!") return - tracer = trace.get_tracer(__name__, pika.__version__, tracer_provider) + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + channel.__setattr__("__opentelemetry_tracer", tracer) if channel._impl._consumers: PikaInstrumentation._instrument_consumers( channel._impl._consumers, tracer ) - PikaInstrumentation._instrument_publish(channel, tracer) + PikaInstrumentation._instrument_channel_functions(channel, tracer) + + def _instrument(self, **kwargs: Dict[str, Any]) -> None: + channel: Channel = kwargs.get("channel", None) + if not channel or not isinstance(channel, Channel): + return + tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) + PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider) - def _uninstrument(self, connection: Any, **kwargs: Dict[str, Any]) -> None: - if not hasattr(connection, "_impl") or not isinstance( - connection._impl, BaseConnection + def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: + channel: Channel = kwargs.get("channel", None) + if not channel or not isinstance(channel, Channel): + return + if not hasattr(channel, "_impl") or not isinstance( + channel._impl, BaseConnection ): _LOG.error("Could not find implementation for provided channel!") return - for key, callback in connection._impl._consumers: + for key, callback in channel._impl._consumers: if hasattr(callback, "_original_callback"): - connection._consumers[key] = callback._original_callback - if hasattr(connection.basic_publish, "_original_function"): - connection.basic_publish = ( - connection.basic_publish._original_function - ) + channel._consumers[key] = callback._original_callback + PikaInstrumentation._uninstrument_channel_functions(channel) def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index 3789da44e2..08da9ca8f4 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -1,24 +1,24 @@ -from typing import Optional from pika.channel import Channel +from typing import Optional, List from pika.spec import BasicProperties from opentelemetry.trace import Tracer from opentelemetry.trace.span import Span from opentelemetry.propagate import extract -from opentelemetry.propagators.textmap import Getter +from opentelemetry.propagators.textmap import Getter, CarrierT from opentelemetry.semconv.trace import ( SpanAttributes, MessagingOperationValues, ) -class PikaGetter(Getter): - def get(self, carrier, key): +class PikaGetter(Getter): # type: ignore + def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: value = carrier.get(key, None) if value is None: return None - return (value,) + return [value] - def keys(self, carrier): + def keys(self, carrier: CarrierT) -> List[str]: return [] @@ -35,6 +35,7 @@ def get_span( if properties.headers is None: properties.headers = {} ctx = extract(properties.headers, getter=pika_getter) + task_name = properties.type if properties.type else task_name span = tracer.start_span( context=ctx, name=generate_span_name(task_name, operation) ) @@ -45,6 +46,8 @@ def get_span( def generate_span_name( task_name: str, operation: MessagingOperationValues ) -> str: + if not operation: + return f"{task_name} send" return f"{task_name} {operation.value}" @@ -61,18 +64,25 @@ def enrich_span( else: span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) span.set_attribute(SpanAttributes.MESSAGING_DESTINATION, task_destination) - span.set_attribute( - SpanAttributes.MESSAGING_DESTINATION_KIND, properties.type - ) - span.set_attribute( - SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id - ) - span.set_attribute( - SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id - ) - span.set_attribute( - SpanAttributes.NET_PEER_NAME, channel.connection.params.host - ) - span.set_attribute( - SpanAttributes.NET_PEER_PORT, channel.connection.params.port - ) + if properties.message_id: + span.set_attribute( + SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id + ) + if properties.correlation_id: + span.set_attribute( + SpanAttributes.MESSAGING_CONVERSATION_ID, properties.correlation_id + ) + if not hasattr(channel.connection, "params"): + span.set_attribute( + SpanAttributes.NET_PEER_NAME, channel.connection._impl.params.host + ) + span.set_attribute( + SpanAttributes.NET_PEER_PORT, channel.connection._impl.params.port + ) + else: + span.set_attribute( + SpanAttributes.NET_PEER_NAME, channel.connection.params.host + ) + span.set_attribute( + SpanAttributes.NET_PEER_PORT, channel.connection.params.port + ) From 315b43848d1907d8a4464a9ac0c52bf4da5ed70e Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Tue, 14 Sep 2021 17:13:11 +0300 Subject: [PATCH 03/16] Added tests. Ready for PR --- .../README.rst | 41 ++++- .../setup.cfg | 2 + .../setup.py | 7 +- .../instrumentation/pika/__init__.py | 10 +- .../instrumentation/pika/package.py | 1 - .../instrumentation/pika/pika_instrumentor.py | 100 ++++-------- .../instrumentation/pika/utils.py | 83 ++++++++-- .../tests/__init__.py | 0 .../tests/test_getter.py | 37 +++++ .../tests/test_pika_instrumentation.py | 118 +++++++++++++++ .../tests/test_utils.py | 143 ++++++++++++++++++ tox.ini | 9 ++ 12 files changed, 459 insertions(+), 92 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-pika/tests/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py create mode 100644 instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py diff --git a/instrumentation/opentelemetry-instrumentation-pika/README.rst b/instrumentation/opentelemetry-instrumentation-pika/README.rst index d94cde9629..844bdefcb3 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/README.rst +++ b/instrumentation/opentelemetry-instrumentation-pika/README.rst @@ -1,12 +1,12 @@ -OpenTelemetry Instrumentation -=========================== +OpenTelemetry pika Instrumentation +================================== |pypi| .. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-pika.svg :target: https://pypi.org/project/opentelemetry-instrumentation-pika/ -This library allows tracing requests made by the library. +This library allows tracing requests made by the pika library. Installation ------------ @@ -15,6 +15,41 @@ Installation pip install opentelemetry-instrumentation-pika +Usage +----- + +* Start broker backend + +.. code-block:: python + + docker run -p 5672:5672 rabbitmq + + +* Run instrumented task + +.. code-block:: python + + import pika + from opentelemetry.instrumentation.pika import PikaInstrumentation + + connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost')) + channel = connection.channel() + channel.queue_declare(queue='hello') + + pika_instrumentation = PikaInstrumentation() + pika_instrumentation.instrument(channel=channel) + + + channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') + + pika_instrumentation.uninstrument(channel=channel) + + +* PikaInstrumentation also supports instrumentation without creating an object, and receiving a tracer_provider + +.. code-block:: python + + PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider) References ---------- diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg index 70e4f7e35f..3f6eab990f 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg @@ -44,6 +44,8 @@ install_requires = [options.extras_require] test = + pytest + opentelemetry-test == 0.24b0 [options.packages.find] where = src diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.py b/instrumentation/opentelemetry-instrumentation-pika/setup.py index a54b3092c0..2053ff0987 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/setup.py +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.py @@ -17,12 +17,7 @@ BASE_DIR = os.path.dirname(__file__) VERSION_FILENAME = os.path.join( - BASE_DIR, - "src", - "opentelemetry", - "instrumentation", - "pika", - "version.py", + BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "version.py", ) PACKAGE_INFO = {} with open(VERSION_FILENAME) as f: diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py index 7e66da6642..fe399be199 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -26,7 +26,8 @@ * Run instrumented task -.. code:: python +.. code-block:: python + import pika from opentelemetry.instrumentation.pika import PikaInstrumentation @@ -43,9 +44,10 @@ pika_instrumentation.uninstrument(channel=channel) -PikaInstrumentation also supports instrumentation without creating an object, and receiving a tracer_provider +* PikaInstrumentation also supports instrumentation without creating an object, and receiving a tracer_provider + +.. code-block:: python -.. code:: Python PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider) API @@ -53,5 +55,5 @@ """ -from .version import __version__ from .pika_instrumentor import PikaInstrumentation +from .version import __version__ diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py index 4173ccbd80..27ceebbac7 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/package.py @@ -13,5 +13,4 @@ # limitations under the License. from typing import Collection - _instruments: Collection[str] = ("pika >= 1.1.0",) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 81cdf95f9f..596b44a68e 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -1,17 +1,27 @@ -import pika +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. from logging import getLogger +from typing import Any, Callable, Collection, Dict, Optional + from pika.channel import Channel -from pika.adapters import BaseConnection -from typing import Dict, Callable, Optional, Collection, Any + from opentelemetry import trace -from opentelemetry.propagate import inject +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.pika import utils -from opentelemetry.trace import Tracer, TracerProvider -from opentelemetry.instrumentation.pika import __version__ -from opentelemetry.semconv.trace import MessagingOperationValues from opentelemetry.instrumentation.pika.package import _instruments -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor - +from opentelemetry.instrumentation.pika.version import __version__ +from opentelemetry.trace import Tracer, TracerProvider _LOG = getLogger(__name__) CTX_KEY = "__otel_task_span" @@ -25,58 +35,17 @@ def _instrument_consumers( consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer ) -> Any: for key, callback in consumers_dict.items(): - - def decorated_callback( - channel: Channel, - method: pika.spec.Basic.Deliver, - properties: pika.spec.BasicProperties, - body: bytes, - ) -> Any: - if not properties: - properties = pika.spec.BasicProperties() - span = utils.get_span( - tracer, - channel, - properties, - task_name=key, - operation=MessagingOperationValues.RECEIVE, - ) - with trace.use_span(span, end_on_exit=True): - inject(properties.headers) - retval = callback(channel, method, properties, body) - return retval - - decorated_callback.__setattr__("_original_callback", callback) + decorated_callback = utils.decorate_callback(callback, tracer, key) + setattr(decorated_callback, "_original_callback", callback) consumers_dict[key] = decorated_callback @staticmethod def _instrument_basic_publish(channel: Channel, tracer: Tracer) -> None: original_function = getattr(channel, "basic_publish") - - def decorated_function( - exchange: str, - routing_key: str, - body: bytes, - properties: pika.spec.BasicProperties = None, - mandatory: bool = False, - ) -> Any: - if not properties: - properties = pika.spec.BasicProperties() - span = utils.get_span( - tracer, - channel, - properties, - task_name="(temporary)", - operation=None, - ) - with trace.use_span(span, end_on_exit=True): - inject(properties.headers) - retval = original_function( - exchange, routing_key, body, properties, mandatory - ) - return retval - - decorated_function.__setattr__("_original_function", original_function) + decorated_function = utils.decorate_basic_publish( + original_function, channel, tracer + ) + setattr(decorated_function, "_original_function", original_function) channel.__setattr__("basic_publish", decorated_function) channel.basic_publish = decorated_function @@ -98,12 +67,9 @@ def _uninstrument_channel_functions(channel: Channel) -> None: @staticmethod def instrument_channel( - channel: Channel, - tracer_provider: Optional[TracerProvider] = None, + channel: Channel, tracer_provider: Optional[TracerProvider] = None, ) -> None: - if not hasattr(channel, "_impl") or not isinstance( - channel._impl, Channel - ): + if not hasattr(channel, "_impl"): _LOG.error("Could not find implementation for provided channel!") return tracer = trace.get_tracer(__name__, __version__, tracer_provider) @@ -119,20 +85,20 @@ def _instrument(self, **kwargs: Dict[str, Any]) -> None: if not channel or not isinstance(channel, Channel): return tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) - PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider) + PikaInstrumentation.instrument_channel( + channel, tracer_provider=tracer_provider + ) def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: channel: Channel = kwargs.get("channel", None) if not channel or not isinstance(channel, Channel): return - if not hasattr(channel, "_impl") or not isinstance( - channel._impl, BaseConnection - ): + if not hasattr(channel, "_impl"): _LOG.error("Could not find implementation for provided channel!") return - for key, callback in channel._impl._consumers: + for key, callback in channel._impl._consumers.items(): if hasattr(callback, "_original_callback"): - channel._consumers[key] = callback._original_callback + channel._impl._consumers[key] = callback._original_callback PikaInstrumentation._uninstrument_channel_functions(channel) def instrumentation_dependencies(self) -> Collection[str]: diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index 08da9ca8f4..1621900a45 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -1,14 +1,16 @@ +from typing import Any, Callable, List, Optional + from pika.channel import Channel -from typing import Optional, List -from pika.spec import BasicProperties -from opentelemetry.trace import Tracer -from opentelemetry.trace.span import Span -from opentelemetry.propagate import extract -from opentelemetry.propagators.textmap import Getter, CarrierT +from pika.spec import Basic, BasicProperties + +from opentelemetry import propagate, trace +from opentelemetry.propagators.textmap import CarrierT, Getter from opentelemetry.semconv.trace import ( - SpanAttributes, MessagingOperationValues, + SpanAttributes, ) +from opentelemetry.trace import Tracer +from opentelemetry.trace.span import Span class PikaGetter(Getter): # type: ignore @@ -25,16 +27,75 @@ def keys(self, carrier: CarrierT) -> List[str]: pika_getter = PikaGetter() +def decorate_callback( + callback: Callable[[Channel, Basic.Deliver, BasicProperties, bytes], Any], + tracer: Tracer, + task_name: str, +): + def decorated_callback( + channel: Channel, + method: Basic.Deliver, + properties: BasicProperties, + body: bytes, + ) -> Any: + if not properties: + properties = BasicProperties() + span = get_span( + tracer, + channel, + properties, + task_name=task_name, + operation=MessagingOperationValues.RECEIVE, + ) + with trace.use_span(span, end_on_exit=True): + propagate.inject(properties.headers) + retval = callback(channel, method, properties, body) + return retval + + return decorated_callback + + +def decorate_basic_publish( + original_function: Callable[[str, str, bytes, BasicProperties, bool], Any], + channel: Channel, + tracer: Tracer, +): + def decorated_function( + exchange: str, + routing_key: str, + body: bytes, + properties: BasicProperties = None, + mandatory: bool = False, + ) -> Any: + if not properties: + properties = BasicProperties() + span = get_span( + tracer, + channel, + properties, + task_name="(temporary)", + operation=None, + ) + with trace.use_span(span, end_on_exit=True): + propagate.inject(properties.headers) + retval = original_function( + exchange, routing_key, body, properties, mandatory + ) + return retval + + return decorated_function + + def get_span( tracer: Tracer, channel: Channel, properties: BasicProperties, task_name: str, - operation: Optional[MessagingOperationValues], + operation: Optional[MessagingOperationValues] = None, ) -> Span: if properties.headers is None: properties.headers = {} - ctx = extract(properties.headers, getter=pika_getter) + ctx = propagate.extract(properties.headers, getter=pika_getter) task_name = properties.type if properties.type else task_name span = tracer.start_span( context=ctx, name=generate_span_name(task_name, operation) @@ -44,7 +105,7 @@ def get_span( def generate_span_name( - task_name: str, operation: MessagingOperationValues + task_name: str, operation: Optional[MessagingOperationValues] ) -> str: if not operation: return f"{task_name} send" @@ -56,7 +117,7 @@ def enrich_span( channel: Channel, properties: BasicProperties, task_destination: str, - operation: Optional[MessagingOperationValues], + operation: Optional[MessagingOperationValues] = None, ) -> None: span.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq") if operation: diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py new file mode 100644 index 0000000000..dff2787c8a --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py @@ -0,0 +1,37 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase + +from opentelemetry.instrumentation.pika.utils import PikaGetter + + +class TestPikaGetter(TestCase): + def setUp(self) -> None: + self.getter = PikaGetter() + + def test_get_none(self) -> None: + carrier = {} + value = self.getter.get(carrier, "test") + self.assertIsNone(value) + + def test_get_value(self) -> None: + key = "test" + value = "value" + carrier = {key: value} + val = self.getter.get(carrier, key) + self.assertEqual(val, [value]) + + def test_keys(self): + keys = self.getter.keys({}) + self.assertEqual(keys, []) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py new file mode 100644 index 0000000000..758f092456 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -0,0 +1,118 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase, mock + +from pika.adapters import BaseConnection +from pika.channel import Channel + +from opentelemetry.instrumentation.pika import PikaInstrumentation +from opentelemetry.trace import Tracer + + +class TestPika(TestCase): + def setUp(self) -> None: + self.channel = mock.MagicMock(spec=Channel) + self.channel._impl = mock.MagicMock(spec=BaseConnection) + self.mock_callback = mock.MagicMock() + self.channel._impl._consumers = {"mock_key": self.mock_callback} + + @mock.patch( + "opentelemetry.instrumentation.pika.PikaInstrumentation.instrument_channel" + ) + @mock.patch( + "opentelemetry.instrumentation.pika.PikaInstrumentation._uninstrument_channel_functions" + ) + def test_instrument_api( + self, + uninstrument_channel_functions: mock.MagicMock, + instrument_channel: mock.MagicMock, + ) -> None: + instrumentation = PikaInstrumentation() + instrumentation.instrument(channel=self.channel) + instrument_channel.assert_called_once_with( + self.channel, tracer_provider=None + ) + self.channel._impl._consumers = {"mock_key": mock.MagicMock()} + self.channel._impl._consumers[ + "mock_key" + ]._original_callback = self.mock_callback + instrumentation.uninstrument(channel=self.channel) + uninstrument_channel_functions.assert_called_once() + self.assertEqual( + self.channel._impl._consumers["mock_key"], self.mock_callback + ) + + @mock.patch( + "opentelemetry.instrumentation.pika.PikaInstrumentation._instrument_channel_functions" + ) + @mock.patch( + "opentelemetry.instrumentation.pika.PikaInstrumentation._instrument_consumers" + ) + def test_instrument( + self, + instrument_consumers: mock.MagicMock, + instrument_channel_functions: mock.MagicMock, + ): + PikaInstrumentation.instrument_channel(channel=self.channel) + assert hasattr( + self.channel, "__opentelemetry_tracer" + ), "Tracer not set for the channel!" + instrument_consumers.assert_called_once() + instrument_channel_functions.assert_called_once() + + @mock.patch("opentelemetry.instrumentation.pika.utils.decorate_callback") + def test_instrument_consumers( + self, decorate_callback: mock.MagicMock + ) -> None: + tracer = mock.MagicMock(spec=Tracer) + expected_decoration_calls = [ + mock.call(value, tracer, key) + for key, value in self.channel._impl._consumers.items() + ] + PikaInstrumentation._instrument_consumers( + self.channel._impl._consumers, tracer + ) + decorate_callback.assert_has_calls( + calls=expected_decoration_calls, any_order=True + ) + assert all( + hasattr(callback, "_original_callback") + for callback in self.channel._impl._consumers.values() + ) + + @mock.patch( + "opentelemetry.instrumentation.pika.utils.decorate_basic_publish" + ) + def test_instrument_basic_publish( + self, decorate_basic_publish: mock.MagicMock + ) -> None: + tracer = mock.MagicMock(spec=Tracer) + original_function = self.channel.basic_publish + PikaInstrumentation._instrument_basic_publish(self.channel, tracer) + decorate_basic_publish.assert_called_once_with( + original_function, self.channel, tracer + ) + self.assertEqual( + self.channel.basic_publish, decorate_basic_publish.return_value + ) + self.assertEqual( + self.channel.basic_publish._original_function, original_function + ) + + def test_uninstrument_channel_functions(self) -> None: + original_function = self.channel.basic_publish + self.channel.basic_publish = mock.MagicMock() + self.channel.basic_publish._original_function = original_function + PikaInstrumentation._uninstrument_channel_functions(self.channel) + self.assertEqual(self.channel.basic_publish, original_function) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py new file mode 100644 index 0000000000..a6c188c459 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -0,0 +1,143 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase, mock + +from opentelemetry.instrumentation.pika import utils +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import Span, Tracer + + +class TestUtils(TestCase): + @staticmethod + @mock.patch("opentelemetry.instrumentation.pika.utils.generate_span_name") + @mock.patch("opentelemetry.instrumentation.pika.utils.enrich_span") + @mock.patch("opentelemetry.propagate.extract") + def test_get_span( + extract: mock.MagicMock, + enrich_span: mock.MagicMock, + generate_span_name: mock.MagicMock, + ) -> None: + tracer = mock.MagicMock(spec=Tracer) + channel = mock.MagicMock() + properties = mock.MagicMock() + task_name = "test.test" + span = utils.get_span(tracer, channel, properties, task_name) + extract.assert_called_once() + generate_span_name.assert_called_once() + tracer.start_span.assert_called_once_with( + context=extract.return_value, name=generate_span_name.return_value + ) + enrich_span.assert_called_once() + assert any( + span in call.args or span in call.kwargs.values() + for call in enrich_span.call_args_list + ), "The returned span was not enriched using enrich_span!" + + def test_generate_span_name_no_operation(self) -> None: + task_name = "test.test" + operation = None + span_name = utils.generate_span_name(task_name, operation) + self.assertEqual(span_name, f"{task_name} send") + + def test_generate_span_name_with_operation(self) -> None: + task_name = "test.test" + operation = mock.MagicMock() + operation.value = "process" + span_name = utils.generate_span_name(task_name, operation) + self.assertEqual(span_name, f"{task_name} {operation.value}") + + @staticmethod + def test_enrich_span_basic_values() -> None: + channel = mock.MagicMock() + properties = mock.MagicMock() + task_destination = "test.test" + span = mock.MagicMock(spec=Span) + utils.enrich_span(span, channel, properties, task_destination) + span.set_attribute.assert_has_calls( + any_order=True, + calls=[ + mock.call(SpanAttributes.MESSAGING_SYSTEM, "rabbitmq"), + mock.call(SpanAttributes.MESSAGING_TEMP_DESTINATION, True), + mock.call( + SpanAttributes.MESSAGING_DESTINATION, task_destination + ), + mock.call( + SpanAttributes.MESSAGING_MESSAGE_ID, properties.message_id + ), + mock.call( + SpanAttributes.MESSAGING_CONVERSATION_ID, + properties.correlation_id, + ), + mock.call( + SpanAttributes.NET_PEER_NAME, + channel.connection.params.host, + ), + mock.call( + SpanAttributes.NET_PEER_PORT, + channel.connection.params.port, + ), + ], + ) + + @staticmethod + def test_enrich_span_with_operation() -> None: + channel = mock.MagicMock() + properties = mock.MagicMock() + task_destination = "test.test" + operation = mock.MagicMock() + span = mock.MagicMock(spec=Span) + utils.enrich_span( + span, channel, properties, task_destination, operation + ) + span.set_attribute.assert_has_calls( + any_order=True, + calls=[ + mock.call(SpanAttributes.MESSAGING_OPERATION, operation.value) + ], + ) + + @staticmethod + def test_enrich_span_without_operation() -> None: + channel = mock.MagicMock() + properties = mock.MagicMock() + task_destination = "test.test" + span = mock.MagicMock(spec=Span) + utils.enrich_span(span, channel, properties, task_destination) + span.set_attribute.assert_has_calls( + any_order=True, + calls=[mock.call(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)], + ) + + @staticmethod + def test_enrich_span_unique_connection() -> None: + channel = mock.MagicMock() + properties = mock.MagicMock() + task_destination = "test.test" + span = mock.MagicMock(spec=Span) + # We do this to create the behaviour of hasattr(channel.connection, "params") == False + del channel.connection.params + utils.enrich_span(span, channel, properties, task_destination) + span.set_attribute.assert_has_calls( + any_order=True, + calls=[ + mock.call( + SpanAttributes.NET_PEER_NAME, + channel.connection._impl.params.host, + ), + mock.call( + SpanAttributes.NET_PEER_PORT, + channel.connection._impl.params.port, + ), + ], + ) diff --git a/tox.ini b/tox.ini index 70a8968607..9f165a52c7 100644 --- a/tox.ini +++ b/tox.ini @@ -152,6 +152,10 @@ envlist = py3{6,7,8,9}-test-propagator-ot-trace pypy3-test-propagator-ot-trace + ; opentelemetry-instrumentation-pika + py3{6,7,8,9}-test-instrumentation-pika + pypy3-test-instrumentation-pika + lint docker-tests docs @@ -201,6 +205,7 @@ changedir = test-instrumentation-jinja2: instrumentation/opentelemetry-instrumentation-jinja2/tests test-instrumentation-logging: instrumentation/opentelemetry-instrumentation-logging/tests test-instrumentation-mysql: instrumentation/opentelemetry-instrumentation-mysql/tests + test-instrumentation-pika: instrumentation/opentelemetry-instrumentation-pika/tests test-instrumentation-psycopg2: instrumentation/opentelemetry-instrumentation-psycopg2/tests test-instrumentation-pymemcache: instrumentation/opentelemetry-instrumentation-pymemcache/tests test-instrumentation-pymongo: instrumentation/opentelemetry-instrumentation-pymongo/tests @@ -233,6 +238,8 @@ commands_pre = celery: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test] + pika: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] + grpc: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-grpc[test] falcon,flask,django,pyramid,tornado,starlette,fastapi,aiohttp,asgi,requests,urllib,wsgi: pip install {toxinidir}/util/opentelemetry-util-http[test] @@ -363,6 +370,7 @@ commands_pre = python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-flask[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-sqlalchemy[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test] + python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-sklearn[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-redis[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-fastapi[test] @@ -421,6 +429,7 @@ commands_pre = -e {toxinidir}/opentelemetry-python-core/tests/util \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-asyncpg \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-celery \ + -e {toxinidir}/instrumentation/opentelemetry-instrumentation-pika \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-dbapi \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-mysql \ -e {toxinidir}/instrumentation/opentelemetry-instrumentation-psycopg2 \ From d73b88f8e51073fe8d8ba18e8179b0ff094d2bcc Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Sun, 19 Sep 2021 09:47:34 +0300 Subject: [PATCH 04/16] Rename RequestsInstrumentation to RequestsInstrumentor to follow conventions --- .../opentelemetry-instrumentation-pika/README.rst | 8 ++++---- .../opentelemetry-instrumentation-pika/setup.cfg | 3 +++ .../opentelemetry/instrumentation/pika/__init__.py | 10 +++++----- .../instrumentation/pika/pika_instrumentor.py | 12 ++++++------ .../tests/test_pika_instrumentation.py | 12 ++++++------ 5 files changed, 24 insertions(+), 21 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/README.rst b/instrumentation/opentelemetry-instrumentation-pika/README.rst index 844bdefcb3..55343fbb55 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/README.rst +++ b/instrumentation/opentelemetry-instrumentation-pika/README.rst @@ -30,13 +30,13 @@ Usage .. code-block:: python import pika - from opentelemetry.instrumentation.pika import PikaInstrumentation + from opentelemetry.instrumentation.pika import PikaInstrumentor connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost')) channel = connection.channel() channel.queue_declare(queue='hello') - pika_instrumentation = PikaInstrumentation() + pika_instrumentation = PikaInstrumentor() pika_instrumentation.instrument(channel=channel) @@ -45,11 +45,11 @@ Usage pika_instrumentation.uninstrument(channel=channel) -* PikaInstrumentation also supports instrumentation without creating an object, and receiving a tracer_provider +* PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider .. code-block:: python - PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider) + PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider) References ---------- diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg index 3f6eab990f..3de0471f4d 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg @@ -50,3 +50,6 @@ test = [options.packages.find] where = src +[options.entry_points] +opentelemetry_instrumentor = + pika = opentelemetry.instrumentation.pika:PikaInstrumentor diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py index fe399be199..a92f2e9ff7 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -29,13 +29,13 @@ .. code-block:: python import pika - from opentelemetry.instrumentation.pika import PikaInstrumentation + from opentelemetry.instrumentation.pika import PikaInstrumentor connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost')) channel = connection.channel() channel.queue_declare(queue='hello') - pika_instrumentation = PikaInstrumentation() + pika_instrumentation = PikaInstrumentor() pika_instrumentation.instrument(channel=channel) @@ -44,16 +44,16 @@ pika_instrumentation.uninstrument(channel=channel) -* PikaInstrumentation also supports instrumentation without creating an object, and receiving a tracer_provider +* PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider .. code-block:: python - PikaInstrumentation.instrument_channel(channel, tracer_provider=tracer_provider) + PikaInstrumentor.instrument_channel(channel, tracer_provider=tracer_provider) API --- """ -from .pika_instrumentor import PikaInstrumentation +from .pika_instrumentor import PikaInstrumentor from .version import __version__ diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 596b44a68e..4f03c9def3 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -29,7 +29,7 @@ FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"] -class PikaInstrumentation(BaseInstrumentor): # type: ignore +class PikaInstrumentor(BaseInstrumentor): # type: ignore @staticmethod def _instrument_consumers( consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer @@ -54,7 +54,7 @@ def _instrument_channel_functions( channel: Channel, tracer: Tracer ) -> None: if hasattr(channel, "basic_publish"): - PikaInstrumentation._instrument_basic_publish(channel, tracer) + PikaInstrumentor._instrument_basic_publish(channel, tracer) @staticmethod def _uninstrument_channel_functions(channel: Channel) -> None: @@ -75,17 +75,17 @@ def instrument_channel( tracer = trace.get_tracer(__name__, __version__, tracer_provider) channel.__setattr__("__opentelemetry_tracer", tracer) if channel._impl._consumers: - PikaInstrumentation._instrument_consumers( + PikaInstrumentor._instrument_consumers( channel._impl._consumers, tracer ) - PikaInstrumentation._instrument_channel_functions(channel, tracer) + PikaInstrumentor._instrument_channel_functions(channel, tracer) def _instrument(self, **kwargs: Dict[str, Any]) -> None: channel: Channel = kwargs.get("channel", None) if not channel or not isinstance(channel, Channel): return tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) - PikaInstrumentation.instrument_channel( + PikaInstrumentor.instrument_channel( channel, tracer_provider=tracer_provider ) @@ -99,7 +99,7 @@ def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: for key, callback in channel._impl._consumers.items(): if hasattr(callback, "_original_callback"): channel._impl._consumers[key] = callback._original_callback - PikaInstrumentation._uninstrument_channel_functions(channel) + PikaInstrumentor._uninstrument_channel_functions(channel) def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 758f092456..20030678ed 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -16,7 +16,7 @@ from pika.adapters import BaseConnection from pika.channel import Channel -from opentelemetry.instrumentation.pika import PikaInstrumentation +from opentelemetry.instrumentation.pika import PikaInstrumentor from opentelemetry.trace import Tracer @@ -38,7 +38,7 @@ def test_instrument_api( uninstrument_channel_functions: mock.MagicMock, instrument_channel: mock.MagicMock, ) -> None: - instrumentation = PikaInstrumentation() + instrumentation = PikaInstrumentor() instrumentation.instrument(channel=self.channel) instrument_channel.assert_called_once_with( self.channel, tracer_provider=None @@ -64,7 +64,7 @@ def test_instrument( instrument_consumers: mock.MagicMock, instrument_channel_functions: mock.MagicMock, ): - PikaInstrumentation.instrument_channel(channel=self.channel) + PikaInstrumentor.instrument_channel(channel=self.channel) assert hasattr( self.channel, "__opentelemetry_tracer" ), "Tracer not set for the channel!" @@ -80,7 +80,7 @@ def test_instrument_consumers( mock.call(value, tracer, key) for key, value in self.channel._impl._consumers.items() ] - PikaInstrumentation._instrument_consumers( + PikaInstrumentor._instrument_consumers( self.channel._impl._consumers, tracer ) decorate_callback.assert_has_calls( @@ -99,7 +99,7 @@ def test_instrument_basic_publish( ) -> None: tracer = mock.MagicMock(spec=Tracer) original_function = self.channel.basic_publish - PikaInstrumentation._instrument_basic_publish(self.channel, tracer) + PikaInstrumentor._instrument_basic_publish(self.channel, tracer) decorate_basic_publish.assert_called_once_with( original_function, self.channel, tracer ) @@ -114,5 +114,5 @@ def test_uninstrument_channel_functions(self) -> None: original_function = self.channel.basic_publish self.channel.basic_publish = mock.MagicMock() self.channel.basic_publish._original_function = original_function - PikaInstrumentation._uninstrument_channel_functions(self.channel) + PikaInstrumentor._uninstrument_channel_functions(self.channel) self.assertEqual(self.channel.basic_publish, original_function) From 15bf875d119ddea05919e242a04e54ba4383191e Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Sun, 19 Sep 2021 10:45:26 +0300 Subject: [PATCH 05/16] Add suppress_instrumentation functionality --- .../instrumentation/pika/utils.py | 14 +++++++++++-- .../tests/test_pika_instrumentation.py | 8 +++---- .../tests/test_utils.py | 21 +++++++++++++++++++ 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index 1621900a45..2190c184d8 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -4,6 +4,7 @@ from pika.spec import Basic, BasicProperties from opentelemetry import propagate, trace +from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.propagators.textmap import CarrierT, Getter from opentelemetry.semconv.trace import ( MessagingOperationValues, @@ -76,8 +77,13 @@ def decorated_function( task_name="(temporary)", operation=None, ) + if not span: + return original_function( + exchange, routing_key, body, properties, mandatory + ) with trace.use_span(span, end_on_exit=True): - propagate.inject(properties.headers) + if span.is_recording(): + propagate.inject(properties.headers) retval = original_function( exchange, routing_key, body, properties, mandatory ) @@ -92,10 +98,14 @@ def get_span( properties: BasicProperties, task_name: str, operation: Optional[MessagingOperationValues] = None, -) -> Span: +) -> Optional[Span]: if properties.headers is None: properties.headers = {} ctx = propagate.extract(properties.headers, getter=pika_getter) + if ctx.get_value("suppress_instrumentation") or ctx.get_value( + _SUPPRESS_INSTRUMENTATION_KEY + ): + return None task_name = properties.type if properties.type else task_name span = tracer.start_span( context=ctx, name=generate_span_name(task_name, operation) diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 20030678ed..25052f3124 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -28,10 +28,10 @@ def setUp(self) -> None: self.channel._impl._consumers = {"mock_key": self.mock_callback} @mock.patch( - "opentelemetry.instrumentation.pika.PikaInstrumentation.instrument_channel" + "opentelemetry.instrumentation.pika.PikaInstrumentor.instrument_channel" ) @mock.patch( - "opentelemetry.instrumentation.pika.PikaInstrumentation._uninstrument_channel_functions" + "opentelemetry.instrumentation.pika.PikaInstrumentor._uninstrument_channel_functions" ) def test_instrument_api( self, @@ -54,10 +54,10 @@ def test_instrument_api( ) @mock.patch( - "opentelemetry.instrumentation.pika.PikaInstrumentation._instrument_channel_functions" + "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions" ) @mock.patch( - "opentelemetry.instrumentation.pika.PikaInstrumentation._instrument_consumers" + "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_consumers" ) def test_instrument( self, diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py index a6c188c459..912f8b30ca 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -32,6 +32,9 @@ def test_get_span( channel = mock.MagicMock() properties = mock.MagicMock() task_name = "test.test" + context = mock.MagicMock() + context.get_value.return_value = None + extract.return_value = context span = utils.get_span(tracer, channel, properties, task_name) extract.assert_called_once() generate_span_name.assert_called_once() @@ -44,6 +47,24 @@ def test_get_span( for call in enrich_span.call_args_list ), "The returned span was not enriched using enrich_span!" + @mock.patch("opentelemetry.instrumentation.pika.utils.generate_span_name") + @mock.patch("opentelemetry.instrumentation.pika.utils.enrich_span") + @mock.patch("opentelemetry.propagate.extract") + def test_get_span_suppressed( + self, + extract: mock.MagicMock, + enrich_span: mock.MagicMock, + generate_span_name: mock.MagicMock, + ) -> None: + tracer = mock.MagicMock(spec=Tracer) + channel = mock.MagicMock() + properties = mock.MagicMock() + task_name = "test.test" + span = utils.get_span(tracer, channel, properties, task_name) + self.assertEqual(span, None) + extract.assert_called_once() + generate_span_name.assert_not_called() + def test_generate_span_name_no_operation(self) -> None: task_name = "test.test" operation = None From 12a4746f08ec83837064d85d13924bf2e5bf22d9 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Sun, 19 Sep 2021 11:16:45 +0300 Subject: [PATCH 06/16] Fix suppress_instrumentation functionality --- .../src/opentelemetry/instrumentation/pika/utils.py | 5 +++-- .../tests/test_utils.py | 9 ++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index 2190c184d8..e8608c7227 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -3,7 +3,7 @@ from pika.channel import Channel from pika.spec import Basic, BasicProperties -from opentelemetry import propagate, trace +from opentelemetry import context, propagate, trace from opentelemetry.instrumentation.utils import _SUPPRESS_INSTRUMENTATION_KEY from opentelemetry.propagators.textmap import CarrierT, Getter from opentelemetry.semconv.trace import ( @@ -102,9 +102,10 @@ def get_span( if properties.headers is None: properties.headers = {} ctx = propagate.extract(properties.headers, getter=pika_getter) - if ctx.get_value("suppress_instrumentation") or ctx.get_value( + if context.get_value("suppress_instrumentation") or context.get_value( _SUPPRESS_INSTRUMENTATION_KEY ): + print("Suppressing instrumentation!") return None task_name = properties.type if properties.type else task_name span = tracer.start_span( diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py index 912f8b30ca..4c48f0b233 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -20,6 +20,7 @@ class TestUtils(TestCase): @staticmethod + @mock.patch("opentelemetry.context.get_value") @mock.patch("opentelemetry.instrumentation.pika.utils.generate_span_name") @mock.patch("opentelemetry.instrumentation.pika.utils.enrich_span") @mock.patch("opentelemetry.propagate.extract") @@ -27,14 +28,13 @@ def test_get_span( extract: mock.MagicMock, enrich_span: mock.MagicMock, generate_span_name: mock.MagicMock, + get_value: mock.MagicMock, ) -> None: tracer = mock.MagicMock(spec=Tracer) channel = mock.MagicMock() properties = mock.MagicMock() task_name = "test.test" - context = mock.MagicMock() - context.get_value.return_value = None - extract.return_value = context + get_value.return_value = None span = utils.get_span(tracer, channel, properties, task_name) extract.assert_called_once() generate_span_name.assert_called_once() @@ -47,6 +47,7 @@ def test_get_span( for call in enrich_span.call_args_list ), "The returned span was not enriched using enrich_span!" + @mock.patch("opentelemetry.context.get_value") @mock.patch("opentelemetry.instrumentation.pika.utils.generate_span_name") @mock.patch("opentelemetry.instrumentation.pika.utils.enrich_span") @mock.patch("opentelemetry.propagate.extract") @@ -55,11 +56,13 @@ def test_get_span_suppressed( extract: mock.MagicMock, enrich_span: mock.MagicMock, generate_span_name: mock.MagicMock, + get_value: mock.MagicMock, ) -> None: tracer = mock.MagicMock(spec=Tracer) channel = mock.MagicMock() properties = mock.MagicMock() task_name = "test.test" + get_value.return_value = True span = utils.get_span(tracer, channel, properties, task_name) self.assertEqual(span, None) extract.assert_called_once() From fedd7687c28fc74d1fcb493e4b7d3ff95a32a2a8 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Sun, 19 Sep 2021 17:24:41 +0300 Subject: [PATCH 07/16] Fix CR comments and lint test failures --- CHANGELOG.md | 3 +- .../instrumentation/pika/__init__.py | 2 +- .../instrumentation/pika/pika_instrumentor.py | 52 ++++++++++++------- .../instrumentation/pika/utils.py | 24 ++++----- .../tests/test_getter.py | 4 +- .../tests/test_pika_instrumentation.py | 35 ++++--------- .../tests/test_utils.py | 24 ++++----- 7 files changed, 71 insertions(+), 73 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ffe410a6a1..2bd0b0d20c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-sdk-extension-aws` Add AWS resource detectors to extension package ([#586](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/586)) - `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-aiohttp-client`, `openetelemetry-instrumentation-fastapi`, - `opentelemetry-instrumentation-starlette`, `opentelemetry-instrumentation-urllib`, `opentelemetry-instrumentation-urllib3` Added `request_hook` and `response_hook` callbacks + `opentelemetry-instrumentation-starlette`, `opentelemetry-instrumentation-urllib`, `opentelemetry-instrumentation-urllib3`, + `opentelemetry-instrumentation-pika` Added `request_hook` and `response_hook` callbacks ([#576](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/576)) ### Changed diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py index a92f2e9ff7..a0bffe4f8b 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -54,6 +54,6 @@ --- """ - +# pylint: disable=unused-argument from .pika_instrumentor import PikaInstrumentor from .version import __version__ diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 4f03c9def3..ca54947dcc 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -14,6 +14,7 @@ from logging import getLogger from typing import Any, Callable, Collection, Dict, Optional +from pika.adapters import BlockingConnection from pika.channel import Channel from opentelemetry import trace @@ -24,25 +25,28 @@ from opentelemetry.trace import Tracer, TracerProvider _LOG = getLogger(__name__) -CTX_KEY = "__otel_task_span" +_CTX_KEY = "__otel_task_span" -FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"] +_FUNCTIONS_TO_UNINSTRUMENT = ["basic_publish"] class PikaInstrumentor(BaseInstrumentor): # type: ignore + # pylint: disable=attribute-defined-outside-init @staticmethod def _instrument_consumers( consumers_dict: Dict[str, Callable[..., Any]], tracer: Tracer ) -> Any: for key, callback in consumers_dict.items(): - decorated_callback = utils.decorate_callback(callback, tracer, key) + decorated_callback = utils._decorate_callback( + callback, tracer, key + ) setattr(decorated_callback, "_original_callback", callback) consumers_dict[key] = decorated_callback @staticmethod def _instrument_basic_publish(channel: Channel, tracer: Tracer) -> None: original_function = getattr(channel, "basic_publish") - decorated_function = utils.decorate_basic_publish( + decorated_function = utils._decorate_basic_publish( original_function, channel, tracer ) setattr(decorated_function, "_original_function", original_function) @@ -58,7 +62,7 @@ def _instrument_channel_functions( @staticmethod def _uninstrument_channel_functions(channel: Channel) -> None: - for function_name in FUNCTIONS_TO_UNINSTRUMENT: + for function_name in _FUNCTIONS_TO_UNINSTRUMENT: if not hasattr(channel, function_name): continue function = getattr(channel, function_name) @@ -69,30 +73,19 @@ def _uninstrument_channel_functions(channel: Channel) -> None: def instrument_channel( channel: Channel, tracer_provider: Optional[TracerProvider] = None, ) -> None: + tracer = trace.get_tracer(__name__, __version__, tracer_provider) + channel.__setattr__("__opentelemetry_tracer", tracer) if not hasattr(channel, "_impl"): _LOG.error("Could not find implementation for provided channel!") return - tracer = trace.get_tracer(__name__, __version__, tracer_provider) - channel.__setattr__("__opentelemetry_tracer", tracer) if channel._impl._consumers: PikaInstrumentor._instrument_consumers( channel._impl._consumers, tracer ) PikaInstrumentor._instrument_channel_functions(channel, tracer) - def _instrument(self, **kwargs: Dict[str, Any]) -> None: - channel: Channel = kwargs.get("channel", None) - if not channel or not isinstance(channel, Channel): - return - tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) - PikaInstrumentor.instrument_channel( - channel, tracer_provider=tracer_provider - ) - - def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: - channel: Channel = kwargs.get("channel", None) - if not channel or not isinstance(channel, Channel): - return + @staticmethod + def uninstrument_channel(channel: Channel) -> None: if not hasattr(channel, "_impl"): _LOG.error("Could not find implementation for provided channel!") return @@ -101,5 +94,24 @@ def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: channel._impl._consumers[key] = callback._original_callback PikaInstrumentor._uninstrument_channel_functions(channel) + def _decorate_channel_function( + self, tracer_provider: Optional[TracerProvider] + ) -> None: + self.original_channel_func = BlockingConnection.channel + + def _wrapper(*args, **kwargs): + channel = self.original_channel_func(*args, **kwargs) + self.instrument_channel(channel, tracer_provider=tracer_provider) + return channel + + BlockingConnection.channel = _wrapper + + def _instrument(self, **kwargs: Dict[str, Any]) -> None: + tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) + self._decorate_channel_function(tracer_provider) + + def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: + BlockingConnection.channel = self.original_channel_func + def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index e8608c7227..572389b7ff 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -14,7 +14,7 @@ from opentelemetry.trace.span import Span -class PikaGetter(Getter): # type: ignore +class _PikaGetter(Getter): # type: ignore def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: value = carrier.get(key, None) if value is None: @@ -25,10 +25,10 @@ def keys(self, carrier: CarrierT) -> List[str]: return [] -pika_getter = PikaGetter() +_pika_getter = _PikaGetter() -def decorate_callback( +def _decorate_callback( callback: Callable[[Channel, Basic.Deliver, BasicProperties, bytes], Any], tracer: Tracer, task_name: str, @@ -41,7 +41,7 @@ def decorated_callback( ) -> Any: if not properties: properties = BasicProperties() - span = get_span( + span = _get_span( tracer, channel, properties, @@ -56,7 +56,7 @@ def decorated_callback( return decorated_callback -def decorate_basic_publish( +def _decorate_basic_publish( original_function: Callable[[str, str, bytes, BasicProperties, bool], Any], channel: Channel, tracer: Tracer, @@ -70,7 +70,7 @@ def decorated_function( ) -> Any: if not properties: properties = BasicProperties() - span = get_span( + span = _get_span( tracer, channel, properties, @@ -92,7 +92,7 @@ def decorated_function( return decorated_function -def get_span( +def _get_span( tracer: Tracer, channel: Channel, properties: BasicProperties, @@ -101,7 +101,7 @@ def get_span( ) -> Optional[Span]: if properties.headers is None: properties.headers = {} - ctx = propagate.extract(properties.headers, getter=pika_getter) + ctx = propagate.extract(properties.headers, getter=_pika_getter) if context.get_value("suppress_instrumentation") or context.get_value( _SUPPRESS_INSTRUMENTATION_KEY ): @@ -109,13 +109,13 @@ def get_span( return None task_name = properties.type if properties.type else task_name span = tracer.start_span( - context=ctx, name=generate_span_name(task_name, operation) + context=ctx, name=_generate_span_name(task_name, operation) ) - enrich_span(span, channel, properties, task_name, operation) + _enrich_span(span, channel, properties, task_name, operation) return span -def generate_span_name( +def _generate_span_name( task_name: str, operation: Optional[MessagingOperationValues] ) -> str: if not operation: @@ -123,7 +123,7 @@ def generate_span_name( return f"{task_name} {operation.value}" -def enrich_span( +def _enrich_span( span: Span, channel: Channel, properties: BasicProperties, diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py index dff2787c8a..db74aa3f51 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_getter.py @@ -13,12 +13,12 @@ # limitations under the License. from unittest import TestCase -from opentelemetry.instrumentation.pika.utils import PikaGetter +from opentelemetry.instrumentation.pika.utils import _PikaGetter class TestPikaGetter(TestCase): def setUp(self) -> None: - self.getter = PikaGetter() + self.getter = _PikaGetter() def test_get_none(self) -> None: carrier = {} diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 25052f3124..77ce1c6256 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -13,7 +13,7 @@ # limitations under the License. from unittest import TestCase, mock -from pika.adapters import BaseConnection +from pika.adapters import BaseConnection, BlockingConnection from pika.channel import Channel from opentelemetry.instrumentation.pika import PikaInstrumentor @@ -27,31 +27,16 @@ def setUp(self) -> None: self.mock_callback = mock.MagicMock() self.channel._impl._consumers = {"mock_key": self.mock_callback} - @mock.patch( - "opentelemetry.instrumentation.pika.PikaInstrumentor.instrument_channel" - ) - @mock.patch( - "opentelemetry.instrumentation.pika.PikaInstrumentor._uninstrument_channel_functions" - ) - def test_instrument_api( - self, - uninstrument_channel_functions: mock.MagicMock, - instrument_channel: mock.MagicMock, - ) -> None: + def test_instrument_api(self) -> None: + original_channel = BlockingConnection.channel instrumentation = PikaInstrumentor() - instrumentation.instrument(channel=self.channel) - instrument_channel.assert_called_once_with( - self.channel, tracer_provider=None - ) - self.channel._impl._consumers = {"mock_key": mock.MagicMock()} - self.channel._impl._consumers[ - "mock_key" - ]._original_callback = self.mock_callback - instrumentation.uninstrument(channel=self.channel) - uninstrument_channel_functions.assert_called_once() + instrumentation.instrument() + self.assertTrue(hasattr(instrumentation, "original_channel_func")) self.assertEqual( - self.channel._impl._consumers["mock_key"], self.mock_callback + original_channel, instrumentation.original_channel_func ) + instrumentation.uninstrument(channel=self.channel) + self.assertEqual(original_channel, BlockingConnection.channel) @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions" @@ -71,7 +56,7 @@ def test_instrument( instrument_consumers.assert_called_once() instrument_channel_functions.assert_called_once() - @mock.patch("opentelemetry.instrumentation.pika.utils.decorate_callback") + @mock.patch("opentelemetry.instrumentation.pika.utils._decorate_callback") def test_instrument_consumers( self, decorate_callback: mock.MagicMock ) -> None: @@ -92,7 +77,7 @@ def test_instrument_consumers( ) @mock.patch( - "opentelemetry.instrumentation.pika.utils.decorate_basic_publish" + "opentelemetry.instrumentation.pika.utils._decorate_basic_publish" ) def test_instrument_basic_publish( self, decorate_basic_publish: mock.MagicMock diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py index 4c48f0b233..dc41db1993 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -21,8 +21,8 @@ class TestUtils(TestCase): @staticmethod @mock.patch("opentelemetry.context.get_value") - @mock.patch("opentelemetry.instrumentation.pika.utils.generate_span_name") - @mock.patch("opentelemetry.instrumentation.pika.utils.enrich_span") + @mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name") + @mock.patch("opentelemetry.instrumentation.pika.utils._enrich_span") @mock.patch("opentelemetry.propagate.extract") def test_get_span( extract: mock.MagicMock, @@ -35,7 +35,7 @@ def test_get_span( properties = mock.MagicMock() task_name = "test.test" get_value.return_value = None - span = utils.get_span(tracer, channel, properties, task_name) + span = utils._get_span(tracer, channel, properties, task_name) extract.assert_called_once() generate_span_name.assert_called_once() tracer.start_span.assert_called_once_with( @@ -48,8 +48,8 @@ def test_get_span( ), "The returned span was not enriched using enrich_span!" @mock.patch("opentelemetry.context.get_value") - @mock.patch("opentelemetry.instrumentation.pika.utils.generate_span_name") - @mock.patch("opentelemetry.instrumentation.pika.utils.enrich_span") + @mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name") + @mock.patch("opentelemetry.instrumentation.pika.utils._enrich_span") @mock.patch("opentelemetry.propagate.extract") def test_get_span_suppressed( self, @@ -63,7 +63,7 @@ def test_get_span_suppressed( properties = mock.MagicMock() task_name = "test.test" get_value.return_value = True - span = utils.get_span(tracer, channel, properties, task_name) + span = utils._get_span(tracer, channel, properties, task_name) self.assertEqual(span, None) extract.assert_called_once() generate_span_name.assert_not_called() @@ -71,14 +71,14 @@ def test_get_span_suppressed( def test_generate_span_name_no_operation(self) -> None: task_name = "test.test" operation = None - span_name = utils.generate_span_name(task_name, operation) + span_name = utils._generate_span_name(task_name, operation) self.assertEqual(span_name, f"{task_name} send") def test_generate_span_name_with_operation(self) -> None: task_name = "test.test" operation = mock.MagicMock() operation.value = "process" - span_name = utils.generate_span_name(task_name, operation) + span_name = utils._generate_span_name(task_name, operation) self.assertEqual(span_name, f"{task_name} {operation.value}") @staticmethod @@ -87,7 +87,7 @@ def test_enrich_span_basic_values() -> None: properties = mock.MagicMock() task_destination = "test.test" span = mock.MagicMock(spec=Span) - utils.enrich_span(span, channel, properties, task_destination) + utils._enrich_span(span, channel, properties, task_destination) span.set_attribute.assert_has_calls( any_order=True, calls=[ @@ -121,7 +121,7 @@ def test_enrich_span_with_operation() -> None: task_destination = "test.test" operation = mock.MagicMock() span = mock.MagicMock(spec=Span) - utils.enrich_span( + utils._enrich_span( span, channel, properties, task_destination, operation ) span.set_attribute.assert_has_calls( @@ -137,7 +137,7 @@ def test_enrich_span_without_operation() -> None: properties = mock.MagicMock() task_destination = "test.test" span = mock.MagicMock(spec=Span) - utils.enrich_span(span, channel, properties, task_destination) + utils._enrich_span(span, channel, properties, task_destination) span.set_attribute.assert_has_calls( any_order=True, calls=[mock.call(SpanAttributes.MESSAGING_TEMP_DESTINATION, True)], @@ -151,7 +151,7 @@ def test_enrich_span_unique_connection() -> None: span = mock.MagicMock(spec=Span) # We do this to create the behaviour of hasattr(channel.connection, "params") == False del channel.connection.params - utils.enrich_span(span, channel, properties, task_destination) + utils._enrich_span(span, channel, properties, task_destination) span.set_attribute.assert_has_calls( any_order=True, calls=[ From e37b5fa9378a9fcc5875dac00bfb7b509097ae96 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Mon, 20 Sep 2021 10:43:39 +0300 Subject: [PATCH 08/16] Add usage of wrapt according to CR comments --- .../README.rst | 20 +++++++++++++++---- .../setup.cfg | 2 ++ .../instrumentation/pika/__init__.py | 20 +++++++++++++++---- .../instrumentation/pika/pika_instrumentor.py | 14 +++++++------ .../tests/test_pika_instrumentation.py | 14 ++++++------- 5 files changed, 48 insertions(+), 22 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/README.rst b/instrumentation/opentelemetry-instrumentation-pika/README.rst index 55343fbb55..0f6ab3fd78 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/README.rst +++ b/instrumentation/opentelemetry-instrumentation-pika/README.rst @@ -24,9 +24,22 @@ Usage docker run -p 5672:5672 rabbitmq - * Run instrumented task +.. code-block:: python + + import pika + from opentelemetry.instrumentation.pika import PikaInstrumentor + + PikaInstrumentor().instrument() + + connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost')) + channel = connection.channel() + channel.queue_declare(queue='hello') + channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') + +* PikaInstrumentor also supports instrumentation of a single channel + .. code-block:: python import pika @@ -37,13 +50,12 @@ Usage channel.queue_declare(queue='hello') pika_instrumentation = PikaInstrumentor() - pika_instrumentation.instrument(channel=channel) + pika_instrumentation.instrument_channel(channel=channel) channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') - pika_instrumentation.uninstrument(channel=channel) - + pika_instrumentation.uninstrument_channel(channel=channel) * PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg index 3de0471f4d..9c2a29cd3b 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg @@ -40,11 +40,13 @@ packages=find_namespace: install_requires = opentelemetry-api ~= 1.5 + wrapt >= 1.0.0, < 2.0.0 pika >= 1.1.0 [options.extras_require] test = pytest + wrapt >= 1.0.0, < 2.0.0 opentelemetry-test == 0.24b0 [options.packages.find] diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py index a0bffe4f8b..4f8d6b0b50 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -23,9 +23,22 @@ docker run -p 5672:5672 rabbitmq - * Run instrumented task +.. code-block:: python + + import pika + from opentelemetry.instrumentation.pika import PikaInstrumentor + + PikaInstrumentor().instrument() + + connection = pika.BlockingConnection(pika.URLParameters('amqp://localhost')) + channel = connection.channel() + channel.queue_declare(queue='hello') + channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') + +* PikaInstrumentor also supports instrumentation of a single channel + .. code-block:: python import pika @@ -36,13 +49,12 @@ channel.queue_declare(queue='hello') pika_instrumentation = PikaInstrumentor() - pika_instrumentation.instrument(channel=channel) + pika_instrumentation.instrument_channel(channel=channel) channel.basic_publish(exchange='', routing_key='hello', body=b'Hello World!') - pika_instrumentation.uninstrument(channel=channel) - + pika_instrumentation.uninstrument_channel(channel=channel) * PikaInstrumentor also supports instrumentation without creating an object, and receiving a tracer_provider diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index ca54947dcc..96084dff4a 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -14,6 +14,7 @@ from logging import getLogger from typing import Any, Callable, Collection, Dict, Optional +import wrapt from pika.adapters import BlockingConnection from pika.channel import Channel @@ -22,6 +23,7 @@ from opentelemetry.instrumentation.pika import utils from opentelemetry.instrumentation.pika.package import _instruments from opentelemetry.instrumentation.pika.version import __version__ +from opentelemetry.instrumentation.utils import unwrap from opentelemetry.trace import Tracer, TracerProvider _LOG = getLogger(__name__) @@ -93,25 +95,25 @@ def uninstrument_channel(channel: Channel) -> None: if hasattr(callback, "_original_callback"): channel._impl._consumers[key] = callback._original_callback PikaInstrumentor._uninstrument_channel_functions(channel) + if hasattr(channel, "__opentelemetry_tracer"): + delattr(channel, "__opentelemetry_tracer") def _decorate_channel_function( self, tracer_provider: Optional[TracerProvider] ) -> None: - self.original_channel_func = BlockingConnection.channel - - def _wrapper(*args, **kwargs): - channel = self.original_channel_func(*args, **kwargs) + def wrapper(wrapped, instance, args, kwargs): + channel = wrapped(*args, **kwargs) self.instrument_channel(channel, tracer_provider=tracer_provider) return channel - BlockingConnection.channel = _wrapper + wrapt.wrap_function_wrapper(BlockingConnection, "channel", wrapper) def _instrument(self, **kwargs: Dict[str, Any]) -> None: tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) self._decorate_channel_function(tracer_provider) def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: - BlockingConnection.channel = self.original_channel_func + unwrap(BlockingConnection, "channel") def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 77ce1c6256..2ac0e08bee 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -15,6 +15,7 @@ from pika.adapters import BaseConnection, BlockingConnection from pika.channel import Channel +from wrapt import BoundFunctionWrapper from opentelemetry.instrumentation.pika import PikaInstrumentor from opentelemetry.trace import Tracer @@ -28,15 +29,15 @@ def setUp(self) -> None: self.channel._impl._consumers = {"mock_key": self.mock_callback} def test_instrument_api(self) -> None: - original_channel = BlockingConnection.channel instrumentation = PikaInstrumentor() instrumentation.instrument() - self.assertTrue(hasattr(instrumentation, "original_channel_func")) - self.assertEqual( - original_channel, instrumentation.original_channel_func + self.assertTrue( + isinstance(BlockingConnection.channel, BoundFunctionWrapper) ) instrumentation.uninstrument(channel=self.channel) - self.assertEqual(original_channel, BlockingConnection.channel) + self.assertFalse( + isinstance(BlockingConnection.channel, BoundFunctionWrapper) + ) @mock.patch( "opentelemetry.instrumentation.pika.PikaInstrumentor._instrument_channel_functions" @@ -91,9 +92,6 @@ def test_instrument_basic_publish( self.assertEqual( self.channel.basic_publish, decorate_basic_publish.return_value ) - self.assertEqual( - self.channel.basic_publish._original_function, original_function - ) def test_uninstrument_channel_functions(self) -> None: original_function = self.channel.basic_publish From a7f118a25eea2c415d5c2afd55b8ce9edb4788ad Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Wed, 22 Sep 2021 10:07:22 +0300 Subject: [PATCH 09/16] Fix according to CR Comments --- CHANGELOG.md | 3 +- .../setup.cfg | 1 - .../setup.py | 70 +++++++++++++++++-- .../instrumentation/pika/__init__.py | 7 +- .../instrumentation/pika/pika_instrumentor.py | 15 ++++ .../instrumentation/pika/utils.py | 4 +- 6 files changed, 85 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2bd0b0d20c..ffe410a6a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-sdk-extension-aws` Add AWS resource detectors to extension package ([#586](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/586)) - `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-aiohttp-client`, `openetelemetry-instrumentation-fastapi`, - `opentelemetry-instrumentation-starlette`, `opentelemetry-instrumentation-urllib`, `opentelemetry-instrumentation-urllib3`, - `opentelemetry-instrumentation-pika` Added `request_hook` and `response_hook` callbacks + `opentelemetry-instrumentation-starlette`, `opentelemetry-instrumentation-urllib`, `opentelemetry-instrumentation-urllib3` Added `request_hook` and `response_hook` callbacks ([#576](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/576)) ### Changed diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg index 9c2a29cd3b..18a9101433 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.cfg @@ -41,7 +41,6 @@ packages=find_namespace: install_requires = opentelemetry-api ~= 1.5 wrapt >= 1.0.0, < 2.0.0 - pika >= 1.1.0 [options.extras_require] test = diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.py b/instrumentation/opentelemetry-instrumentation-pika/setup.py index 2053ff0987..f3e6fdd2fe 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/setup.py +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.py @@ -11,23 +11,79 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + + +# DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt. +# RUN `python scripts/generate_setup.py` TO REGENERATE. + + +import distutils.cmd +import json import os +from configparser import ConfigParser import setuptools +config = ConfigParser() +config.read("setup.cfg") + +# We provide extras_require parameter to setuptools.setup later which +# overwrites the extra_require section from setup.cfg. To support extra_require +# secion in setup.cfg, we load it here and merge it with the extra_require param. +extras_require = {} +if "options.extras_require" in config: + for key, value in config["options.extras_require"].items(): + extras_require[key] = [v for v in value.split("\n") if v.strip()] + BASE_DIR = os.path.dirname(__file__) +PACKAGE_INFO = {} + VERSION_FILENAME = os.path.join( - BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "version.py", + BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "version.py" ) -PACKAGE_INFO = {} with open(VERSION_FILENAME) as f: exec(f.read(), PACKAGE_INFO) +PACKAGE_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "package.py" +) +with open(PACKAGE_FILENAME) as f: + exec(f.read(), PACKAGE_INFO) + +# Mark any instruments/runtime dependencies as test dependencies as well. +extras_require["instruments"] = PACKAGE_INFO["_instruments"] +test_deps = extras_require.get("test", []) +for dep in extras_require["instruments"]: + test_deps.append(dep) + +extras_require["test"] = test_deps + + +class JSONMetadataCommand(distutils.cmd.Command): + + description = ( + "print out package metadata as JSON. This is used by OpenTelemetry dev scripts to ", + "auto-generate code in other places", + ) + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + metadata = { + "name": config["metadata"]["name"], + "version": PACKAGE_INFO["__version__"], + "instruments": PACKAGE_INFO["_instruments"], + } + print(json.dumps(metadata)) + + setuptools.setup( + cmdclass={"meta": JSONMetadataCommand}, version=PACKAGE_INFO["__version__"], - entry_points={ - "opentelemetry_instrumentor": [ - "pika = opentelemetry.instrumentation.pika:PikaInstrumentor" - ] - }, + extras_require=extras_require, ) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py index 4f8d6b0b50..b26ce301cd 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -66,6 +66,7 @@ --- """ -# pylint: disable=unused-argument -from .pika_instrumentor import PikaInstrumentor -from .version import __version__ +from .pika_instrumentor import ( # pylint: disable=unused-argument + PikaInstrumentor, +) +from .version import __version__ # pylint: disable=unused-argument diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 96084dff4a..ec57f2794c 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -75,6 +75,13 @@ def _uninstrument_channel_functions(channel: Channel) -> None: def instrument_channel( channel: Channel, tracer_provider: Optional[TracerProvider] = None, ) -> None: + if not hasattr(channel, "_is_instrumented_by_opentelemetry"): + channel._is_instrumented_by_opentelemetry = False + if channel._is_instrumented_by_opentelemetry: + _LOG.warning( + "Attempting to instrument Pika channel while already instrumented!" + ) + return tracer = trace.get_tracer(__name__, __version__, tracer_provider) channel.__setattr__("__opentelemetry_tracer", tracer) if not hasattr(channel, "_impl"): @@ -88,6 +95,14 @@ def instrument_channel( @staticmethod def uninstrument_channel(channel: Channel) -> None: + if ( + not hasattr(channel, "_is_instrumented_by_opentelemetry") + or not channel._is_instrumented_by_opentelemetry + ): + _LOG.error( + "Attempting to uninstrument Pika channel while already uninstrumented!" + ) + return if not hasattr(channel, "_impl"): _LOG.error("Could not find implementation for provided channel!") return diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py index 572389b7ff..7ad1440572 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/utils.py @@ -105,13 +105,13 @@ def _get_span( if context.get_value("suppress_instrumentation") or context.get_value( _SUPPRESS_INSTRUMENTATION_KEY ): - print("Suppressing instrumentation!") return None task_name = properties.type if properties.type else task_name span = tracer.start_span( context=ctx, name=_generate_span_name(task_name, operation) ) - _enrich_span(span, channel, properties, task_name, operation) + if span.is_recording(): + _enrich_span(span, channel, properties, task_name, operation) return span From 76368f2bc33d1600638734732f76aaab503b90e9 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Wed, 22 Sep 2021 14:09:02 +0300 Subject: [PATCH 10/16] Move the tracer to be an attribute of the instrumentor instead of the channel --- .../opentelemetry/instrumentation/pika/pika_instrumentor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index ec57f2794c..ea97c3ffda 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -83,7 +83,6 @@ def instrument_channel( ) return tracer = trace.get_tracer(__name__, __version__, tracer_provider) - channel.__setattr__("__opentelemetry_tracer", tracer) if not hasattr(channel, "_impl"): _LOG.error("Could not find implementation for provided channel!") return @@ -110,8 +109,6 @@ def uninstrument_channel(channel: Channel) -> None: if hasattr(callback, "_original_callback"): channel._impl._consumers[key] = callback._original_callback PikaInstrumentor._uninstrument_channel_functions(channel) - if hasattr(channel, "__opentelemetry_tracer"): - delattr(channel, "__opentelemetry_tracer") def _decorate_channel_function( self, tracer_provider: Optional[TracerProvider] @@ -125,9 +122,12 @@ def wrapper(wrapped, instance, args, kwargs): def _instrument(self, **kwargs: Dict[str, Any]) -> None: tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) + channel.__setattr__("__opentelemetry_tracer", tracer) self._decorate_channel_function(tracer_provider) def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: + if hasattr(channel, "__opentelemetry_tracer"): + delattr(channel, "__opentelemetry_tracer") unwrap(BlockingConnection, "channel") def instrumentation_dependencies(self) -> Collection[str]: From 7339ecbec33d016e6dc6db8eb02a60bd1dc22500 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Wed, 22 Sep 2021 14:31:10 +0300 Subject: [PATCH 11/16] Fix Tests --- .../instrumentation/pika/pika_instrumentor.py | 6 +++--- .../tests/test_pika_instrumentation.py | 7 +++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index ea97c3ffda..60e8906369 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -122,12 +122,12 @@ def wrapper(wrapped, instance, args, kwargs): def _instrument(self, **kwargs: Dict[str, Any]) -> None: tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) - channel.__setattr__("__opentelemetry_tracer", tracer) + self.__setattr__("__opentelemetry_tracer", tracer_provider) self._decorate_channel_function(tracer_provider) def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: - if hasattr(channel, "__opentelemetry_tracer"): - delattr(channel, "__opentelemetry_tracer") + if hasattr(self, "__opentelemetry_tracer"): + delattr(self, "__opentelemetry_tracer") unwrap(BlockingConnection, "channel") def instrumentation_dependencies(self) -> Collection[str]: diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index 2ac0e08bee..b638e5d2b0 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -34,6 +34,9 @@ def test_instrument_api(self) -> None: self.assertTrue( isinstance(BlockingConnection.channel, BoundFunctionWrapper) ) + assert hasattr( + instrumentation, "__opentelemetry_tracer" + ), "Tracer not stored for the object!" instrumentation.uninstrument(channel=self.channel) self.assertFalse( isinstance(BlockingConnection.channel, BoundFunctionWrapper) @@ -52,8 +55,8 @@ def test_instrument( ): PikaInstrumentor.instrument_channel(channel=self.channel) assert hasattr( - self.channel, "__opentelemetry_tracer" - ), "Tracer not set for the channel!" + self.channel, "_is_instrumented_by_opentelemetry" + ), "channel is not marked as instrumented!" instrument_consumers.assert_called_once() instrument_channel_functions.assert_called_once() From adad45463327fe7effb9e57d4be1ddd290b22fcd Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Thu, 23 Sep 2021 09:41:10 +0300 Subject: [PATCH 12/16] Update Changelog and fix failing test --- CHANGELOG.md | 2 ++ .../src/opentelemetry/instrumentation/pika/__init__.py | 9 +++++---- .../tests/test_utils.py | 6 +----- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ffe410a6a1..7784b51c9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-asgi`, `opentelemetry-instrumentation-aiohttp-client`, `openetelemetry-instrumentation-fastapi`, `opentelemetry-instrumentation-starlette`, `opentelemetry-instrumentation-urllib`, `opentelemetry-instrumentation-urllib3` Added `request_hook` and `response_hook` callbacks ([#576](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/576)) +- `opentelemetry-instrumentation-pika` added RabbitMQ's pika module instrumentation. + ([#680](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/680)) ### Changed diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py index b26ce301cd..55ff695820 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/__init__.py @@ -65,8 +65,9 @@ API --- """ +# pylint: disable=import-error -from .pika_instrumentor import ( # pylint: disable=unused-argument - PikaInstrumentor, -) -from .version import __version__ # pylint: disable=unused-argument +from .pika_instrumentor import PikaInstrumentor +from .version import __version__ + +__all__ = ["PikaInstrumentor", "__version__"] diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py index dc41db1993..d8ce6d5364 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py @@ -35,17 +35,13 @@ def test_get_span( properties = mock.MagicMock() task_name = "test.test" get_value.return_value = None - span = utils._get_span(tracer, channel, properties, task_name) + _ = utils._get_span(tracer, channel, properties, task_name) extract.assert_called_once() generate_span_name.assert_called_once() tracer.start_span.assert_called_once_with( context=extract.return_value, name=generate_span_name.return_value ) enrich_span.assert_called_once() - assert any( - span in call.args or span in call.kwargs.values() - for call in enrich_span.call_args_list - ), "The returned span was not enriched using enrich_span!" @mock.patch("opentelemetry.context.get_value") @mock.patch("opentelemetry.instrumentation.pika.utils._generate_span_name") From da90f114f915be967f90bc226256b1d3d9ac194c Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Wed, 29 Sep 2021 09:12:07 +0300 Subject: [PATCH 13/16] update code using tox -e generate --- instrumentation/README.md | 1 + instrumentation/opentelemetry-instrumentation-pika/setup.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/instrumentation/README.md b/instrumentation/README.md index 8c59661d55..d0594d56b4 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -19,6 +19,7 @@ | [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2~=2.7 | | [opentelemetry-instrumentation-logging](./opentelemetry-instrumentation-logging) | logging | | [opentelemetry-instrumentation-mysql](./opentelemetry-instrumentation-mysql) | mysql-connector-python ~= 8.0 | +| [opentelemetry-instrumentation-pika](./opentelemetry-instrumentation-pika) | pika >= 1.1.0 | | [opentelemetry-instrumentation-psycopg2](./opentelemetry-instrumentation-psycopg2) | psycopg2 >= 2.7.3.1 | | [opentelemetry-instrumentation-pymemcache](./opentelemetry-instrumentation-pymemcache) | pymemcache ~= 1.3 | | [opentelemetry-instrumentation-pymongo](./opentelemetry-instrumentation-pymongo) | pymongo ~= 3.1 | diff --git a/instrumentation/opentelemetry-instrumentation-pika/setup.py b/instrumentation/opentelemetry-instrumentation-pika/setup.py index f3e6fdd2fe..ac600392c1 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/setup.py +++ b/instrumentation/opentelemetry-instrumentation-pika/setup.py @@ -41,13 +41,13 @@ VERSION_FILENAME = os.path.join( BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "version.py" ) -with open(VERSION_FILENAME) as f: +with open(VERSION_FILENAME, encoding="utf-8") as f: exec(f.read(), PACKAGE_INFO) PACKAGE_FILENAME = os.path.join( BASE_DIR, "src", "opentelemetry", "instrumentation", "pika", "package.py" ) -with open(PACKAGE_FILENAME) as f: +with open(PACKAGE_FILENAME, encoding="utf-8") as f: exec(f.read(), PACKAGE_INFO) # Mark any instruments/runtime dependencies as test dependencies as well. From 51dc2c3ed930f7eb77f69df24410ffeca30d3a51 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Wed, 6 Oct 2021 18:32:45 +0300 Subject: [PATCH 14/16] Update the name of the variable to store the tracer provider. --- .../opentelemetry/instrumentation/pika/pika_instrumentor.py | 6 +++--- .../tests/test_pika_instrumentation.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py index 60e8906369..a48e46034e 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-pika/src/opentelemetry/instrumentation/pika/pika_instrumentor.py @@ -122,12 +122,12 @@ def wrapper(wrapped, instance, args, kwargs): def _instrument(self, **kwargs: Dict[str, Any]) -> None: tracer_provider: TracerProvider = kwargs.get("tracer_provider", None) - self.__setattr__("__opentelemetry_tracer", tracer_provider) + self.__setattr__("__opentelemetry_tracer_provider", tracer_provider) self._decorate_channel_function(tracer_provider) def _uninstrument(self, **kwargs: Dict[str, Any]) -> None: - if hasattr(self, "__opentelemetry_tracer"): - delattr(self, "__opentelemetry_tracer") + if hasattr(self, "__opentelemetry_tracer_provider"): + delattr(self, "__opentelemetry_tracer_provider") unwrap(BlockingConnection, "channel") def instrumentation_dependencies(self) -> Collection[str]: diff --git a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py index b638e5d2b0..508d49c3bd 100644 --- a/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-pika/tests/test_pika_instrumentation.py @@ -35,7 +35,7 @@ def test_instrument_api(self) -> None: isinstance(BlockingConnection.channel, BoundFunctionWrapper) ) assert hasattr( - instrumentation, "__opentelemetry_tracer" + instrumentation, "__opentelemetry_tracer_provider" ), "Tracer not stored for the object!" instrumentation.uninstrument(channel=self.channel) self.assertFalse( From d59b5fd3e62bbe360942e515242f99e06046c469 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Wed, 6 Oct 2021 19:28:54 +0300 Subject: [PATCH 15/16] Update the core repo hash in the workflow --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index ddc2d7a82d..6f5a34b5c7 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,7 +6,7 @@ on: - 'release/*' pull_request: env: - CORE_REPO_SHA: 10208c1be1e720925a80a66f711b8afbe67537f4 + CORE_REPO_SHA: a0d1267b3aec027b72c39296ea350fa5210332fc jobs: build: From dd0525c937653b60ffedef7c02254979c77f2c13 Mon Sep 17 00:00:00 2001 From: Nikolay Sokolik Date: Wed, 6 Oct 2021 19:48:02 +0300 Subject: [PATCH 16/16] Update the core repo hash in the workflow --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6f5a34b5c7..833acd981b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,7 +6,7 @@ on: - 'release/*' pull_request: env: - CORE_REPO_SHA: a0d1267b3aec027b72c39296ea350fa5210332fc + CORE_REPO_SHA: adad94bfa69520cb4cbabca714827fd14503baf0 jobs: build: