Skip to content

Commit

Permalink
feat: add messaging tags for sns, sqs and kinesis
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed May 27, 2024
1 parent 380c142 commit 6d1b6cd
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,11 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
"aws:sqs",
}:
links = []
queue_url = ""
for record in lambda_event["Records"]:
if queue_url == "":
queue_url = record.get("eventSourceARN")

attributes = record.get("messageAttributes")
if attributes is not None:
ctx = get_global_textmap().extract(carrier=attributes, getter=SQSGetter())
Expand All @@ -439,6 +443,14 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
sqsTriggerSpan = tracer.start_span(span_name, context=parent_context, kind=SpanKind.CONSUMER, links=links)
sqsTriggerSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
sqsTriggerSpan.set_attribute("faas.trigger.type", "SQS")
sqsTriggerSpan.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "aws.sqs")
sqsTriggerSpan.set_attribute(SpanAttributes.MESSAGING_URL, queue_url)

try:
# Example queue_url: arn:aws:sqs:us-east-1:123456789012:my_queue_name
sqsTriggerSpan.set_attribute(SpanAttributes.MESSAGING_DESTINATION, queue_url.split(":")[-1])
except IndexError:
pass

parent_context = set_span_in_context(sqsTriggerSpan)

Expand All @@ -455,21 +467,34 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
try:
if lambda_event["Records"][0]["EventSource"] == "aws:sns":
links = []

queue_url = ""
for record in lambda_event["Records"]:
if record.get("Sns") is None:
continue

if queue_url == "":
queue_url = record.get("Sns").get("TopicArn")

attributes = record.get("Sns").get("MessageAttributes")
if attributes is not None:
ctx = get_global_textmap().extract(carrier=attributes, getter=SNSGetter())
span_ctx = get_current_span(ctx).get_span_context()
if span_ctx.span_id != INVALID_SPAN_ID:
links.append(Link(span_ctx))

span_kind = SpanKind.INTERNAL
span_name = orig_handler_name
snsTriggerSpan = tracer.start_span(span_name, context=parent_context, kind=SpanKind.CONSUMER, links=links)
snsTriggerSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
snsTriggerSpan.set_attribute("faas.trigger.type", "SNS")
snsTriggerSpan.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "aws.sns")
snsTriggerSpan.set_attribute(SpanAttributes.MESSAGING_URL, queue_url)

try:
# Example queue_url: arn:aws:sns:us-east-1:123456789012:my_topic_name
snsTriggerSpan.set_attribute(SpanAttributes.MESSAGING_DESTINATION, queue_url.split(":")[-1])
except IndexError:
pass

parent_context = set_span_in_context(snsTriggerSpan)

Expand All @@ -485,10 +510,15 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
try:
if lambda_event["Records"][0]["eventSource"] == "aws:kinesis":
links = []
queue_url = ""

for record in lambda_event["Records"]:
if record.get("kinesis") is None:
continue

if queue_url == "":
queue_url = record.get("eventSourceARN")

data = record["kinesis"].get("data")
if data is not None:
decoded_bytes = base64.b64decode(data)
Expand All @@ -503,6 +533,15 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
kinesisTriggerSpan = tracer.start_span(span_name, context=parent_context, kind=SpanKind.CONSUMER, links=links)
kinesisTriggerSpan.set_attribute(SpanAttributes.FAAS_TRIGGER, "pubsub")
kinesisTriggerSpan.set_attribute("faas.trigger.type", "Kinesis")
kinesisTriggerSpan.set_attribute(SpanAttributes.MESSAGING_SYSTEM, "aws.kinesis")
kinesisTriggerSpan.set_attribute(SpanAttributes.MESSAGING_URL, queue_url)

try:
# Example queue_url: arn:aws:kinesis:us-east-1:123456789012:stream/my_stream_name
kinesisTriggerSpan.set_attribute(SpanAttributes.MESSAGING_DESTINATION, queue_url.split("/")[-1])
except IndexError:
pass


parent_context = set_span_in_context(kinesisTriggerSpan)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
elif call_context.service == "kinesis" and (call_context.operation == "PutRecord" or call_context.operation == "PutRecords"):
call_context.span_kind = SpanKind.PRODUCER
streamName = call_context.params.get("StreamName")
if streamName:
attributes[SpanAttributes.MESSAGING_SYSTEM] = "aws.kinesis"
attributes[SpanAttributes.MESSAGING_DESTINATION] = streamName

attributes["rpc.request.payload"] = limit_string_size(json.dumps(call_context.params, default=str))
elif call_context.service == "sqs" and (call_context.operation == "SendMessageBatch" or call_context.operation == "SendMessage"):
call_context.span_kind = SpanKind.PRODUCER
Expand Down

0 comments on commit 6d1b6cd

Please sign in to comment.