diff --git a/events/events.go b/events/events.go index b7d2832..c42e459 100644 --- a/events/events.go +++ b/events/events.go @@ -28,6 +28,11 @@ const ( Delete EventType = "delete" ) +// when updating any of the interfaces here, make sure to regenerate the mocks +// +// mockgen -package=mock_events -source=events/events.go > events/mock/events.go +// + // Stream provides methods to interact with the event stream. type Stream interface { // Open sets up the stream connection. @@ -72,6 +77,9 @@ type Message interface { // Data returns the data contained in the message. Data() []byte + + // ExtractOtelTraceContext returns a context populated with the parent trace if any. + ExtractOtelTraceContext(ctx context.Context) context.Context } // NewStream returns a Stream implementation. diff --git a/events/mock/events.go b/events/mock/events.go index 4b58a3e..aee4343 100644 --- a/events/mock/events.go +++ b/events/mock/events.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: events.go +// Source: events/events.go // Package mock_events is a generated GoMock package. package mock_events @@ -181,6 +181,20 @@ func (mr *MockMessageMockRecorder) Data() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Data", reflect.TypeOf((*MockMessage)(nil).Data)) } +// ExtractOtelTraceContext mocks base method. +func (m *MockMessage) ExtractOtelTraceContext(ctx context.Context) context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ExtractOtelTraceContext", ctx) + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// ExtractOtelTraceContext indicates an expected call of ExtractOtelTraceContext. +func (mr *MockMessageMockRecorder) ExtractOtelTraceContext(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExtractOtelTraceContext", reflect.TypeOf((*MockMessage)(nil).ExtractOtelTraceContext), ctx) +} + // InProgress mocks base method. func (m *MockMessage) InProgress() error { m.ctrl.T.Helper() diff --git a/events/nats.go b/events/nats.go index 5c10a5c..1c7dcf3 100644 --- a/events/nats.go +++ b/events/nats.go @@ -11,6 +11,8 @@ import ( "github.com/hashicorp/go-multierror" "github.com/nats-io/nats.go" "github.com/pkg/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" ) var ( @@ -226,7 +228,7 @@ func (n *NatsJetstream) addConsumer() error { // Publish publishes an event onto the NATS Jetstream. The caller is responsible for message // addressing and data serialization. NOTE: The subject passed here will be prepended with any // configured PublisherSubjectPrefix. -func (n *NatsJetstream) Publish(_ context.Context, subjectSuffix string, data []byte) error { +func (n *NatsJetstream) Publish(ctx context.Context, subjectSuffix string, data []byte) error { if n.jsctx == nil { return errors.Wrap(ErrNatsJetstreamAddConsumer, "Jetstream context is not setup") } @@ -242,10 +244,24 @@ func (n *NatsJetstream) Publish(_ context.Context, subjectSuffix string, data [] subjectSuffix, }, ".") - _, err := n.jsctx.Publish(fullSubject, data, options...) + msg := nats.NewMsg(fullSubject) + msg.Data = data + + // inject otel trace context + injectOtelTraceContext(ctx, msg) + + _, err := n.jsctx.PublishMsg(msg, options...) return err } +func injectOtelTraceContext(ctx context.Context, msg *nats.Msg) { + if msg.Header == nil { + msg.Header = make(nats.Header) + } + + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(msg.Header)) +} + // Subscribe to all configured SubscribeSubjects func (n *NatsJetstream) Subscribe(ctx context.Context) (MsgCh, error) { if n.jsctx == nil { diff --git a/events/nats_message.go b/events/nats_message.go index 886d423..424097c 100644 --- a/events/nats_message.go +++ b/events/nats_message.go @@ -2,8 +2,12 @@ package events import ( + "context" + "github.com/nats-io/nats.go" "github.com/pkg/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" ) // here we implement the Message interface for nats.Msg @@ -50,6 +54,14 @@ func (nm *natsMsg) Data() []byte { return nm.msg.Data } +func (nm *natsMsg) ExtractOtelTraceContext(ctx context.Context) context.Context { + if nm == nil || nm.msg.Header == nil { + return ctx + } + + return otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(nm.msg.Header)) +} + func msgIfFromNats(natsMsgs ...*nats.Msg) []Message { msgs := make([]Message, 0, len(natsMsgs)) for _, m := range natsMsgs { diff --git a/events/nats_message_test.go b/events/nats_message_test.go index a03467c..e7caf2a 100644 --- a/events/nats_message_test.go +++ b/events/nats_message_test.go @@ -2,6 +2,7 @@ package events import ( + "context" "testing" "github.com/nats-io/nats.go" @@ -34,6 +35,10 @@ func (_ *bogusMsg) Data() []byte { return nil } +func (_ *bogusMsg) ExtractOtelTraceContext(ctx context.Context) context.Context { + return ctx +} + func TestConversions(t *testing.T) { nm := &natsMsg{ msg: nats.NewMsg("some.subject"), diff --git a/events/nats_test.go b/events/nats_test.go index d363ccf..c43c251 100644 --- a/events/nats_test.go +++ b/events/nats_test.go @@ -6,7 +6,12 @@ import ( "testing" "github.com/nats-io/nats.go" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" + traceSDK "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" natsTest "go.hollow.sh/toolbox/events/internal/test" ) @@ -84,3 +89,49 @@ func TestPublishAndSubscribe(t *testing.T) { require.Equal(t, 1, len(msgs)) require.Equal(t, payload, msgs[0].Data()) } + +func TestInjectOtelTraceContext(t *testing.T) { + // set the tracing propagator so its available for injection + otel.SetTextMapPropagator( + propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}), + ) + + // setup a new trace provider + ctx, span := traceSDK.NewTracerProvider().Tracer("testing").Start(context.Background(), "foo.bar") + defer span.End() + + msg := nats.NewMsg("foo.bar") + msg.Data = []byte(`hello`) + + injectOtelTraceContext(ctx, msg) + + assert.NotEmpty(t, msg.Header.Get("Traceparent")) +} + +func TestExtractOtelTraceContext(t *testing.T) { + // set the tracing propagator so its available for injection + otel.SetTextMapPropagator( + propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}), + ) + + // setup a new trace provider + ctx, span := traceSDK.NewTracerProvider().Tracer("testing").Start(context.Background(), "foo.bar") + defer span.End() + + msg := nats.NewMsg("foo.bar") + msg.Data = []byte(`hello`) + + // inject + injectOtelTraceContext(ctx, msg) + + // msg header gets a trace parent added + traceParent := msg.Header.Get("Traceparent") + + // wrap natsMsg to pass to extract method + nm := &natsMsg{msg} + + ctxWithTrace := nm.ExtractOtelTraceContext(context.Background()) + got := trace.SpanFromContext(ctxWithTrace).SpanContext().TraceID().String() + + assert.Contains(t, traceParent, got) +} diff --git a/go.mod b/go.mod index f0d3e59..652ffef 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,10 @@ require ( github.com/spf13/cobra v1.6.1 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.8.3 + go.opentelemetry.io/otel v1.16.0 + go.opentelemetry.io/otel/sdk v1.16.0 + go.opentelemetry.io/otel/trace v1.16.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230321023759-10a507213a29 golang.org/x/net v0.8.0 @@ -27,6 +30,8 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.12.0 // indirect @@ -55,11 +60,12 @@ require ( github.com/subosito/gotenv v1.4.2 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect + go.opentelemetry.io/otel/metric v1.16.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.7.0 // indirect - golang.org/x/sys v0.6.0 // indirect + golang.org/x/sys v0.8.0 // indirect golang.org/x/text v0.8.0 // indirect golang.org/x/time v0.3.0 // indirect google.golang.org/protobuf v1.30.0 // indirect diff --git a/go.sum b/go.sum index 8654fb3..6a04055 100644 --- a/go.sum +++ b/go.sum @@ -73,6 +73,11 @@ github.com/gin-gonic/gin v1.9.0/go.mod h1:W1Me9+hsUSyj3CePGrd1/QrKJMSJ1Tu/0hFEH8 github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= @@ -237,8 +242,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8= github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= @@ -255,6 +261,14 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= +go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= +go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= +go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= +go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= +go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= +go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= +go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= @@ -401,8 +415,9 @@ golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=