From 59684132f242522c799e5a72d681ebdcdaf1e31b Mon Sep 17 00:00:00 2001 From: tamsin johnson Date: Tue, 11 Jul 2023 14:15:21 -0700 Subject: [PATCH] feat: Add Bunny configuration for context propagation_style (#526) open-telemetry/oteps#220 recommends: > For each message it accounts for, the "Deliver" or "Receive" span SHOULD link to the message's creation context. In addition, if it is possible the creation context MAY be set as a parent of the "Deliver" or "Receive" span. But also acknolwedges: > open-telemetry/opentelemetry-specification#454 To instrument pull-based "Receive" operations as described in this document, it is necessary to add links to spans after those spans were created. The reason for this is, that not all messages are present at the start of a "Receive" operations, so links to related contexts cannot be added at the start of the span. Our "Receive" spans do not link to the message's creation context, and indeed it doesn't seem possible to do so. Likewise for setting the parent. This tries to do it anyway, and results in: - "Receive" span links remain unset; - "Receive" span is a root span for a new trace; - "Process" span trace_id is correctly set to the "Send" context; - "Process" span's parent is incorrectly set to "Send" instead of "Receive". --- .../instrumentation/bunny/instrumentation.rb | 2 ++ .../instrumentation/bunny/patches/channel.rb | 8 +++++ .../bunny/patches/queue_test.rb | 36 ++++++++++++++++++- 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/instrumentation/bunny/lib/opentelemetry/instrumentation/bunny/instrumentation.rb b/instrumentation/bunny/lib/opentelemetry/instrumentation/bunny/instrumentation.rb index 3e772534c..8a6ae3af7 100644 --- a/instrumentation/bunny/lib/opentelemetry/instrumentation/bunny/instrumentation.rb +++ b/instrumentation/bunny/lib/opentelemetry/instrumentation/bunny/instrumentation.rb @@ -21,6 +21,8 @@ class Instrumentation < OpenTelemetry::Instrumentation::Base defined?(::Bunny) end + option :propagation_style, default: :link, validate: %i[link child none] + private def require_patches diff --git a/instrumentation/bunny/lib/opentelemetry/instrumentation/bunny/patches/channel.rb b/instrumentation/bunny/lib/opentelemetry/instrumentation/bunny/patches/channel.rb index 4f75ace1a..65b2b2230 100644 --- a/instrumentation/bunny/lib/opentelemetry/instrumentation/bunny/patches/channel.rb +++ b/instrumentation/bunny/lib/opentelemetry/instrumentation/bunny/patches/channel.rb @@ -16,11 +16,19 @@ def basic_get(queue, opts = { manual_ack: false }) tracer.in_span("#{queue} receive", attributes: attributes, kind: :consumer) do |span, _ctx| delivery_info, properties, payload = super + if Bunny::Instrumentation.instance.config[:propagation_style] == :child + ctx = OpenTelemetry.propagation.extract(properties[:headers]) + token = OpenTelemetry::Context.attach(ctx) + end + return [delivery_info, properties, payload] unless delivery_info OpenTelemetry::Instrumentation::Bunny::PatchHelpers.trace_enrich_receive_span(span, self, delivery_info, properties) [delivery_info, properties, payload] + ensure + OpenTelemetry::Context.detach(token) if + Bunny::Instrumentation.instance.config[:propagation_style] == :child && token end end diff --git a/instrumentation/bunny/test/opentelemetry/instrumentation/bunny/patches/queue_test.rb b/instrumentation/bunny/test/opentelemetry/instrumentation/bunny/patches/queue_test.rb index c0c90a292..b968fd77c 100644 --- a/instrumentation/bunny/test/opentelemetry/instrumentation/bunny/patches/queue_test.rb +++ b/instrumentation/bunny/test/opentelemetry/instrumentation/bunny/patches/queue_test.rb @@ -12,6 +12,7 @@ describe OpenTelemetry::Instrumentation::Bunny::Patches::Queue do let(:instrumentation) { OpenTelemetry::Instrumentation::Bunny::Instrumentation.instance } + let(:config) { {} } let(:exporter) { EXPORTER } let(:spans) { exporter.finished_spans } @@ -30,7 +31,7 @@ # Clear spans exporter.reset - instrumentation.install + instrumentation.install(config) end after do @@ -72,5 +73,38 @@ process_span = spans.find { |span| span.name == ".#{queue_name} process" } _(process_span).must_be_nil end + + describe 'when propagation_style is child' do + let(:config) { { propagation_style: :child } } + + it 'maintains a continuous trace' do + queue.publish('Hello, opentelemetry!') + + queue.pop { |_delivery_info, _metadata, _payload| break } + + send_span = spans.find { |span| span.name == ".#{queue_name} send" } + receive_span = spans.find { |span| span.name == ".#{queue_name} receive" } + process_span = spans.find { |span| span.name == ".#{queue_name} process" } + + + _(receive_span.parent_span_id).must_equal(send_span.span_id) # fails; is INVALID_SPAN_ID + _(receive_span.trace_id).must_equal(send_span.trace_id) # fails. + + _(process_span.parent_span_id).must_equal(receive_span.span_id) # fails; is send_span.span_id + _(process_span.trace_id).must_equal(send_span.trace_id) # succeeds + end + + it 'propagates baggage' do + ctx = OpenTelemetry::Baggage.set_value('testing_baggage', 'it_worked') + + OpenTelemetry::Context.with_current(ctx) do + queue.publish('Hello, opentelemetry!') + end + + queue.pop do |_delivery_info, _metadata, _payload| + _(OpenTelemetry::Baggage.value('testing_baggage')).must_equal('it_worked') + end + end + end end end unless ENV['OMIT_SERVICES']