From f0c9dcb148ed85c7afd2ce7636df80c99da0c7c7 Mon Sep 17 00:00:00 2001 From: Itay Gibel Date: Mon, 13 Sep 2021 12:58:43 +0300 Subject: [PATCH] botocore hooks --- CHANGELOG.md | 4 +- .../instrumentation/botocore/__init__.py | 161 +++++++++++++----- .../tests/test_botocore_instrumentation.py | 83 +++++++++ 3 files changed, 205 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d44be1acd..e29a6c2896 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,10 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `opentelemetry-instrumentation-elasticsearch` Added `response_hook` and `request_hook` callbacks ([#670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/670)) - -### Added - `opentelemetry-instrumentation-redis` added request_hook and response_hook callbacks passed as arguments to the instrument method. ([#669](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/669)) +- `opentelemetry-instrumentation-botocore` add request_hooks and response_hooks + ([679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/679)) ### Changed - `opentelemetry-instrumentation-botocore` Unpatch botocore Endpoint.prepare_request on uninstrument diff --git a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py index 3212866b2e..5c8c48f089 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/src/opentelemetry/instrumentation/botocore/__init__.py @@ -44,6 +44,40 @@ API --- + +The `instrument` method accepts the following keyword args: + +tracer_provider (TracerProvider) - an optional tracer provider +request_hooks (dict) - a mapping between service names their respective callable request hooks +* a request hook signature is: def request_hook(span: Span, operation_name: str, api_params: dict) -> None +response_hooks (dict) - a mapping between service names their respective callable response hooks +* a response hook signature is: def response_hook(span: Span, operation_name: str, result: dict) -> None + +for example: + +.. code: python + + from opentelemetry.instrumentation.botocore import BotocoreInstrumentor + import botocore + + def ec2_request_hook(span, operation_name, api_params): + # request hook logic + + def ec2_response_hook(span, operation_name, result): + # response hook logic + + # Instrument Botocore with hooks + BotocoreInstrumentor().instrument( + request_hooks={"ec2": ec2_request_hook}, response_hooks={"ec2": ec2_response_hook} + ) + + # This will create a span with Botocore-specific attributes, including custom attributes added from the hooks + session = botocore.session.get_session() + session.set_credentials( + access_key="access-key", secret_key="secret-key" + ) + ec2 = self.session.create_client("ec2", region_name="us-west-2") + ec2.describe_instances() """ import json @@ -91,16 +125,29 @@ class BotocoreInstrumentor(BaseInstrumentor): See `BaseInstrumentor` """ + def __init__(self): + super().__init__() + self.request_hooks = dict() + self.response_hooks = dict() + def instrumentation_dependencies(self) -> Collection[str]: return _instruments def _instrument(self, **kwargs): - # pylint: disable=attribute-defined-outside-init self._tracer = get_tracer( __name__, __version__, kwargs.get("tracer_provider") ) + request_hooks = kwargs.get("request_hooks") + response_hooks = kwargs.get("response_hooks") + + if isinstance(request_hooks, dict): + self.request_hooks = request_hooks + + if isinstance(response_hooks, dict): + self.response_hooks = response_hooks + wrap_function_wrapper( "botocore.client", "BaseClient._make_api_call", @@ -159,21 +206,18 @@ def _patched_api_call(self, original_func, instance, args, kwargs): ): BotocoreInstrumentor._patch_lambda_invoke(api_params) - if span.is_recording(): - span.set_attribute("aws.operation", operation_name) - span.set_attribute("aws.region", instance.meta.region_name) - span.set_attribute("aws.service", service_name) - if "QueueUrl" in api_params: - span.set_attribute("aws.queue_url", api_params["QueueUrl"]) - if "TableName" in api_params: - span.set_attribute( - "aws.table_name", api_params["TableName"] - ) + self._set_api_call_attributes( + span, instance, service_name, operation_name, api_params + ) token = context_api.attach( context_api.set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True) ) + self.apply_request_hook( + span, service_name, operation_name, api_params + ) + try: result = original_func(*args, **kwargs) except ClientError as ex: @@ -184,38 +228,73 @@ def _patched_api_call(self, original_func, instance, args, kwargs): if error: result = error.response - if span.is_recording(): - if "ResponseMetadata" in result: - metadata = result["ResponseMetadata"] - req_id = None - if "RequestId" in metadata: - req_id = metadata["RequestId"] - elif "HTTPHeaders" in metadata: - headers = metadata["HTTPHeaders"] - if "x-amzn-RequestId" in headers: - req_id = headers["x-amzn-RequestId"] - elif "x-amz-request-id" in headers: - req_id = headers["x-amz-request-id"] - elif "x-amz-id-2" in headers: - req_id = headers["x-amz-id-2"] - - if req_id: - span.set_attribute( - "aws.request_id", req_id, - ) - - if "RetryAttempts" in metadata: - span.set_attribute( - "retry_attempts", metadata["RetryAttempts"], - ) - - if "HTTPStatusCode" in metadata: - span.set_attribute( - SpanAttributes.HTTP_STATUS_CODE, - metadata["HTTPStatusCode"], - ) + self.apply_response_hook( + span, service_name, operation_name, result + ) + + self._set_api_call_result_attributes(span, result) if error: raise error return result + + @staticmethod + def _set_api_call_attributes( + span, instance, service_name, operation_name, api_params + ): + if span.is_recording(): + span.set_attribute("aws.operation", operation_name) + span.set_attribute("aws.region", instance.meta.region_name) + span.set_attribute("aws.service", service_name) + if "QueueUrl" in api_params: + span.set_attribute("aws.queue_url", api_params["QueueUrl"]) + if "TableName" in api_params: + span.set_attribute("aws.table_name", api_params["TableName"]) + + @staticmethod + def _set_api_call_result_attributes(span, result): + if span.is_recording(): + if "ResponseMetadata" in result: + metadata = result["ResponseMetadata"] + req_id = None + if "RequestId" in metadata: + req_id = metadata["RequestId"] + elif "HTTPHeaders" in metadata: + headers = metadata["HTTPHeaders"] + if "x-amzn-RequestId" in headers: + req_id = headers["x-amzn-RequestId"] + elif "x-amz-request-id" in headers: + req_id = headers["x-amz-request-id"] + elif "x-amz-id-2" in headers: + req_id = headers["x-amz-id-2"] + + if req_id: + span.set_attribute( + "aws.request_id", req_id, + ) + + if "RetryAttempts" in metadata: + span.set_attribute( + "retry_attempts", metadata["RetryAttempts"], + ) + + if "HTTPStatusCode" in metadata: + span.set_attribute( + SpanAttributes.HTTP_STATUS_CODE, + metadata["HTTPStatusCode"], + ) + + def apply_request_hook( + self, span, service_name, operation_name, api_params + ): + if service_name in self.request_hooks: + request_hook = self.request_hooks.get(service_name) + if callable(request_hook): + request_hook(span, operation_name, api_params) + + def apply_response_hook(self, span, service_name, operation_name, result): + if service_name in self.response_hooks: + response_hook = self.response_hooks.get(service_name) + if callable(response_hook): + response_hook(span, operation_name, result) diff --git a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_instrumentation.py b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_instrumentation.py index 3c6a50251f..53a99c36d8 100644 --- a/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-botocore/tests/test_botocore_instrumentation.py @@ -629,3 +629,86 @@ def test_dynamodb_client(self): SpanAttributes.HTTP_STATUS_CODE: 200, }, ) + + @mock_dynamodb2 + def test_hooks(self): + request_hook_operation_attribute_name = "request_hook.operation_name" + request_hook_api_params_attribute_name = "request_hook.api_params" + response_hook_operation_attribute_name = "response_hook.operation_name" + response_hook_result_attribute_name = "response_hook.result" + + def request_hook(span, operation_name, api_params): + hook_attributes = { + request_hook_operation_attribute_name: operation_name, + request_hook_api_params_attribute_name: json.dumps(api_params), + } + if span and span.is_recording(): + span.set_attributes(hook_attributes) + + def response_hook(span, operation_name, result): + if span and span.is_recording(): + span.set_attribute( + response_hook_operation_attribute_name, operation_name, + ) + span.set_attribute( + response_hook_result_attribute_name, list(result.keys()), + ) + + BotocoreInstrumentor().uninstrument() + BotocoreInstrumentor().instrument( + request_hooks={"dynamodb": request_hook}, + response_hooks={"dynamodb": response_hook}, + ) + + self.session = botocore.session.get_session() + self.session.set_credentials( + access_key="access-key", secret_key="secret-key" + ) + + ddb = self.session.create_client("dynamodb", region_name="us-west-2") + + test_table_name = "test_table_name" + + ddb.create_table( + AttributeDefinitions=[ + {"AttributeName": "id", "AttributeType": "S"}, + ], + KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}], + ProvisionedThroughput={ + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5, + }, + TableName=test_table_name, + ) + + item = {"id": {"S": "test_key"}} + + ddb.put_item(TableName=test_table_name, Item=item) + + spans = self.memory_exporter.get_finished_spans() + assert spans + self.assertEqual(len(spans), 2) + get_item_attributes = spans[1].attributes + + expected_api_params = json.dumps( + {"TableName": test_table_name, "Item": item} + ) + + expected_result_keys = ("ConsumedCapacity", "ResponseMetadata") + + self.assertEqual( + "PutItem", + get_item_attributes.get(request_hook_operation_attribute_name), + ) + self.assertEqual( + expected_api_params, + get_item_attributes.get(request_hook_api_params_attribute_name), + ) + self.assertEqual( + "PutItem", + get_item_attributes.get(response_hook_operation_attribute_name), + ) + self.assertEqual( + expected_result_keys, + get_item_attributes.get(response_hook_result_attribute_name), + )