Skip to content

Commit

Permalink
Capture the SNS target ARN as the messaging.destination.name attribut…
Browse files Browse the repository at this point in the history
…e if provided with preference for the topic ARN.
  • Loading branch information
tduncan committed Dec 21, 2023
1 parent 5c9c14c commit db7ec8d
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,18 @@ static String getTableName(Object request) {
return invokeOrNull(access.getTableName, request);
}

@Nullable
static String getTopicArn(Object request) {
RequestAccess access = REQUEST_ACCESSORS.get(request.getClass());
return invokeOrNull(access.getTopicArn, request);
}

@Nullable
static String getTargetArn(Object request) {
RequestAccess access = REQUEST_ACCESSORS.get(request.getClass());
return invokeOrNull(access.getTargetArn, request);
}

@Nullable
private static String invokeOrNull(@Nullable MethodHandle method, Object obj) {
if (method == null) {
Expand All @@ -73,6 +80,7 @@ private static String invokeOrNull(@Nullable MethodHandle method, Object obj) {
@Nullable private final MethodHandle getStreamName;
@Nullable private final MethodHandle getTableName;
@Nullable private final MethodHandle getTopicArn;
@Nullable private final MethodHandle getTargetArn;

private RequestAccess(Class<?> clz) {
getBucketName = findAccessorOrNull(clz, "getBucketName");
Expand All @@ -81,6 +89,7 @@ private RequestAccess(Class<?> clz) {
getStreamName = findAccessorOrNull(clz, "getStreamName");
getTableName = findAccessorOrNull(clz, "getTableName");
getTopicArn = findAccessorOrNull(clz, "getTopicArn");
getTargetArn = findAccessorOrNull(clz, "getTargetArn");
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
package io.opentelemetry.instrumentation.awssdk.v1_11;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request;
import com.amazonaws.Response;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.SemanticAttributes;
import java.util.function.Function;
import javax.annotation.Nullable;

public class SnsAttributesExtractor implements AttributesExtractor<Request<?>, Response<?>> {
@Override
public void onStart(AttributesBuilder attributes, Context parentContext, Request<?> request) {
setRequestAttribute(attributes, SemanticAttributes.MESSAGING_DESTINATION_NAME,
request.getOriginalRequest(), RequestAccess::getTopicArn);
String destination = findMessageDestination(request.getOriginalRequest());
if (destination != null) {
attributes.put(SemanticAttributes.MESSAGING_DESTINATION_NAME, destination);
}
}

private static void setRequestAttribute(
AttributesBuilder attributes,
AttributeKey<String> key,
Object request,
Function<Object, String> getter) {
String value = getter.apply(request);
if (value != null) {
attributes.put(key, value);
/*
* Attempt to discover the destination of the SNS message by first checking for a topic ARN and
* falling back to the target ARN. If neither is found null is returned.
*/
private static String findMessageDestination(AmazonWebServiceRequest request) {
String destination = RequestAccess.getTopicArn(request);
if (destination != null) {
return destination;
}
return RequestAccess.getTargetArn(request);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ abstract class AbstractAws1ClientTest extends InstrumentationSpecification {
</ResponseMetadata>
</PublishResponse>
"""
"SNS" | "Publish" | "POST" | "d74b8436-ae13-5ab4-a9ff-ce54dfea72a0" | AmazonSNSClientBuilder.standard() | { c -> c.publish(new PublishRequest().withMessage("somemessage").withTargetArn("somearn")) } | ["$SemanticAttributes.MESSAGING_DESTINATION_NAME": "somearn"] | """
<PublishResponse xmlns="https://sns.amazonaws.com/doc/2010-03-31/">
<PublishResult>
<MessageId>567910cd-659e-55d4-8ccb-5aaf14679dc0</MessageId>
</PublishResult>
<ResponseMetadata>
<RequestId>d74b8436-ae13-5ab4-a9ff-ce54dfea72a0</RequestId>
</ResponseMetadata>
</PublishResponse>
"""
}

def "send #operation request to closed port"() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.instrumentation.awssdk.v2_2;

import io.opentelemetry.semconv.SemanticAttributes;

import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.request;

import java.util.Collections;
Expand All @@ -16,7 +18,10 @@ enum AwsSdkRequestType {
SQS(request("aws.queue.url", "QueueUrl"), request("aws.queue.name", "QueueName")),
KINESIS(request("aws.stream.name", "StreamName")),
DYNAMODB(request("aws.table.name", "TableName")),
SNS(request("messaging.destination.name", "TopicArn"));
SNS(
request(SemanticAttributes.MESSAGING_DESTINATION_NAME.getKey(), "TargetArn"),
request(SemanticAttributes.MESSAGING_DESTINATION_NAME.getKey(), "TopicArn")
);

// Wrapping in unmodifiableMap
@SuppressWarnings("ImmutableEnumChecker")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
setup:
configureSdkClient(builder)
def client = builder
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.build()
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.build()

if (body instanceof Closure) {
server.enqueue(body.call())
Expand Down Expand Up @@ -170,6 +170,16 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
</ResponseMetadata>
</PublishResponse>
"""
"Sns" | "Publish" | "POST" | "d74b8436-ae13-5ab4-a9ff-ce54dfea72a0" | SnsClient.builder() | { c -> c.publish(PublishRequest.builder().message("somemessage").targetArn("somearn").build()) } | """
<PublishResponse xmlns="https://sns.amazonaws.com/doc/2010-03-31/">
<PublishResult>
<MessageId>567910cd-659e-55d4-8ccb-5aaf14679dc0</MessageId>
</PublishResult>
<ResponseMetadata>
<RequestId>d74b8436-ae13-5ab4-a9ff-ce54dfea72a0</RequestId>
</ResponseMetadata>
</PublishResponse>
"""
"Sqs" | "CreateQueue" | "POST" | "7a62c49f-347e-4fc4-9331-6e8e7a96aa73" | SqsClient.builder() | { c -> c.createQueue(CreateQueueRequest.builder().queueName("somequeue").build()) } | {
if (!Boolean.getBoolean("testLatestDeps")) {
def content = """
Expand All @@ -186,9 +196,9 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
}
"""
ResponseHeaders headers = ResponseHeaders.builder(HttpStatus.OK)
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "7a62c49f-347e-4fc4-9331-6e8e7a96aa73")
.build()
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "7a62c49f-347e-4fc4-9331-6e8e7a96aa73")
.build()
return HttpResponse.of(headers, HttpData.of(StandardCharsets.UTF_8, content))
}
"Sqs" | "SendMessage" | "POST" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl(QUEUE_URL).messageBody("").build()) } | {
Expand All @@ -213,9 +223,9 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
}
"""
ResponseHeaders headers = ResponseHeaders.builder(HttpStatus.OK)
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "27daac76-34dd-47df-bd01-1f6e873584a0")
.build()
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "27daac76-34dd-47df-bd01-1f6e873584a0")
.build()
return HttpResponse.of(headers, HttpData.of(StandardCharsets.UTF_8, content))
}
"Ec2" | "AllocateAddress" | "POST" | "59dbff89-35bd-4eac-99ed-be587EXAMPLE" | Ec2Client.builder() | { c -> c.allocateAddress() } | """
Expand All @@ -237,10 +247,10 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
setup:
configureSdkClient(builder)
def client = builder
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.build()
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.build()

if (body instanceof Closure) {
server.enqueue(body.call())
Expand Down Expand Up @@ -338,9 +348,9 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
}
"""
ResponseHeaders headers = ResponseHeaders.builder(HttpStatus.OK)
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "7a62c49f-347e-4fc4-9331-6e8e7a96aa73")
.build()
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "7a62c49f-347e-4fc4-9331-6e8e7a96aa73")
.build()
return HttpResponse.of(headers, HttpData.of(StandardCharsets.UTF_8, content))
}
"Sqs" | "SendMessage" | "POST" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsAsyncClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl(QUEUE_URL).messageBody("").build()) } | {
Expand All @@ -365,9 +375,9 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
}
"""
ResponseHeaders headers = ResponseHeaders.builder(HttpStatus.OK)
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "27daac76-34dd-47df-bd01-1f6e873584a0")
.build()
.contentType(MediaType.PLAIN_TEXT_UTF_8)
.add("x-amzn-RequestId", "27daac76-34dd-47df-bd01-1f6e873584a0")
.build()
return HttpResponse.of(headers, HttpData.of(StandardCharsets.UTF_8, content))
}
"Ec2" | "AllocateAddress" | "POST" | "59dbff89-35bd-4eac-99ed-be587EXAMPLE" | Ec2AsyncClient.builder() | { c -> c.allocateAddress() } | """
Expand All @@ -382,7 +392,7 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
<ResponseMetadata><RequestId>0ac9cda2-bbf4-11d3-f92b-31fa5e8dbc99</RequestId></ResponseMetadata>
</DeleteOptionGroupResponse>
"""
"Sns" | "Publish" | "POST" | "f187a3c1-376f-11df-8963-01868b7c937a" | SnsAsyncClient.builder() | { SnsAsyncClient c -> c.publish(r -> r.message("hello").topicArn("somearn")) } | """
"Sns" | "Publish" | "POST" | "f187a3c1-376f-11df-8963-01868b7c937a" | SnsAsyncClient.builder() | { SnsAsyncClient c -> c.publish(r -> r.message("hello").topicArn("somearn")) } | """
<PublishResponse xmlns="https://sns.amazonaws.com/doc/2010-03-31/">
<PublishResult>
<MessageId>94f20ce6-13c5-43a0-9a9e-ca52d816e90b</MessageId>
Expand All @@ -403,13 +413,13 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
server.enqueue(HttpResponse.delayed(HttpResponse.of(HttpStatus.OK), Duration.ofMillis(5000)))
server.enqueue(HttpResponse.delayed(HttpResponse.of(HttpStatus.OK), Duration.ofMillis(5000)))
def builder = S3Client.builder()
.overrideConfiguration(createOverrideConfigurationBuilder()
.retryPolicy(RetryPolicy.builder().numRetries(1).build())
.build())
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.httpClientBuilder(ApacheHttpClient.builder().socketTimeout(Duration.ofMillis(50)))
.overrideConfiguration(createOverrideConfigurationBuilder()
.retryPolicy(RetryPolicy.builder().numRetries(1).build())
.build())
.endpointOverride(clientUri)
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.httpClientBuilder(ApacheHttpClient.builder().socketTimeout(Duration.ofMillis(50)))

if (Boolean.getBoolean("testLatestDeps")) {
builder.forcePathStyle(true)
Expand Down

0 comments on commit db7ec8d

Please sign in to comment.