From 875ff96d4a4de8e605ac87ff1471ec4b99c1ff62 Mon Sep 17 00:00:00 2001 From: obs-gh-virjramakrishnan Date: Wed, 4 Sep 2024 15:28:20 -0700 Subject: [PATCH] OB-35469: fix: add subscription request parent span as child of discovery request, and simplify invocation call stack (#334) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In HandleSQS, the context is being injected from the SQS event when the lambda is triggered by an SQS request. This means that the HandleRequest from the Subscription request, which is properly part of another trace, is assuming the parent trace of the Discovery Request. Hence, the Discover call “steals” the spans from the Subscriber, or at least all of the spans that descend from the HandleRequest call of the Subscribe request. That leaves only the parent span of Subscriber, which is created (and thus has context provided to it) before we extract the context from the SQS event. Thus, it shows up as its own trace. Wrapping the OTEL handler wrapper (which is what makes the first span in the trace) with another handler that will add context from the SQS event if the payload is of that format means that the first span should also get the context from the discovery request. 
--- pkg/handler/subscriber/instrument.go | 6 +-- pkg/lambda/subscriber/lambda.go | 2 +- pkg/tracing/sqs.go | 49 +++++++++++++++++++ .../subscriber => tracing}/sqspropagator.go | 2 +- 4 files changed, 54 insertions(+), 5 deletions(-) create mode 100644 pkg/tracing/sqs.go rename pkg/{handler/subscriber => tracing}/sqspropagator.go (99%) diff --git a/pkg/handler/subscriber/instrument.go b/pkg/handler/subscriber/instrument.go index 8ddb59a2..b4dce65b 100644 --- a/pkg/handler/subscriber/instrument.go +++ b/pkg/handler/subscriber/instrument.go @@ -8,6 +8,8 @@ import ( "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/observeinc/aws-sam-apps/pkg/tracing" + "github.com/go-logr/logr" "go.opentelemetry.io/contrib/propagators/b3" "go.opentelemetry.io/otel/codes" @@ -25,8 +27,6 @@ func (h *InstrumentedHandler) HandleSQS(ctx context.Context, request events.SQSE for _, record := range request.Records { var req Request var err error - logger.V(3).Info("Getting context from message attributes") - ctx = NewSQSCarrier().Extract(ctx, record.MessageAttributes) if err = json.Unmarshal([]byte(record.Body), &req); err == nil { _, err = h.HandleRequest(ctx, &req) @@ -66,7 +66,7 @@ func (q *InstrumentedSQSClient) SendMessage(ctx context.Context, msg *sqs.SendMe logger := logr.FromContextOrDiscard(ctx) logger.V(3).Info("Injecting context into message attributes") msg.MessageAttributes = make(map[string]types.MessageAttributeValue) - if err := NewSQSCarrier().Inject(ctx, msg.MessageAttributes); err != nil { + if err := tracing.NewSQSCarrier().Inject(ctx, msg.MessageAttributes); err != nil { return nil, fmt.Errorf("failed to inject context into message attributes: %w", err) } logger.V(3).Info("sending message %s", msg.MessageBody) diff --git a/pkg/lambda/subscriber/lambda.go b/pkg/lambda/subscriber/lambda.go index 285517e4..69b386ed 100644 --- a/pkg/lambda/subscriber/lambda.go +++ 
b/pkg/lambda/subscriber/lambda.go @@ -130,6 +130,6 @@ func New(ctx context.Context, cfg *Config) (*Lambda, error) { return nil, fmt.Errorf("failed to register functions: %w", err) } - l.Entrypoint = tracing.NewLambdaHandler(mux, tracerProvider) + l.Entrypoint = tracing.WrapHandlerSQSContext(tracing.NewLambdaHandler(mux, tracerProvider)) return l, nil } diff --git a/pkg/tracing/sqs.go b/pkg/tracing/sqs.go new file mode 100644 index 00000000..3ec1749c --- /dev/null +++ b/pkg/tracing/sqs.go @@ -0,0 +1,49 @@ +package tracing + +import ( + "bytes" + "context" + "encoding/json" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + "github.com/go-logr/logr" +) + +type SQSWithContextHandler struct { + handler lambda.Handler +} + +// Compile time check our Handler implements lambda.Handler. +var _ lambda.Handler = SQSWithContextHandler{} + +// Invoke checks if the incoming payload is from a SQS event and, if so, +// extracts the context from the SQS message and injects it into the context. +// This means that the spans created with this context will appear as children +// of a span in the discover request that created the SQS event +func (h SQSWithContextHandler) Invoke(ctx context.Context, payload []byte) ([]byte, error) { + logger := logr.FromContextOrDiscard(ctx) + logger.V(3).Info("Getting context from message attributes") + var event events.SQSEvent + dec := json.NewDecoder(bytes.NewReader(payload)) + dec.DisallowUnknownFields() + if err := dec.Decode(&event); err == nil { + for _, record := range event.Records { + ctx = NewSQSCarrier().Extract(ctx, record.MessageAttributes) + break + } + } + + response, err := h.handler.Invoke(ctx, payload) + if err != nil { + return nil, err + } + + return response, nil +} + +// WrapHandlerSQSContext Provides a Handler which wraps an existing Handler while +// injecting the SQS context into the context if the payload is a SQS event. 
+func WrapHandlerSQSContext(handler lambda.Handler) lambda.Handler { + return SQSWithContextHandler{handler: handler} +} diff --git a/pkg/handler/subscriber/sqspropagator.go b/pkg/tracing/sqspropagator.go similarity index 99% rename from pkg/handler/subscriber/sqspropagator.go rename to pkg/tracing/sqspropagator.go index 0356e287..da9865e7 100644 --- a/pkg/handler/subscriber/sqspropagator.go +++ b/pkg/tracing/sqspropagator.go @@ -1,4 +1,4 @@ -package subscriber +package tracing import ( "context"