Skip to content

Commit

Permalink
feat: Use MessagingIncubatingAttributes for gcp_pubsub attribute names
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelpri10 committed Sep 26, 2024
1 parent a08d169 commit 305610e
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 27 deletions.
5 changes: 5 additions & 0 deletions google-cloud-pubsub/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry.semconv</groupId>
<artifactId>opentelemetry-semconv-incubating</artifactId>
<version>1.27.0-alpha</version>
</dependency>

<!-- Test dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;

Expand All @@ -40,14 +41,8 @@ public class OpenTelemetryPubsubTracer {
"subscriber concurrency control";
private static final String SUBSCRIBE_SCHEDULER_SPAN_NAME = "subscriber scheduler";

private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
"messaging.gcp_pubsub.message.exactly_once_delivery";
private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
"messaging.gcp_pubsub.message.delivery_attempt";
private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
private static final String PROJECT_ATTR_KEY = "gcp.project_id";
private static final String PUBLISH_RPC_SPAN_SUFFIX = " publish";
Expand Down Expand Up @@ -93,9 +88,9 @@ void startPublisherSpan(PubsubMessageWrapper message) {
createCommonSpanAttributesBuilder(
message.getTopicName(), message.getTopicProject(), "publish", "create");

attributesBuilder.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize());
attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, message.getDataSize());
if (!message.getOrderingKey().isEmpty()) {
attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, message.getOrderingKey());
}

