Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

botocore hooks #679

Merged
merged 9 commits into from
Sep 30, 2021
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/670))
- `opentelemetry-instrumentation-redis` added request_hook and response_hook callbacks passed as arguments to the instrument method.
([#669](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/669))
- `opentelemetry-instrumentation-botocore` add `request_hook` and `response_hook` callbacks
([679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/679))
- `opentelemetry-exporter-richconsole` Initial release
([#686](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/686))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,38 @@

API
---

The `instrument` method accepts the following keyword args:

tracer_provider (TracerProvider) - an optional tracer provider
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
this function signature is: def request_hook(span: Span, service_name: str, operation_name: str, api_params: dict) -> None
response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request
this function signature is: def request_hook(span: Span, service_name: str, operation_name: str, result: dict) -> None

for example:

.. code: python

from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
import botocore

def request_hook(span, service_name, operation_name, api_params):
# request hook logic

def response_hook(span, service_name, operation_name, result):
# response hook logic

# Instrument Botocore with hooks
BotocoreInstrumentor().instrument(request_hook=request_hook, response_hooks=response_hook)

# This will create a span with Botocore-specific attributes, including custom attributes added from the hooks
session = botocore.session.get_session()
session.set_credentials(
access_key="access-key", secret_key="secret-key"
)
ec2 = self.session.create_client("ec2", region_name="us-west-2")
ec2.describe_instances()
"""

import json
Expand Down Expand Up @@ -91,16 +123,23 @@ class BotocoreInstrumentor(BaseInstrumentor):
See `BaseInstrumentor`
"""

def __init__(self):
super().__init__()
self.request_hook = None
self.response_hook = None

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):

# pylint: disable=attribute-defined-outside-init
self._tracer = get_tracer(
__name__, __version__, kwargs.get("tracer_provider")
)

self.request_hook = kwargs.get("request_hook")
self.response_hook = kwargs.get("response_hook")

wrap_function_wrapper(
"botocore.client",
"BaseClient._make_api_call",
Expand Down Expand Up @@ -159,21 +198,19 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
):
BotocoreInstrumentor._patch_lambda_invoke(api_params)

if span.is_recording():
span.set_attribute("aws.operation", operation_name)
span.set_attribute("aws.region", instance.meta.region_name)
span.set_attribute("aws.service", service_name)
if "QueueUrl" in api_params:
span.set_attribute("aws.queue_url", api_params["QueueUrl"])
if "TableName" in api_params:
span.set_attribute(
"aws.table_name", api_params["TableName"]
)
self._set_api_call_attributes(
span, instance, service_name, operation_name, api_params
)

token = context_api.attach(
context_api.set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True)
)

if callable(self.request_hook):
self.request_hook(
span, service_name, operation_name, api_params
)

try:
result = original_func(*args, **kwargs)
except ClientError as ex:
Expand All @@ -184,38 +221,58 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
if error:
result = error.response

if span.is_recording():
if "ResponseMetadata" in result:
metadata = result["ResponseMetadata"]
req_id = None
if "RequestId" in metadata:
req_id = metadata["RequestId"]
elif "HTTPHeaders" in metadata:
headers = metadata["HTTPHeaders"]
if "x-amzn-RequestId" in headers:
req_id = headers["x-amzn-RequestId"]
elif "x-amz-request-id" in headers:
req_id = headers["x-amz-request-id"]
elif "x-amz-id-2" in headers:
req_id = headers["x-amz-id-2"]

if req_id:
span.set_attribute(
"aws.request_id", req_id,
)

if "RetryAttempts" in metadata:
span.set_attribute(
"retry_attempts", metadata["RetryAttempts"],
)

if "HTTPStatusCode" in metadata:
span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE,
metadata["HTTPStatusCode"],
)
if callable(self.response_hook):
self.response_hook(span, service_name, operation_name, result)

self._set_api_call_result_attributes(span, result)

if error:
raise error

return result

@staticmethod
def _set_api_call_attributes(
span, instance, service_name, operation_name, api_params
):
if span.is_recording():
span.set_attribute("aws.operation", operation_name)
span.set_attribute("aws.region", instance.meta.region_name)
span.set_attribute("aws.service", service_name)
if "QueueUrl" in api_params:
span.set_attribute("aws.queue_url", api_params["QueueUrl"])
if "TableName" in api_params:
span.set_attribute("aws.table_name", api_params["TableName"])

@staticmethod
def _set_api_call_result_attributes(span, result):
if span.is_recording():
if "ResponseMetadata" in result:
metadata = result["ResponseMetadata"]
req_id = None
if "RequestId" in metadata:
req_id = metadata["RequestId"]
elif "HTTPHeaders" in metadata:
headers = metadata["HTTPHeaders"]
if "x-amzn-RequestId" in headers:
req_id = headers["x-amzn-RequestId"]
elif "x-amz-request-id" in headers:
req_id = headers["x-amz-request-id"]
elif "x-amz-id-2" in headers:
req_id = headers["x-amz-id-2"]

if req_id:
span.set_attribute(
"aws.request_id", req_id,
)

if "RetryAttempts" in metadata:
span.set_attribute(
"retry_attempts", metadata["RetryAttempts"],
)

if "HTTPStatusCode" in metadata:
span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE,
metadata["HTTPStatusCode"],
)
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,131 @@ def test_dynamodb_client(self):
SpanAttributes.HTTP_STATUS_CODE: 200,
},
)

