Skip to content

Commit

Permalink
OB-35469: fix: add subscription request parent span as child of disco…
Browse files Browse the repository at this point in the history
…very request, and simplify invocaton call stack

In HandleSQS, the context is being injected from the SQS event, when the lambda is triggered by a SQS request. This means that the HandleRequest from the
Subscription request, which is properly part of another trace, is assuming the parent trace of the Discovery Request.

Hence, the Discover call “steals” the spans from the Subscriber, at least, all of the spans that are descendant from the HandleRequest call of the Subscribe request. That leaves only the parent span of Subscriber, which is created (and thus has context provided to it) before we extract the context from the SQS event. Thus, it shows up as its own trace.

Wrapping the OTEL handler wrapper (which is what makes the first span in the trace) with another handler that will add context from the SQS event if the payload is of that format means that
the first span should also get the context from the discovery request.
  • Loading branch information
obs-gh-virjramakrishnan committed Sep 3, 2024
1 parent 52a46e9 commit 21c529c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 5 deletions.
6 changes: 3 additions & 3 deletions pkg/handler/subscriber/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/observeinc/aws-sam-apps/pkg/tracing"

"github.com/go-logr/logr"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel/codes"
Expand All @@ -25,8 +27,6 @@ func (h *InstrumentedHandler) HandleSQS(ctx context.Context, request events.SQSE
for _, record := range request.Records {
var req Request
var err error
logger.V(3).Info("Getting context from message attributes")
ctx = NewSQSCarrier().Extract(ctx, record.MessageAttributes)

if err = json.Unmarshal([]byte(record.Body), &req); err == nil {
_, err = h.HandleRequest(ctx, &req)
Expand Down Expand Up @@ -66,7 +66,7 @@ func (q *InstrumentedSQSClient) SendMessage(ctx context.Context, msg *sqs.SendMe
logger := logr.FromContextOrDiscard(ctx)
logger.V(3).Info("Injecting context into message attributes")
msg.MessageAttributes = make(map[string]types.MessageAttributeValue)
if err := NewSQSCarrier().Inject(ctx, msg.MessageAttributes); err != nil {
if err := tracing.NewSQSCarrier().Inject(ctx, msg.MessageAttributes); err != nil {
return nil, fmt.Errorf("failed to inject context into message attributes: %w", err)
}
logger.V(3).Info("sending message %s", msg.MessageBody)
Expand Down
2 changes: 1 addition & 1 deletion pkg/lambda/subscriber/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,6 @@ func New(ctx context.Context, cfg *Config) (*Lambda, error) {
return nil, fmt.Errorf("failed to register functions: %w", err)
}

l.Entrypoint = tracing.NewLambdaHandler(mux, tracerProvider)
l.Entrypoint = tracing.WrapHandlerSQSContext(tracing.NewLambdaHandler(mux, tracerProvider))
return l, nil
}
49 changes: 49 additions & 0 deletions pkg/tracing/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package tracing

import (
"bytes"
"context"
"encoding/json"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/go-logr/logr"
)

type SQSWithContextHandler struct {
handler lambda.Handler
}

// Compile time check our Handler implements lambda.Handler.
var _ lambda.Handler = SQSWithContextHandler{}

// Invoke checks if the incoming payload is from a SQS event and, if so,
// extracts the context from the SQS message and injects it into the context.
// This means that the spans created with this context will appear as children
// of a span in the discover request that created the SQS event
func (h SQSWithContextHandler) Invoke(ctx context.Context, payload []byte) ([]byte, error) {
logger := logr.FromContextOrDiscard(ctx)
logger.V(3).Info("Getting context from message attributes")
var event events.SQSEvent
dec := json.NewDecoder(bytes.NewReader(payload))
dec.DisallowUnknownFields()
if err := dec.Decode(&event); err == nil {
for _, record := range event.Records {
ctx = NewSQSCarrier().Extract(ctx, record.MessageAttributes)
break
}
}

response, err := h.handler.Invoke(ctx, payload)
if err != nil {
return nil, err
}

return response, nil
}

// WrapHandlerSQSContext Provides a Handler which wraps an existing Handler while
// injecting the SQS context into the context if the payload is a SQS event.
func WrapHandlerSQSContext(handler lambda.Handler) lambda.Handler {
return SQSWithContextHandler{handler: handler}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package subscriber
package tracing

import (
"context"
Expand Down

0 comments on commit 21c529c

Please sign in to comment.