Skip to content

Commit

Permalink
add v3 version of nats jetstream protocol with integration tests and …
Browse files Browse the repository at this point in the history
…samples

Signed-off-by: stephen-totty-hpe <stephen.totty@hpe.com>
  • Loading branch information
stephen-totty-hpe committed Oct 8, 2024
1 parent 78d3fba commit 9d9a30e
Show file tree
Hide file tree
Showing 15 changed files with 1,597 additions and 12 deletions.
27 changes: 27 additions & 0 deletions protocol/nats_jetstream/v3/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
module github.com/cloudevents/sdk-go/protocol/nats_jetstream/v3

go 1.18

replace github.com/cloudevents/sdk-go/v2 => ../../../v2

require (
github.com/cloudevents/sdk-go/v2 v2.15.2
github.com/nats-io/nats.go v1.37.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.8.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
47 changes: 47 additions & 0 deletions protocol/nats_jetstream/v3/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
153 changes: 153 additions & 0 deletions protocol/nats_jetstream/v3/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
Copyright 2024 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package nats_jetstream

import (
"bytes"
"context"
"errors"
"fmt"
"strings"

"github.com/nats-io/nats.go/jetstream"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
)

const (
// see https://github.com/cloudevents/spec/blob/main/cloudevents/bindings/nats-protocol-binding.md
prefix = "ce-"
contentTypeHeader = "content-type"
)

var (
specs = spec.WithPrefix(prefix)

// ErrNoVersion returned when no version header is found in the protocol header.
ErrNoVersion = errors.New("message does not contain version header")
)

// Message implements binding.Message by wrapping an jetstream.Msg.
// This message *can* be read several times safely
type Message struct {
Msg jetstream.Msg
encoding binding.Encoding
}

// NewMessage wraps an *nats.Msg in a binding.Message.
// The returned message *can* be read several times safely
// The default encoding returned is EncodingStructured unless the NATS message contains a specversion header.
func NewMessage(msg jetstream.Msg) *Message {
encoding := binding.EncodingStructured
if msg.Headers() != nil {
if msg.Headers().Get(specs.PrefixedSpecVersionName()) != "" {
encoding = binding.EncodingBinary
}
}
return &Message{Msg: msg, encoding: encoding}
}

var _ binding.Message = (*Message)(nil)

// ReadEncoding return the type of the message Encoding.
func (m *Message) ReadEncoding() binding.Encoding {
return m.encoding
}

// ReadStructured transfers a structured-mode event to a StructuredWriter.
func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.encoding != binding.EncodingStructured {
return binding.ErrNotStructured
}
return encoder.SetStructuredEvent(ctx, format.JSON, bytes.NewReader(m.Msg.Data()))
}

// ReadBinary transfers a binary-mode event to an BinaryWriter.
func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) error {
if m.encoding != binding.EncodingBinary {
return binding.ErrNotBinary
}

version := m.GetVersion()
if version == nil {
return ErrNoVersion
}

var err error
for k, v := range m.Msg.Headers() {
headerValue := v[0]
if strings.HasPrefix(k, prefix) {
attr := version.Attribute(k)
if attr != nil {
err = encoder.SetAttribute(attr, headerValue)
} else {
err = encoder.SetExtension(strings.TrimPrefix(k, prefix), headerValue)
}
} else if k == contentTypeHeader {
err = encoder.SetAttribute(version.AttributeFromKind(spec.DataContentType), headerValue)
}
if err != nil {
return err
}
}

if m.Msg.Data() != nil {
err = encoder.SetData(bytes.NewBuffer(m.Msg.Data()))
}

return err
}

// Finish *must* be called when message from a Receiver can be forgotten by the receiver.
func (m *Message) Finish(err error) error {
return nil
}

// GetAttribute implements binding.MessageMetadataReader
func (m *Message) GetAttribute(attributeKind spec.Kind) (spec.Attribute, interface{}) {
key := withPrefix(attributeKind.String())
if m.Msg.Headers() != nil {
version := m.GetVersion()
headerValue := m.Msg.Headers().Get(key)
if headerValue != "" {
return version.Attribute(key), headerValue
}
return version.Attribute(key), nil
}
// if the headers are nil, the version is also nil. Therefore return nil.
return nil, nil
}