Span publisherSpan =
Expand Down Expand Up @@ -239,14 +234,14 @@ void startSubscriberSpan(PubsubMessageWrapper message, boolean exactlyOnceDelive

attributesBuilder
.put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId())
.put(MESSAGE_SIZE_ATTR_KEY, message.getDataSize())
.put(MESSAGE_ACK_ID_ATTR_KEY, message.getAckId())
.put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, message.getDataSize())
.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_ID, message.getAckId())
.put(MESSAGE_EXACTLY_ONCE_ATTR_KEY, exactlyOnceDeliveryEnabled);
if (!message.getOrderingKey().isEmpty()) {
attributesBuilder.put(ORDERING_KEY_ATTR_KEY, message.getOrderingKey());
attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, message.getOrderingKey());
}
if (message.getDeliveryAttempt() > 0) {
attributesBuilder.put(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, message.getDeliveryAttempt());
attributesBuilder.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_DELIVERY_ATTEMPT, message.getDeliveryAttempt());
}
Attributes attributes = attributesBuilder.build();
Context publisherSpanContext = message.extractSpanContext(attributes);
Expand Down Expand Up @@ -380,7 +375,7 @@ Span startSubscribeRpcSpan(
// Ack deadline and receipt modack are specific to the modack operation
if (rpcOperation == "modack") {
attributesBuilder
.put(ACK_DEADLINE_ATTR_KEY, ackDeadline)
.put(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_DEADLINE, ackDeadline)
.put(RECEIPT_MODACK_ATTR_KEY, isReceiptModack);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,15 @@ public Builder setCompressionBytesThreshold(long compressionBytesThreshold) {
return this;
}


/**
* OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of OpenTelemetry has been provied.
Warning: traces are subject to change. The name and attributes of a span might
change without notice. Only use run traces interactively. Don't use in
automation. Running non-interactive traces can cause problems if the underlying
trace architecture changes without notice.
*/

/** Gives the ability to enable Open Telemetry Tracing */
public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,13 +707,21 @@ Builder setClock(ApiClock clock) {
return this;
}

/**
* OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of OpenTelemetry has been provied.
Warning: traces are subject to change. The name and attributes of a span might
change without notice. Only use run traces interactively. Don't use in
automation. Running non-interactive traces can cause problems if the underlying
trace architecture changes without notice.
*/

/** Gives the ability to enable Open Telemetry Tracing */
public Builder setEnableOpenTelemetryTracing(boolean enableOpenTelemetryTracing) {
this.enableOpenTelemetryTracing = enableOpenTelemetryTracing;
return this;
}

/** Sets the instance of OpenTelemetry for the Publisher class. */
/** Sets the instance of OpenTelemetry for the Subscriber class. */
public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -84,16 +85,10 @@ public class OpenTelemetryTest {

private static final String MESSAGING_SYSTEM_VALUE = "gcp_pubsub";
private static final String PROJECT_ATTR_KEY = "gcp.project_id";
private static final String MESSAGE_SIZE_ATTR_KEY = "messaging.message.body.size";
private static final String ORDERING_KEY_ATTR_KEY = "messaging.gcp_pubsub.message.ordering_key";
private static final String ACK_DEADLINE_ATTR_KEY = "messaging.gcp_pubsub.message.ack_deadline";
private static final String RECEIPT_MODACK_ATTR_KEY = "messaging.gcp_pubsub.is_receipt_modack";
private static final String MESSAGE_ACK_ID_ATTR_KEY = "messaging.gcp_pubsub.message.ack_id";
private static final String MESSAGE_EXACTLY_ONCE_ATTR_KEY =
"messaging.gcp_pubsub.message.exactly_once_delivery";
private static final String MESSAGE_RESULT_ATTR_KEY = "messaging.gcp_pubsub.result";
private static final String MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY =
"messaging.gcp_pubsub.message.delivery_attempt";

private static final String TRACEPARENT_ATTRIBUTE = "googclient_traceparent";

Expand Down Expand Up @@ -195,8 +190,8 @@ public void testPublishSpansSuccess() {
.containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
.containsEntry(SemanticAttributes.CODE_FUNCTION, "publish")
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "create")
.containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
.containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, ORDERING_KEY)
.containsEntry(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, messageSize)
.containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);

// Check that the message has the attribute containing the trace context.
Expand Down Expand Up @@ -406,7 +401,7 @@ public void testSubscribeSpansSuccess() {
.containsEntry(SemanticAttributes.MESSAGING_OPERATION, "modack")
.containsEntry(
SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, subscribeMessageWrappers.size())
.containsEntry(ACK_DEADLINE_ATTR_KEY, 10)
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_DEADLINE, 10)
.containsEntry(RECEIPT_MODACK_ATTR_KEY, true);

// Check span data, links, and attributes for the ack RPC span
Expand Down Expand Up @@ -503,10 +498,10 @@ public void testSubscribeSpansSuccess() {
SemanticAttributes.MESSAGING_DESTINATION_NAME, FULL_SUBSCRIPTION_NAME.getSubscription())
.containsEntry(PROJECT_ATTR_KEY, PROJECT_NAME)
.containsEntry(SemanticAttributes.CODE_FUNCTION, "onResponse")
.containsEntry(MESSAGE_SIZE_ATTR_KEY, messageSize)
.containsEntry(ORDERING_KEY_ATTR_KEY, ORDERING_KEY)
.containsEntry(MESSAGE_ACK_ID_ATTR_KEY, ACK_ID)
.containsEntry(MESSAGE_DELIVERY_ATTEMPT_ATTR_KEY, DELIVERY_ATTEMPT)
.containsEntry(MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, messageSize)
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ORDERING_KEY, ORDERING_KEY)
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_ACK_ID, ACK_ID)
.containsEntry(MessagingIncubatingAttributes.MESSAGING_GCP_PUBSUB_MESSAGE_DELIVERY_ATTEMPT, DELIVERY_ATTEMPT)
.containsEntry(MESSAGE_EXACTLY_ONCE_ATTR_KEY, EXACTLY_ONCE_ENABLED)
.containsEntry(MESSAGE_RESULT_ATTR_KEY, PROCESS_ACTION)
.containsEntry(SemanticAttributes.MESSAGING_MESSAGE_ID, MESSAGE_ID);
Expand Down

0 comments on commit 305610e

Please sign in to comment.