Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: OB-35469 add subscription request parent span as child of disco… #334

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/handler/subscriber/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/observeinc/aws-sam-apps/pkg/tracing"

"github.com/go-logr/logr"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel/codes"
Expand All @@ -25,8 +27,6 @@ func (h *InstrumentedHandler) HandleSQS(ctx context.Context, request events.SQSE
for _, record := range request.Records {
var req Request
var err error
logger.V(3).Info("Getting context from message attributes")
ctx = NewSQSCarrier().Extract(ctx, record.MessageAttributes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is suspicious. If you remove this, what functionality is InstrumentedHandler providing?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point. This effectively just becomes a wrapper around HandleRequest that does nothing. There might be some more refactoring that is possible if I do remove this. The idea with this modification is to change the place where the context is being inserted from. I removed adding the context in here because I am adding it in in a different place, as a wrapper to the invocation. This allows the parent span of the subscription request to get the context from the SQS event (the context of the previous request). I'll add more description in the commit message.


if err = json.Unmarshal([]byte(record.Body), &req); err == nil {
_, err = h.HandleRequest(ctx, &req)
Expand Down Expand Up @@ -66,7 +66,7 @@ func (q *InstrumentedSQSClient) SendMessage(ctx context.Context, msg *sqs.SendMe
logger := logr.FromContextOrDiscard(ctx)
logger.V(3).Info("Injecting context into message attributes")
msg.MessageAttributes = make(map[string]types.MessageAttributeValue)
if err := NewSQSCarrier().Inject(ctx, msg.MessageAttributes); err != nil {
if err := tracing.NewSQSCarrier().Inject(ctx, msg.MessageAttributes); err != nil {
return nil, fmt.Errorf("failed to inject context into message attributes: %w", err)
}
logger.V(3).Info("sending message %s", msg.MessageBody)
Expand Down
2 changes: 1 addition & 1 deletion pkg/lambda/subscriber/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,6 @@ func New(ctx context.Context, cfg *Config) (*Lambda, error) {
return nil, fmt.Errorf("failed to register functions: %w", err)
}

l.Entrypoint = tracing.NewLambdaHandler(mux, tracerProvider)
l.Entrypoint = tracing.WrapHandlerSQSContext(tracing.NewLambdaHandler(mux, tracerProvider))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new wrapper that only applies in the subscriber case since in forwarder we're not injecting the context into the sqs messages/trying to read it back out.

return l, nil
}
49 changes: 49 additions & 0 deletions pkg/tracing/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package tracing

import (
"bytes"
"context"
"encoding/json"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/go-logr/logr"
)

type SQSWithContextHandler struct {
handler lambda.Handler
}

// Compile time check our Handler implements lambda.Handler.
var _ lambda.Handler = SQSWithContextHandler{}

// Invoke checks if the incoming payload is from a SQS event and, if so,
// extracts the context from the SQS message and injects it into the context.
// This means that the spans created with this context will appear as children
// of a span in the discover request that created the SQS event
func (h SQSWithContextHandler) Invoke(ctx context.Context, payload []byte) ([]byte, error) {
logger := logr.FromContextOrDiscard(ctx)
logger.V(3).Info("Getting context from message attributes")
var event events.SQSEvent
dec := json.NewDecoder(bytes.NewReader(payload))
dec.DisallowUnknownFields()
if err := dec.Decode(&event); err == nil {
for _, record := range event.Records {
ctx = NewSQSCarrier().Extract(ctx, record.MessageAttributes)
break
}
}

response, err := h.handler.Invoke(ctx, payload)
if err != nil {
return nil, err
}

return response, nil
}

// WrapHandlerSQSContext Provides a Handler which wraps an existing Handler while
// injecting the SQS context into the context if the payload is a SQS event.
func WrapHandlerSQSContext(handler lambda.Handler) lambda.Handler {
return SQSWithContextHandler{handler: handler}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package subscriber
package tracing

import (
"context"
Expand Down