@mock_dynamodb2
def test_request_hook(self):
request_hook_service_attribute_name = "request_hook.service_name"
request_hook_operation_attribute_name = "request_hook.operation_name"
request_hook_api_params_attribute_name = "request_hook.api_params"

def request_hook(span, service_name, operation_name, api_params):
hook_attributes = {
request_hook_service_attribute_name: service_name,
request_hook_operation_attribute_name: operation_name,
request_hook_api_params_attribute_name: json.dumps(api_params),
}
if span and span.is_recording():
span.set_attributes(hook_attributes)

BotocoreInstrumentor().uninstrument()
BotocoreInstrumentor().instrument(request_hook=request_hook,)

self.session = botocore.session.get_session()
self.session.set_credentials(
access_key="access-key", secret_key="secret-key"
)

ddb = self.session.create_client("dynamodb", region_name="us-west-2")

test_table_name = "test_table_name"

ddb.create_table(
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
],
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
ProvisionedThroughput={
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5,
},
TableName=test_table_name,
)

item = {"id": {"S": "test_key"}}

ddb.put_item(TableName=test_table_name, Item=item)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 2)
put_item_attributes = spans[1].attributes

expected_api_params = json.dumps(
{"TableName": test_table_name, "Item": item}
)

self.assertEqual(
"dynamodb",
put_item_attributes.get(request_hook_service_attribute_name),
)
self.assertEqual(
"PutItem",
put_item_attributes.get(request_hook_operation_attribute_name),
)
self.assertEqual(
expected_api_params,
put_item_attributes.get(request_hook_api_params_attribute_name),
)

@mock_dynamodb2
def test_response_hook(self):
response_hook_service_attribute_name = "request_hook.service_name"
response_hook_operation_attribute_name = "response_hook.operation_name"
response_hook_result_attribute_name = "response_hook.result"

def response_hook(span, service_name, operation_name, result):
hook_attributes = {
response_hook_service_attribute_name: service_name,
response_hook_operation_attribute_name: operation_name,
response_hook_result_attribute_name: list(result.keys()),
}
if span and span.is_recording():
span.set_attributes(hook_attributes)

BotocoreInstrumentor().uninstrument()
BotocoreInstrumentor().instrument(response_hook=response_hook,)

self.session = botocore.session.get_session()
self.session.set_credentials(
access_key="access-key", secret_key="secret-key"
)

ddb = self.session.create_client("dynamodb", region_name="us-west-2")

test_table_name = "test_table_name"

ddb.create_table(
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
],
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
ProvisionedThroughput={
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5,
},
TableName=test_table_name,
)

item = {"id": {"S": "test_key"}}

ddb.put_item(TableName=test_table_name, Item=item)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 2)
put_item_attributes = spans[1].attributes

expected_result_keys = ("ResponseMetadata",)

self.assertEqual(
"dynamodb",
put_item_attributes.get(response_hook_service_attribute_name),
)
self.assertEqual(
"PutItem",
put_item_attributes.get(response_hook_operation_attribute_name),
)
self.assertEqual(
expected_result_keys,
put_item_attributes.get(response_hook_result_attribute_name),
)