Skip to content

Commit

Permalink
feat(opentelemetry-instrumentation-aiokafka): wrap getone instead of …
Browse files Browse the repository at this point in the history
…anext, add tests (#2874)

* add tests

* add to CHANGELOG

* add tests for baggage

* wrap getone instead of __anext__

* split sync and async tests (fix review)

* add dimastbk to component_owners.yml for aiokafka

* Update CHANGELOG.md

---------

Co-authored-by: Emídio Neto <9735060+emdneto@users.noreply.github.com>
Co-authored-by: Riccardo Magliocchetti <riccardo.magliocchetti@gmail.com>
  • Loading branch information
3 people authored Sep 26, 2024
1 parent f8bb289 commit d52f42f
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .github/component_owners.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,8 @@ components:
instrumentation/opentelemetry-instrumentation-psycopg:
- federicobond

instrumentation/opentelemetry-instrumentation-aiokafka:
- dimastbk

processor/opentelemetry-processor-baggage:
- codeboten
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-aiokafka` Add instrumentor and auto instrumentation support for aiokafka
([#2082](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2082))


### Fixed

- `opentelemetry-instrumentation-aiokafka` Wrap `AIOKafkaConsumer.getone()` instead of `AIOKafkaConsumer.__anext__`
([#2874](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2874))

## Version 1.27.0/0.48b0 ()

### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def async_consume_hook(span, record, args, kwargs):
from opentelemetry import trace
from opentelemetry.instrumentation.aiokafka.package import _instruments
from opentelemetry.instrumentation.aiokafka.utils import (
_wrap_anext,
_wrap_getone,
_wrap_send,
)
from opentelemetry.instrumentation.aiokafka.version import __version__
Expand Down Expand Up @@ -126,10 +126,10 @@ def _instrument(self, **kwargs):
)
wrap_function_wrapper(
aiokafka.AIOKafkaConsumer,
"__anext__",
_wrap_anext(tracer, async_consume_hook),
"getone",
_wrap_getone(tracer, async_consume_hook),
)

def _uninstrument(self, **kwargs):
unwrap(aiokafka.AIOKafkaProducer, "send")
unwrap(aiokafka.AIOKafkaConsumer, "__anext__")
unwrap(aiokafka.AIOKafkaConsumer, "getone")
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ async def _create_consumer_span(
context.detach(token)


def _wrap_anext(
def _wrap_getone(
tracer: Tracer, async_consume_hook: ConsumeHookT
) -> Callable[..., Awaitable[aiokafka.ConsumerRecord]]:
async def _traced_next(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,29 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import TestCase

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import uuid
from typing import Any, List, Sequence, Tuple
from unittest import IsolatedAsyncioTestCase, TestCase, mock

from aiokafka import (
AIOKafkaConsumer,
AIOKafkaProducer,
ConsumerRecord,
TopicPartition,
)
from wrapt import BoundFunctionWrapper

from opentelemetry import baggage, context
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.semconv._incubating.attributes import messaging_attributes
from opentelemetry.semconv.attributes import server_attributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind, format_trace_id, set_span_in_context


class TestAIOKafka(TestCase):
class TestAIOKafkaInstrumentor(TestCase):
def test_instrument_api(self) -> None:
instrumentation = AIOKafkaInstrumentor()

Expand All @@ -28,13 +42,279 @@ def test_instrument_api(self) -> None:
isinstance(AIOKafkaProducer.send, BoundFunctionWrapper)
)
self.assertTrue(
isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper)
isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper)
)

instrumentation.uninstrument()
self.assertFalse(
isinstance(AIOKafkaProducer.send, BoundFunctionWrapper)
)
self.assertFalse(
isinstance(AIOKafkaConsumer.__anext__, BoundFunctionWrapper)
isinstance(AIOKafkaConsumer.getone, BoundFunctionWrapper)
)


class TestAIOKafkaInstrumentation(TestBase, IsolatedAsyncioTestCase):
@staticmethod
def consumer_record_factory(
number: int, headers: Tuple[Tuple[str, bytes], ...]
) -> ConsumerRecord:
return ConsumerRecord(
f"topic_{number}",
number,
number,
number,
number,
f"key_{number}".encode(),
f"value_{number}".encode(),
None,
number,
number,
headers=headers,
)

@staticmethod
async def consumer_factory(**consumer_kwargs: Any) -> AIOKafkaConsumer:
consumer = AIOKafkaConsumer(**consumer_kwargs)

consumer._client.bootstrap = mock.AsyncMock()
consumer._client._wait_on_metadata = mock.AsyncMock()

await consumer.start()

consumer._fetcher.next_record = mock.AsyncMock()

return consumer

@staticmethod
async def producer_factory() -> AIOKafkaProducer:
producer = AIOKafkaProducer(api_version="1.0")

producer.client._wait_on_metadata = mock.AsyncMock()
producer.client.bootstrap = mock.AsyncMock()
producer._message_accumulator.add_message = mock.AsyncMock()
producer._sender.start = mock.AsyncMock()
producer._partition = mock.Mock(return_value=1)

await producer.start()

return producer

async def test_getone(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)

client_id = str(uuid.uuid4())
group_id = str(uuid.uuid4())
consumer = await self.consumer_factory(
client_id=client_id, group_id=group_id
)
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record

expected_spans = [
{
"name": "topic_1 receive",
"kind": SpanKind.CONSUMER,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_1",
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "1",
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_1",
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 1,
messaging_attributes.MESSAGING_MESSAGE_ID: "topic_1.1.1",
},
},
{
"name": "topic_2 receive",
"kind": SpanKind.CONSUMER,
"attributes": {
messaging_attributes.MESSAGING_SYSTEM: messaging_attributes.MessagingSystemValues.KAFKA.value,
server_attributes.SERVER_ADDRESS: '"localhost"',
messaging_attributes.MESSAGING_CLIENT_ID: client_id,
messaging_attributes.MESSAGING_DESTINATION_NAME: "topic_2",
messaging_attributes.MESSAGING_DESTINATION_PARTITION_ID: "2",
messaging_attributes.MESSAGING_KAFKA_MESSAGE_KEY: "key_2",
messaging_attributes.MESSAGING_CONSUMER_GROUP_NAME: group_id,
messaging_attributes.MESSAGING_OPERATION_NAME: "receive",
messaging_attributes.MESSAGING_OPERATION_TYPE: messaging_attributes.MessagingOperationTypeValues.RECEIVE.value,
messaging_attributes.MESSAGING_KAFKA_MESSAGE_OFFSET: 2,
messaging_attributes.MESSAGING_MESSAGE_ID: "topic_2.2.2",
},
},
]
self.memory_exporter.clear()

next_record_mock.side_effect = [
self.consumer_record_factory(
1,
headers=(
(
"traceparent",
b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01",
),
),
),
self.consumer_record_factory(2, headers=()),
]

await consumer.getone()
next_record_mock.assert_awaited_with(())

first_span = self.memory_exporter.get_finished_spans()[0]
self.assertEqual(
format_trace_id(first_span.get_span_context().trace_id),
"03afa25236b8cd948fa853d67038ac79",
)

await consumer.getone()
next_record_mock.assert_awaited_with(())

span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

async def test_getone_baggage(self) -> None:
received_baggage = None

async def async_consume_hook(span, *_) -> None:
nonlocal received_baggage
received_baggage = baggage.get_all(set_span_in_context(span))

AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(
tracer_provider=self.tracer_provider,
async_consume_hook=async_consume_hook,
)

consumer = await self.consumer_factory()
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record

self.memory_exporter.clear()

next_record_mock.side_effect = [
self.consumer_record_factory(
1,
headers=(
(
"traceparent",
b"00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01",
),
("baggage", b"foo=bar"),
),
),
]

await consumer.getone()
next_record_mock.assert_awaited_with(())

self.assertEqual(received_baggage, {"foo": "bar"})

async def test_getone_consume_hook(self) -> None:
async_consume_hook_mock = mock.AsyncMock()

AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(
tracer_provider=self.tracer_provider,
async_consume_hook=async_consume_hook_mock,
)

consumer = await self.consumer_factory()
next_record_mock: mock.AsyncMock = consumer._fetcher.next_record

next_record_mock.side_effect = [
self.consumer_record_factory(1, headers=())
]

await consumer.getone()

async_consume_hook_mock.assert_awaited_once()

async def test_send(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)

producer = await self.producer_factory()
add_message_mock: mock.AsyncMock = (
producer._message_accumulator.add_message
)

tracer = self.tracer_provider.get_tracer(__name__)
with tracer.start_as_current_span("test_span") as span:
await producer.send("topic_1", b"value_1")

add_message_mock.assert_awaited_with(
TopicPartition(topic="topic_1", partition=1),
None,
b"value_1",
40.0,
timestamp_ms=None,
headers=[("traceparent", mock.ANY)],
)
add_message_mock.call_args_list[0].kwargs["headers"][0][1].startswith(
f"00-{format_trace_id(span.get_span_context().trace_id)}-".encode()
)

await producer.send("topic_2", b"value_2")
add_message_mock.assert_awaited_with(
TopicPartition(topic="topic_2", partition=1),
None,
b"value_2",
40.0,
timestamp_ms=None,
headers=[("traceparent", mock.ANY)],
)

async def test_send_baggage(self) -> None:
AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(tracer_provider=self.tracer_provider)

producer = await self.producer_factory()
add_message_mock: mock.AsyncMock = (
producer._message_accumulator.add_message
)

tracer = self.tracer_provider.get_tracer(__name__)
ctx = baggage.set_baggage("foo", "bar")
context.attach(ctx)

with tracer.start_as_current_span("test_span", context=ctx):
await producer.send("topic_1", b"value_1")

add_message_mock.assert_awaited_with(
TopicPartition(topic="topic_1", partition=1),
None,
b"value_1",
40.0,
timestamp_ms=None,
headers=[("traceparent", mock.ANY), ("baggage", b"foo=bar")],
)

async def test_send_produce_hook(self) -> None:
async_produce_hook_mock = mock.AsyncMock()

AIOKafkaInstrumentor().uninstrument()
AIOKafkaInstrumentor().instrument(
tracer_provider=self.tracer_provider,
async_produce_hook=async_produce_hook_mock,
)

producer = await self.producer_factory()

await producer.send("topic_1", b"value_1")

async_produce_hook_mock.assert_awaited_once()

def _compare_spans(
self, spans: Sequence[ReadableSpan], expected_spans: List[dict]
) -> None:
self.assertEqual(len(spans), len(expected_spans))
for span, expected_span in zip(spans, expected_spans):
self.assertEqual(expected_span["name"], span.name)
self.assertEqual(expected_span["kind"], span.kind)
self.assertEqual(
expected_span["attributes"], dict(span.attributes)
)
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
_create_consumer_span,
_extract_send_partition,
_get_span_name,
_wrap_anext,
_wrap_getone,
_wrap_send,
)
from opentelemetry.trace import SpanKind
Expand Down Expand Up @@ -187,7 +187,7 @@ async def test_wrap_next(
original_next_callback = mock.AsyncMock()
kafka_consumer = mock.MagicMock()

wrapped_next = _wrap_anext(tracer, consume_hook)
wrapped_next = _wrap_getone(tracer, consume_hook)
record = await wrapped_next(
original_next_callback, kafka_consumer, self.args, self.kwargs
)
Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ commands_pre =
aiokafka: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api
aiokafka: pip install opentelemetry-semantic-conventions@{env:CORE_REPO}\#egg=opentelemetry-semantic-conventions&subdirectory=opentelemetry-semantic-conventions
aiokafka: pip install opentelemetry-sdk@{env:CORE_REPO}\#egg=opentelemetry-sdk&subdirectory=opentelemetry-sdk
aiokafka: pip install opentelemetry-test-utils@{env:CORE_REPO}\#egg=opentelemetry-test-utils&subdirectory=tests/opentelemetry-test-utils
aiokafka: pip install -r {toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka/test-requirements.txt

kafka-python: pip install opentelemetry-api@{env:CORE_REPO}\#egg=opentelemetry-api&subdirectory=opentelemetry-api
Expand Down

0 comments on commit d52f42f

Please sign in to comment.