Skip to content

Commit

Permalink
support topic as kwarg (#949)
Browse files Browse the repository at this point in the history
  • Loading branch information
nozik authored Mar 14, 2022
1 parent d86f164 commit 8fc95ca
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#903](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/903))
- `opentelemetry-instrumentation-falcon` Safer patching mechanism
([#895](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/895))
- `opentelemetry-instrumentation-kafka-python` Fix topic extraction
([#949](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/949))

## [1.9.1-0.28b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.9.1-0.28b1) - 2022-01-29

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ def _extract_argument(key, position, default_value, args, kwargs):
return kwargs.get(key, default_value)

@staticmethod
def extract_send_topic(args):
def extract_send_topic(args, kwargs):
"""extract topic from `send` method arguments in KafkaProducer class"""
if len(args) > 0:
return args[0]
return "unknown"
return KafkaPropertiesExtractor._extract_argument(
"topic", 0, "unknown", args, kwargs
)

@staticmethod
def extract_send_value(args, kwargs):
Expand All @@ -56,7 +56,7 @@ def extract_send_headers(args, kwargs):
def extract_send_partition(instance, args, kwargs):
"""extract partition `send` method arguments, using the `_partition` method in KafkaProducer class"""
try:
topic = KafkaPropertiesExtractor.extract_send_topic(args)
topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs)
key = KafkaPropertiesExtractor.extract_send_key(args, kwargs)
value = KafkaPropertiesExtractor.extract_send_value(args, kwargs)
partition = KafkaPropertiesExtractor._extract_argument(
Expand Down Expand Up @@ -145,7 +145,7 @@ def _traced_send(func, instance, args, kwargs):
headers = []
kwargs["headers"] = headers

topic = KafkaPropertiesExtractor.extract_send_topic(args)
topic = KafkaPropertiesExtractor.extract_send_topic(args, kwargs)
bootstrap_servers = KafkaPropertiesExtractor.extract_bootstrap_servers(
instance
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,57 @@ def setUp(self) -> None:
@mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span")
@mock.patch("opentelemetry.trace.set_span_in_context")
@mock.patch("opentelemetry.propagate.inject")
def test_wrap_send(
def test_wrap_send_with_topic_as_arg(
self,
inject: mock.MagicMock,
set_span_in_context: mock.MagicMock,
enrich_span: mock.MagicMock,
extract_send_partition: mock.MagicMock,
extract_bootstrap_servers: mock.MagicMock,
):
) -> None:
self.wrap_send_helper(
inject,
set_span_in_context,
enrich_span,
extract_send_partition,
extract_bootstrap_servers,
)

@mock.patch(
"opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_bootstrap_servers"
)
@mock.patch(
"opentelemetry.instrumentation.kafka.utils.KafkaPropertiesExtractor.extract_send_partition"
)
@mock.patch("opentelemetry.instrumentation.kafka.utils._enrich_span")
@mock.patch("opentelemetry.trace.set_span_in_context")
@mock.patch("opentelemetry.propagate.inject")
def test_wrap_send_with_topic_as_kwarg(
self,
inject: mock.MagicMock,
set_span_in_context: mock.MagicMock,
enrich_span: mock.MagicMock,
extract_send_partition: mock.MagicMock,
extract_bootstrap_servers: mock.MagicMock,
) -> None:
self.args = []
self.kwargs["topic"] = self.topic_name
self.wrap_send_helper(
inject,
set_span_in_context,
enrich_span,
extract_send_partition,
extract_bootstrap_servers,
)

def wrap_send_helper(
self,
inject: mock.MagicMock,
set_span_in_context: mock.MagicMock,
enrich_span: mock.MagicMock,
extract_send_partition: mock.MagicMock,
extract_bootstrap_servers: mock.MagicMock,
) -> None:
tracer = mock.MagicMock()
produce_hook = mock.MagicMock()
original_send_callback = mock.MagicMock()
Expand Down

0 comments on commit 8fc95ca

Please sign in to comment.