Skip to content

Commit

Permalink
Add ack span
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 23, 2024
1 parent 2410a4e commit b66fd19
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 24 deletions.
44 changes: 33 additions & 11 deletions google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
)
from google.pubsub_v1.types import PubsubMessage

_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"

class SubscribeOpenTelemetry:
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"

class SubscribeOpenTelemetry:
def __init__(self, message: PubsubMessage):
self._message: PubsubMessage = message

Expand Down Expand Up @@ -79,7 +79,7 @@ def start_subscribe_span(
ack_id: str,
delivery_attempt: int,
) -> None:
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
parent_span_context = TraceContextTextMapPropagator().extract(
carrier=self._message,
getter=OpenTelemetryContextGetter(),
Expand All @@ -94,7 +94,7 @@ def start_subscribe_span(
context=parent_span_context if parent_span_context else None,
kind=trace.SpanKind.CONSUMER,
attributes={
"messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.destination.name": subscription_short_name,
"gcp.project_id": subscription.split("/")[1],
"messaging.message.id": self._message.message_id,
Expand Down Expand Up @@ -131,7 +131,7 @@ def set_subscribe_span_result(self, result: str) -> None:

def start_subscribe_concurrency_control_span(self) -> None:
assert self._subscribe_span is not None
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name="subscriber concurrency control",
kind=trace.SpanKind.INTERNAL,
Expand All @@ -146,7 +146,7 @@ def end_subscribe_concurrency_control_span(self) -> None:

def start_subscribe_scheduler_span(self) -> None:
assert self._subscribe_span is not None
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name="subscriber scheduler",
kind=trace.SpanKind.INTERNAL,
Expand All @@ -161,7 +161,7 @@ def end_subscribe_scheduler_span(self) -> None:

def start_process_span(self) -> None:
assert self._subscribe_span is not None
tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME)
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
publish_create_span_link: Optional[trace.Link] = None
if self._publisher_create_span_context:
publish_create_span: trace.Span = trace.get_current_span(
Expand All @@ -177,7 +177,7 @@ def start_process_span(self) -> None:
with tracer.start_as_current_span(
name=f"{self._subscription_id} process",
attributes={
"messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
},
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(self._subscribe_span),
Expand Down Expand Up @@ -209,8 +209,6 @@ def start_modack_span(
code_function: str,
receipt_modack: bool,
) -> trace.Span:
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
assert subscription_id is not None
assert project_id is not None
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
Expand All @@ -231,3 +229,27 @@ def start_modack_span(
end_on_exit=False,
) as modack_span:
return modack_span


def start_ack_span(
subscription_id: str,
message_count: int,
project_id: str,
links: List[trace.Link],
) -> trace.Span:
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name=f"{subscription_id} ack",
attributes={
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.batch.message_count": message_count,
"messaging.operation": "ack",
"gcp.project_id": project_id,
"messaging.destination.name": subscription_id,
"code.function": "ack",
},
kind=trace.SpanKind.CLIENT,
links=links,
end_on_exit=False,
) as ack_span:
return ack_span
34 changes: 34 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import warnings
from google.api_core.retry import exponential_sleep_generator

from opentelemetry import trace

from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber.exceptions import (
AcknowledgeStatus,
)
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import start_ack_span

if typing.TYPE_CHECKING: # pragma: NO COVER
import queue
Expand Down Expand Up @@ -232,18 +235,49 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None:
items_gen = iter(items)
ack_ids_gen = (item.ack_id for item in items)
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))
subscription_id: Optional[str] = None
project_id: Optional[str] = None
for item in items:
if item.opentelemetry_data:
item.opentelemetry_data.add_subscribe_span_event("ack start")
if subscription_id is None:
subscription_id = item.opentelemetry_data.subscription_id
if project_id is None:
project_id = item.opentelemetry_data.project_id

for _ in range(total_chunks):
ack_reqs_dict = {
req.ack_id: req
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}

subscribe_links: List[trace.Link] = []
for ack_req in ack_reqs_dict.values():
if ack_req.opentelemetry_data:
subscribe_span: Optional[
trace.Span
] = ack_req.opentelemetry_data.subscribe_span
if (
subscribe_span
and subscribe_span.get_span_context().trace_flags.sampled
):
subscribe_links.append(
trace.Link(subscribe_span.get_span_context())
)
ack_span: Optional[trace.Span] = None
if subscription_id and project_id:
ack_span = start_ack_span(
subscription_id,
len(ack_reqs_dict),
project_id,
subscribe_links,
)
requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)),
ack_reqs_dict=ack_reqs_dict,
)
if ack_span:
ack_span.end()

for completed_ack in requests_completed:
if completed_ack.opentelemetry_data:
Expand Down
59 changes: 46 additions & 13 deletions tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import sys
import threading

from opentelemetry import trace

from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests
Expand Down Expand Up @@ -410,8 +412,15 @@ def test_opentelemetry_ack(span_exporter):
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

opentelemetry_data = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo"))
opentelemetry_data.start_subscribe_span(
data1 = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo"))
data1.start_subscribe_span(
subscription="projects/projectID/subscriptions/subscriptionID",
exactly_once_enabled=True,
ack_id="ack_id",
delivery_attempt=5,
)
data2 = SubscribeOpenTelemetry(message=PubsubMessage(data=b"foo"))
data2.start_subscribe_span(
subscription="projects/projectID/subscriptions/subscriptionID",
exactly_once_enabled=True,
ack_id="ack_id",
Expand All @@ -424,22 +433,46 @@ def test_opentelemetry_ack(span_exporter):
time_to_ack=20,
ordering_key="",
future=None,
opentelemetry_data=opentelemetry_data,
)
opentelemetry_data=data1,
),
requests.AckRequest(
ack_id="ack_id_string2",
byte_size=0,
time_to_ack=20,
ordering_key="",
future=None,
opentelemetry_data=data2,
),
]
manager.send_unary_ack.return_value = (items, [])
dispatcher_.ack(items)
mock_span_context = mock.Mock(spec=trace.SpanContext)
mock_span_context.trace_flags.sampled = False
with mock.patch.object(
data2._subscribe_span, "get_span_context", return_value=mock_span_context
):
dispatcher_.ack(items)

spans = span_exporter.get_finished_spans()

assert len(spans) == 1
subscribe_span = spans[0]

assert "messaging.gcp_pubsub.result" in subscribe_span.attributes
assert subscribe_span.attributes["messaging.gcp_pubsub.result"] == "acked"
assert len(subscribe_span.events) == 2
assert subscribe_span.events[0].name == "ack start"
assert subscribe_span.events[1].name == "ack end"
assert len(spans) == 3
ack_span = spans[0]

for subscribe_span in spans[1:]:
assert subscribe_span.attributes["messaging.gcp_pubsub.result"] == "acked"
assert len(subscribe_span.events) == 2
assert subscribe_span.events[0].name == "ack start"
assert subscribe_span.events[1].name == "ack end"

assert ack_span.name == "subscriptionID ack"
assert ack_span.kind == trace.SpanKind.CLIENT
assert ack_span.parent is None
assert len(ack_span.links) == 1
assert ack_span.attributes["messaging.system"] == "gcp_pubsub"
assert ack_span.attributes["messaging.batch.message_count"] == 2
assert ack_span.attributes["messaging.operation"] == "ack"
assert ack_span.attributes["gcp.project_id"] == "projectID"
assert ack_span.attributes["messaging.destination.name"] == "subscriptionID"
assert ack_span.attributes["code.function"] == "ack"


def test_ack():
Expand Down

0 comments on commit b66fd19

Please sign in to comment.