Skip to content

Commit

Permalink
events: adds methods to inject and extract otel context (#144)
Browse files Browse the repository at this point in the history
* events: add methods to inject and extract otel context

* events/Message: declare ExtractOtelTraceContext method in interface

* update mocks for interface changes

* events: fix up missing implementation in test
  • Loading branch information
joelrebel authored Jun 30, 2023
1 parent e8bdb6e commit c3044c6
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 7 deletions.
8 changes: 8 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ const (
Delete EventType = "delete"
)

// when updating any of the interfaces here, make sure to regenerate the mocks
//
// mockgen -package=mock_events -source=events/events.go > events/mock/events.go
//

// Stream provides methods to interact with the event stream.
type Stream interface {
// Open sets up the stream connection.
Expand Down Expand Up @@ -72,6 +77,9 @@ type Message interface {

// Data returns the data contained in the message.
Data() []byte

// ExtractOtelTraceContext returns a context populated with the parent trace if any.
ExtractOtelTraceContext(ctx context.Context) context.Context
}

// NewStream returns a Stream implementation.
Expand Down
16 changes: 15 additions & 1 deletion events/mock/events.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 18 additions & 2 deletions events/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)

var (
Expand Down Expand Up @@ -226,7 +228,7 @@ func (n *NatsJetstream) addConsumer() error {
// Publish publishes an event onto the NATS Jetstream. The caller is responsible for message
// addressing and data serialization. NOTE: The subject passed here will be prepended with any
// configured PublisherSubjectPrefix.
func (n *NatsJetstream) Publish(_ context.Context, subjectSuffix string, data []byte) error {
func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte) error {
if n.jsctx == nil {
return errors.Wrap(ErrNatsJetstreamAddConsumer, "Jetstream context is not setup")
}
Expand All @@ -242,10 +244,24 @@ func (n *NatsJetstream) Publish(_ context.Context, subjectSuffix string, data []
subjectSuffix,
}, ".")

_, err := n.jsctx.Publish(fullSubject, data, options...)
msg := nats.NewMsg(fullSubject)
msg.Data = data

// inject otel trace context
injectOtelTraceContext(ctx, msg)

_, err := n.jsctx.PublishMsg(msg, options...)
return err
}

func injectOtelTraceContext(ctx context.Context, msg *nats.Msg) {
if msg.Header == nil {
msg.Header = make(nats.Header)
}

otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(msg.Header))
}

// Subscribe to all configured SubscribeSubjects
func (n *NatsJetstream) Subscribe(ctx context.Context) (MsgCh, error) {
if n.jsctx == nil {
Expand Down
12 changes: 12 additions & 0 deletions events/nats_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
package events

import (
"context"

"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)

// here we implement the Message interface for nats.Msg
Expand Down Expand Up @@ -50,6 +54,14 @@ func (nm *natsMsg) Data() []byte {
return nm.msg.Data
}

func (nm *natsMsg) ExtractOtelTraceContext(ctx context.Context) context.Context {
if nm == nil || nm.msg.Header == nil {
return ctx
}

return otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(nm.msg.Header))
}

func msgIfFromNats(natsMsgs ...*nats.Msg) []Message {
msgs := make([]Message, 0, len(natsMsgs))
for _, m := range natsMsgs {
Expand Down
5 changes: 5 additions & 0 deletions events/nats_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package events

import (
"context"
"testing"

"github.com/nats-io/nats.go"
Expand Down Expand Up @@ -34,6 +35,10 @@ func (_ *bogusMsg) Data() []byte {
return nil
}

func (_ *bogusMsg) ExtractOtelTraceContext(ctx context.Context) context.Context {
return ctx
}

func TestConversions(t *testing.T) {
nm := &natsMsg{
msg: nats.NewMsg("some.subject"),
Expand Down
51 changes: 51 additions & 0 deletions events/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@ import (
"testing"

"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
traceSDK "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"

natsTest "go.hollow.sh/toolbox/events/internal/test"
)
Expand Down Expand Up @@ -84,3 +89,49 @@ func TestPublishAndSubscribe(t *testing.T) {
require.Equal(t, 1, len(msgs))
require.Equal(t, payload, msgs[0].Data())
}

func TestInjectOtelTraceContext(t *testing.T) {
// set the tracing propagator so its available for injection
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}),
)

// setup a new trace provider
ctx, span := traceSDK.NewTracerProvider().Tracer("testing").Start(context.Background(), "foo.bar")
defer span.End()

msg := nats.NewMsg("foo.bar")
msg.Data = []byte(`hello`)

injectOtelTraceContext(ctx, msg)

assert.NotEmpty(t, msg.Header.Get("Traceparent"))
}

func TestExtractOtelTraceContext(t *testing.T) {
// set the tracing propagator so its available for injection
otel.SetTextMapPropagator(
propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}),
)

// setup a new trace provider
ctx, span := traceSDK.NewTracerProvider().Tracer("testing").Start(context.Background(), "foo.bar")
defer span.End()

msg := nats.NewMsg("foo.bar")
msg.Data = []byte(`hello`)

// inject
injectOtelTraceContext(ctx, msg)

// msg header gets a trace parent added
traceParent := msg.Header.Get("Traceparent")

// wrap natsMsg to pass to extract method
nm := &natsMsg{msg}

ctxWithTrace := nm.ExtractOtelTraceContext(context.Background())
got := trace.SpanFromContext(ctxWithTrace).SpanContext().TraceID().String()

assert.Contains(t, traceParent, got)
}
10 changes: 8 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.3
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/net v0.8.0
Expand All @@ -27,6 +30,8 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.12.0 // indirect
Expand Down Expand Up @@ -55,11 +60,12 @@ require (
github.com/subosito/gotenv v1.4.2 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
go.opentelemetry.io/otel/metric v1.16.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
Expand Down
19 changes: 17 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ github.com/gin-gonic/gin v1.9.0/go.mod h1:W1Me9+hsUSyj3CePGrd1/QrKJMSJ1Tu/0hFEH8
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
Expand Down Expand Up @@ -237,8 +242,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
Expand All @@ -255,6 +261,14 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s=
go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4=
go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo=
go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4=
go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE=
go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4=
go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs=
go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
Expand Down Expand Up @@ -401,8 +415,9 @@ golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down

0 comments on commit c3044c6

Please sign in to comment.