// GetExtension implements binding.MessageMetadataReader
func (m *Message) GetExtension(name string) interface{} {
key := withPrefix(name)
if m.Msg.Headers() != nil {
headerValue := m.Msg.Headers().Get(key)
if headerValue != "" {
return headerValue
}
}
return nil
}

// GetVersion looks for specVersion header and returns a Version object
func (m *Message) GetVersion() spec.Version {
if m.Msg.Headers() == nil {
return nil
}
versionValue := m.Msg.Headers().Get(specs.PrefixedSpecVersionName())
if versionValue == "" {
return nil
}
return specs.Version(versionValue)
}

// withPrefix prepends the prefix to the attribute name
func withPrefix(attributeName string) string {
return fmt.Sprintf("%s%s", prefix, attributeName)
}
116 changes: 116 additions & 0 deletions protocol/nats_jetstream/v3/message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
Copyright 2024 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/

package nats_jetstream

import (
"context"
"encoding/json"
"testing"

"github.com/cloudevents/sdk-go/v2/binding/spec"
bindingtest "github.com/cloudevents/sdk-go/v2/binding/test"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/test"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)

type jetStreamMsg struct {
jetstream.Msg
msg *nats.Msg
}

func (j *jetStreamMsg) Data() []byte { return j.msg.Data }
func (j *jetStreamMsg) Headers() nats.Header { return j.msg.Header }

var (
outBinaryMessage = bindingtest.MockBinaryMessage{
Metadata: map[spec.Attribute]interface{}{},
Extensions: map[string]interface{}{},
}
outStructMessage = bindingtest.MockStructuredMessage{}

testEvent = test.FullEvent()
binaryData, _ = json.Marshal(map[string]string{
"ce-type": testEvent.Type(),
"ce-source": testEvent.Source(),
"ce-id": testEvent.ID(),
"ce-time": test.Timestamp.String(),
"ce-specversion": "1.0",
"ce-dataschema": test.Schema.String(),
"ce-datacontenttype": "text/json",
"ce-subject": "receiverTopic",
"ce-exta": "someext",
})
structuredReceiverMessage = &jetStreamMsg{
msg: &nats.Msg{
Subject: "hello",
Data: binaryData,
},
}
binaryReceiverMessage = &jetStreamMsg{
msg: &nats.Msg{
Subject: "hello",
Data: testEvent.Data(),
Header: nats.Header{
"ce-type": {testEvent.Type()},
"ce-source": {testEvent.Source()},
"ce-id": {testEvent.ID()},
"ce-time": {test.Timestamp.String()},
"ce-specversion": {"1.0"},
"ce-dataschema": {test.Schema.String()},
"ce-datacontenttype": {"text/json"},
"ce-subject": {"receiverTopic"},
"ce-exta": {"someext"},
},
},
}
)

func TestNewMessage(t *testing.T) {
tests := []struct {
name string
receiverMessage jetstream.Msg
expectedEncoding binding.Encoding
expectedStructuredError error
expectedBinaryError error
}{
{
name: "Structured encoding",
receiverMessage: structuredReceiverMessage,
expectedEncoding: binding.EncodingStructured,
expectedStructuredError: nil,
expectedBinaryError: binding.ErrNotBinary,
},
{
name: "Binary encoding",
receiverMessage: binaryReceiverMessage,
expectedEncoding: binding.EncodingBinary,
expectedStructuredError: binding.ErrNotStructured,
expectedBinaryError: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := NewMessage(tt.receiverMessage)
if got == nil {
t.Errorf("Error in NewMessage!")
}
err := got.ReadBinary(context.TODO(), &outBinaryMessage)
if err != tt.expectedBinaryError {
t.Errorf("ReadBinary err:%s", err.Error())
}
err = got.ReadStructured(context.TODO(), &outStructMessage)
if err != tt.expectedStructuredError {
t.Errorf("ReadStructured err:%s", err.Error())
}
if got.ReadEncoding() != tt.expectedEncoding {
t.Errorf("ExpectedEncoding %s, while got %s", tt.expectedEncoding, got.ReadEncoding())
}
})
}
}
Loading

0 comments on commit 9d9a30e

Please sign in to comment.