Skip to content

Commit

Permalink
Add modack span
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Sep 22, 2024
1 parent c384466 commit c08287b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 6 deletions.
48 changes: 46 additions & 2 deletions google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional
from typing import Optional, List
from datetime import datetime

from opentelemetry import trace, context
Expand Down Expand Up @@ -56,10 +56,22 @@ def __init__(self, message: PubsubMessage):
# proces span to add links to the publisher create span.
self._publisher_create_span_context: Optional[context.Context] = None

# This will be set by `start_subscribe_span` method and will be used
# for other spans, such as modack span.
self._project_id: Optional[str] = None

@property
def subscription_id(self):
def subscription_id(self) -> Optional[str]:
return self._subscription_id

@property
def project_id(self) -> Optional[str]:
return self._project_id

@property
def subscribe_span(self) -> Optional[trace.Span]:
return self._subscribe_span

def start_subscribe_span(
self,
subscription: str,
Expand All @@ -75,6 +87,7 @@ def start_subscribe_span(
self._publisher_create_span_context = parent_span_context
assert len(subscription.split("/")) == 4
subscription_short_name = subscription.split("/")[3]
self._project_id = subscription.split("/")[1]
self._subscription_id = subscription_short_name
with tracer.start_as_current_span(
name=f"{subscription_short_name} subscribe",
Expand Down Expand Up @@ -185,3 +198,34 @@ def add_process_span_event(self, event: str) -> None:
"timestamp": str(datetime.now()),
},
)


def start_modack_span(
subscribe_span_links: List[trace.Link],
subscription_id: Optional[str],
message_count: int,
deadline: float,
project_id: Optional[str],
code_function: str,
) -> trace.Span:
_OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1"
_OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub"
assert subscription_id is not None
assert project_id is not None
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name=f"{subscription_id} modack",
attributes={
"messaging.system": _OPEN_TELEMETRY_MESSAGING_SYSTEM,
"messaging.batch.message_count": message_count,
"messaging.gcp_pubsub.message.ack_deadline": deadline,
"messaging.destination.name": subscription_id,
"gcp.project_id": project_id,
"messaging.operation.name": "modack",
"code.function": code_function,
},
links=subscribe_span_links,
kind=trace.SpanKind.CLIENT,
end_on_exit=False,
) as modack_span:
return modack_span
38 changes: 37 additions & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
import threading
import time
import typing
from typing import Dict, Iterable, Optional, Union
from typing import Dict, Iterable, Optional, Union, List

from opentelemetry import trace

from google.cloud.pubsub_v1.subscriber._protocol.dispatcher import _MAX_BATCH_LATENCY
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
SubscribeOpenTelemetry,
)
from google.cloud.pubsub_v1.open_telemetry.subscribe_opentelemetry import (
start_modack_span,
)

try:
from collections.abc import KeysView
Expand Down Expand Up @@ -219,12 +224,43 @@ def maintain_leases(self) -> None:
for message in list(leased_messages.values())
if message.opentelemetry_data
]

subscribe_span_links: List[trace.Link] = []
subscription_id: Optional[str] = None
project_id: Optional[str] = None
if len(opentelemetry_data) > 0:
subscription_id = opentelemetry_data[0].subscription_id
project_id = opentelemetry_data[0].project_id
for data in opentelemetry_data:
subscribe_span: Optional[trace.Span] = data.subscribe_span
if (
subscribe_span
and subscribe_span.get_span_context().trace_flags.sampled
):
subscribe_span_links.append(
trace.Link(subscribe_span.get_span_context())
)

modack_span: Optional[trace.Span] = None
if len(opentelemetry_data) > 0:
modack_span = start_modack_span(
subscribe_span_links,
subscription_id,
len(opentelemetry_data),
deadline,
project_id,
"maintain_leases",
)

expired_ack_ids = self._manager._send_lease_modacks(
ack_id_gen,
deadline,
opentelemetry_data,
)

if modack_span:
modack_span.end()

start_time = time.time()
# If exactly once delivery is enabled, we should drop all expired ack_ids from lease management.
if self._manager._exactly_once_delivery_enabled() and len(expired_ack_ids):
Expand Down
24 changes: 21 additions & 3 deletions tests/unit/pubsub_v1/subscriber/test_leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import logging
import sys
import threading
import math

from opentelemetry import trace

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
Expand Down Expand Up @@ -201,7 +204,7 @@ def test_opentelemetry_expired_message_exactly_once_process_span(span_exporter):
msg.message_id = 3
opentelemetry_data = SubscribeOpenTelemetry(msg)
opentelemetry_data.start_subscribe_span(
subscription="projects/projectId/subscriptions/subscriptionID",
subscription="projects/projectID/subscriptions/subscriptionID",
exactly_once_enabled=True,
ack_id="ack_id",
delivery_attempt=4,
Expand All @@ -223,8 +226,8 @@ def test_opentelemetry_expired_message_exactly_once_process_span(span_exporter):

opentelemetry_data.end_subscribe_span()
spans = span_exporter.get_finished_spans()
assert len(spans) == 2
process_span, subscribe_span = spans
assert len(spans) == 3
modack_span, process_span, subscribe_span = spans

assert process_span.name == "subscriptionID process"
assert subscribe_span.name == "subscriptionID subscribe"
Expand All @@ -234,6 +237,21 @@ def test_opentelemetry_expired_message_exactly_once_process_span(span_exporter):

assert process_span.parent == subscribe_span.context

assert modack_span.name == "subscriptionID modack"
assert modack_span.parent is None
assert modack_span.kind == trace.SpanKind.CLIENT
assert len(modack_span.links) == 1
modack_span_attributes = modack_span.attributes
assert modack_span_attributes["messaging.system"] == "gcp_pubsub"
assert modack_span_attributes["messaging.batch.message_count"] == 1
assert math.isclose(
modack_span_attributes["messaging.gcp_pubsub.message.ack_deadline"], 10
)
assert modack_span_attributes["messaging.destination.name"] == "subscriptionID"
assert modack_span_attributes["gcp.project_id"] == "projectID"
assert modack_span_attributes["messaging.operation.name"] == "modack"
assert modack_span_attributes["code.function"] == "maintain_leases"


def test_maintain_leases_ack_ids():
manager = create_manager()
Expand Down

0 comments on commit c08287b

Please sign in to comment.