diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fad0c2fc7..8c755d76f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,14 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.9.1-0.28b1...HEAD) -- `opentelemetry-instrumentation-wsgi` WSGI: Conditionally create SERVER spans - ([#903](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/903)) +### Added + +- `opentelemetry-instrumentation-dbapi` add experimental sql commenter capability + ([#908](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/908)) ### Fixed - `opentelemetry-instrumentation-logging` retrieves service name defensively. ([#890](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/890)) +- `opentelemetry-instrumentation-wsgi` WSGI: Conditionally create SERVER spans + ([#903](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/903)) + ## [1.9.1-0.28b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.9.1-0.28b1) - 2022-01-29 diff --git a/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py b/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py index 762d2904d2..1645a8a48e 100644 --- a/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-dbapi/src/opentelemetry/instrumentation/dbapi/__init__.py @@ -45,11 +45,11 @@ from opentelemetry import trace as trace_api from opentelemetry.instrumentation.dbapi.version import __version__ -from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.instrumentation.utils import _generate_sql_comment, unwrap from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.trace import SpanKind, TracerProvider, get_tracer +from opentelemetry.trace import Span, SpanKind, TracerProvider, get_tracer -logger = logging.getLogger(__name__) +_logger = logging.getLogger(__name__) def trace_integration( @@ -59,6 +59,7 @@ def trace_integration( connection_attributes: typing.Dict = None, tracer_provider: typing.Optional[TracerProvider] = None, capture_parameters: bool = False, + enable_commenter: bool = False, db_api_integration_factory=None, ): """Integrate with DB API library. @@ -84,6 +85,7 @@ def trace_integration( version=__version__, tracer_provider=tracer_provider, capture_parameters=capture_parameters, + enable_commenter=enable_commenter, db_api_integration_factory=db_api_integration_factory, ) @@ -97,6 +99,7 @@ def wrap_connect( version: str = "", tracer_provider: typing.Optional[TracerProvider] = None, capture_parameters: bool = False, + enable_commenter: bool = False, db_api_integration_factory=None, ): """Integrate with DB API library. @@ -132,6 +135,7 @@ def wrap_connect_( version=version, tracer_provider=tracer_provider, capture_parameters=capture_parameters, + enable_commenter=enable_commenter, ) return db_integration.wrapped_connection(wrapped, args, kwargs) @@ -140,7 +144,7 @@ def wrap_connect_( connect_module, connect_method_name, wrap_connect_ ) except Exception as ex: # pylint: disable=broad-except - logger.warning("Failed to integrate with DB API. %s", str(ex)) + _logger.warning("Failed to integrate with DB API. %s", str(ex)) def unwrap_connect( @@ -163,7 +167,8 @@ def instrument_connection( connection_attributes: typing.Dict = None, version: str = "", tracer_provider: typing.Optional[TracerProvider] = None, - capture_parameters=False, + capture_parameters: bool = False, + enable_commenter: bool = False, ): """Enable instrumentation in a database connection. @@ -180,7 +185,7 @@ def instrument_connection( An instrumented connection. """ if isinstance(connection, wrapt.ObjectProxy): - logger.warning("Connection already instrumented") + _logger.warning("Connection already instrumented") return connection db_integration = DatabaseApiIntegration( @@ -190,6 +195,7 @@ def instrument_connection( version=version, tracer_provider=tracer_provider, capture_parameters=capture_parameters, + enable_commenter=enable_commenter, ) db_integration.get_connection_attributes(connection) return get_traced_connection_proxy(connection, db_integration) @@ -207,7 +213,7 @@ def uninstrument_connection(connection): if isinstance(connection, wrapt.ObjectProxy): return connection.__wrapped__ - logger.warning("Connection is not instrumented") + _logger.warning("Connection is not instrumented") return connection @@ -220,6 +226,7 @@ def __init__( version: str = "", tracer_provider: typing.Optional[TracerProvider] = None, capture_parameters: bool = False, + enable_commenter: bool = False, ): self.connection_attributes = connection_attributes if self.connection_attributes is None: @@ -237,6 +244,7 @@ def __init__( tracer_provider=tracer_provider, ) self.capture_parameters = capture_parameters + self.enable_commenter = enable_commenter self.database_system = database_system self.connection_props = {} self.span_attributes = {} @@ -313,8 +321,9 @@ def __exit__(self, *args, **kwargs): class CursorTracer: - def __init__(self, db_api_integration: DatabaseApiIntegration): + def __init__(self, db_api_integration: DatabaseApiIntegration) -> None: self._db_api_integration = db_api_integration + self._commenter_enabled = self._db_api_integration.enable_commenter def _populate_span( self, @@ -355,6 +364,22 @@ def get_statement(self, cursor, args): # pylint: disable=no-self-use return statement.decode("utf8", "replace") return statement + @staticmethod + def _generate_comment(span: Span) -> str: + span_context = span.get_span_context() + meta = {} + if span_context.is_valid: + meta.update( + { + "trace_id": span_context.trace_id, + "span_id": span_context.span_id, + "trace_flags": span_context.trace_flags, + "trace_state": span_context.trace_state.to_header(), + } + ) + # TODO(schekuri): revisit to enrich with info such as route, db_driver etc... + return _generate_sql_comment(**meta) + def traced_execution( self, cursor, @@ -374,6 +399,18 @@ def traced_execution( name, kind=SpanKind.CLIENT ) as span: self._populate_span(span, cursor, *args) + if args and self._commenter_enabled: + try: + comment = self._generate_comment(span) + if isinstance(args[0], bytes): + comment = comment.encode("utf8") + args_list = list(args) + args_list[0] += comment + args = tuple(args_list) + except Exception as exc: # pylint: disable=broad-except + _logger.exception( + "Exception while generating sql comment: %s", exc + ) return query_method(*args, **kwargs) diff --git a/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py b/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py index df7f1870f3..0302824db4 100644 --- a/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py +++ b/instrumentation/opentelemetry-instrumentation-dbapi/tests/test_dbapi_integration.py @@ -228,6 +228,21 @@ def test_executemany(self): span.attributes[SpanAttributes.DB_STATEMENT], "Test query" ) + def test_executemany_comment(self): + db_integration = dbapi.DatabaseApiIntegration( + "testname", "testcomponent", enable_commenter=True + ) + mock_connection = db_integration.wrapped_connection( + mock_connect, {}, {} + ) + cursor = mock_connection.cursor() + cursor.executemany("Test query") + spans_list = self.memory_exporter.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + comment = dbapi.CursorTracer._generate_comment(span) + self.assertIn(comment, cursor.query) + def test_callproc(self): db_integration = dbapi.DatabaseApiIntegration( "testname", "testcomponent" @@ -308,6 +323,10 @@ def cursor(self): class MockCursor: + def __init__(self) -> None: + self.query = "" + self.params = None + # pylint: disable=unused-argument, no-self-use def execute(self, query, params=None, throw_exception=False): if throw_exception: @@ -317,6 +336,8 @@ def execute(self, query, params=None, throw_exception=False): def executemany(self, query, params=None, throw_exception=False): if throw_exception: raise Exception("Test Exception") + self.query = query + self.params = params # pylint: disable=unused-argument, no-self-use def callproc(self, query, params=None, throw_exception=False): diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py index e4b0b03ba6..b53b4f2308 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/utils.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import urllib.parse from typing import Dict, Sequence from wrapt import ObjectProxy @@ -115,3 +116,38 @@ def _start_internal_or_server_span( attributes=attributes, ) return span, token + + +_KEY_VALUE_DELIMITER = "," + + +def _generate_sql_comment(**meta): + """ + Return a SQL comment with comma delimited key=value pairs created from + **meta kwargs. + """ + if not meta: # No entries added. + return "" + + # Sort the keywords to ensure that caching works and that testing is + # deterministic. It eases visual inspection as well. + return ( + " /*" + + _KEY_VALUE_DELIMITER.join( + "{}={!r}".format(_url_quote(key), _url_quote(value)) + for key, value in sorted(meta.items()) + if value is not None + ) + + "*/" + ) + + +def _url_quote(s): # pylint: disable=invalid-name + if not isinstance(s, (str, bytes)): + return s + quoted = urllib.parse.quote(s) + # Since SQL uses '%' as a keyword, '%' is a by-product of url quoting + # e.g. foo,bar --> foo%2Cbar + # thus in our quoting, we need to escape it too to finally give + # foo,bar --> foo%%2Cbar + return quoted.replace("%", "%%")