Skip to content

Commit

Permalink
Add NATS Ack/Nak to nats jetstream V3 Finish
Browse files Browse the repository at this point in the history
Signed-off-by: stephen-totty-hpe <stephen.totty@hpe.com>
  • Loading branch information
stephen-totty-hpe committed Oct 22, 2024
1 parent 70fb951 commit eb93b65
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 6 deletions.
30 changes: 30 additions & 0 deletions protocol/nats_jetstream/v3/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/cloudevents/sdk-go/v2/protocol"
)

const (
Expand Down Expand Up @@ -105,6 +106,35 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter)

// Finish *must* be called when message from a Receiver can be forgotten by the receiver.
func (m *Message) Finish(err error) error {
// Ack and Nak first checks to see if the message has been acknowleged
// and if Ack/Nak was done, it immediately returns an error without applying any logic to the message on the server.
// Nak will only be sent if the error given is explictly a NACK error(protocol.ResultNACK).
// AckPolicy effects if an explict Ack/Nak is needed.
// AckExplicit: The default policy. Each individual message must be acknowledged.
// Recommended for most reliability and functionality.
// AckNone: No acknowledgment needed; the server assumes acknowledgment on delivery.
// AckAll: Acknowledge only the last message received in a series; all previous messages are automatically acknowledged.
// Will acknowledge all pending messages for all subscribers for Pull Consumer.
// see: github.com/nats-io/nats.go/jetstream/ConsumerConfig.AckPolicy
if m.Msg == nil {
return nil
}
if protocol.IsNACK(err) {
if err = m.Msg.Nak(); err != jetstream.ErrMsgAlreadyAckd {
return err
}
}
if protocol.IsACK(err) {
if err = m.Msg.Ack(); err != jetstream.ErrMsgAlreadyAckd {
return err
}
}

// In the case that we receive an unknown error, the intent of whether the message should Ack/Nak is unknown.
// When this happens, the ack/nak behavior will be based on the consumer configuration. There are several options such as:
// AckPolicy, AckWait, MaxDeliver, MaxAckPending
// that determine how messages would be redelivered by the server.
// [consumers configuration]: https://docs.nats.io/nats-concepts/jetstream/consumers#configuration
return nil
}

Expand Down
123 changes: 122 additions & 1 deletion protocol/nats_jetstream/v3/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"reflect"
"testing"

"github.com/cloudevents/sdk-go/v2/binding/spec"
bindingtest "github.com/cloudevents/sdk-go/v2/binding/test"
"github.com/cloudevents/sdk-go/v2/protocol"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/test"
Expand All @@ -23,11 +25,17 @@ import (

type jetStreamMsg struct {
jetstream.Msg
msg *nats.Msg
msg *nats.Msg
ackCalled bool
ackErr error
nackCalled bool
nackErr error
}

func (j *jetStreamMsg) Data() []byte { return j.msg.Data }
func (j *jetStreamMsg) Headers() nats.Header { return j.msg.Header }
func (j *jetStreamMsg) Ack() error { j.ackCalled = true; return j.ackErr }
func (j *jetStreamMsg) Nak() error { j.nackCalled = true; return j.nackErr }

var (
outBinaryMessage = bindingtest.MockBinaryMessage{
Expand Down Expand Up @@ -190,3 +198,116 @@ func TestGetExtension(t *testing.T) {
})
}
}

func TestFinish(t *testing.T) {
type args struct {
err error
ackErr error
nakErr error
}
type wants struct {
err error
ackCalled bool
nackCalled bool
}
tests := []struct {
name string
args args
wants wants
}{
{
name: "nil error given",
args: args{
err: nil,
},
wants: wants{
err: nil,
ackCalled: true,
nackCalled: false,
},
},
{
name: "ACK error given",
args: args{
err: protocol.ResultACK,
},
wants: wants{
err: nil,
ackCalled: true,
nackCalled: false,
},
},
{
name: "NACK error given",
args: args{
err: protocol.ResultNACK,
},
wants: wants{
err: nil,
ackCalled: false,
nackCalled: true,
},
},
{
name: "unknown error given",
args: args{
err: errors.New("unknown"),
},
wants: wants{
err: nil,
ackCalled: false,
nackCalled: false,
},
},
{
name: "jetstream.ErrMsgAlreadyAckd error returned from Ack",
args: args{
err: protocol.ResultACK,
ackErr: jetstream.ErrMsgAlreadyAckd,
},
wants: wants{
err: nil,
ackCalled: true,
nackCalled: false,
},
},
{
name: "jetstream.ErrMsgAlreadyAckd error returned from Nak",
args: args{
err: protocol.ResultNACK,
nakErr: jetstream.ErrMsgAlreadyAckd,
},
wants: wants{
err: nil,
ackCalled: false,
nackCalled: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
binaryReceiverMessage.ackCalled = false
binaryReceiverMessage.ackErr = tt.args.ackErr
binaryReceiverMessage.nackCalled = false
binaryReceiverMessage.nackErr = tt.args.nakErr
message := NewMessage(binaryReceiverMessage)
if message == nil {
t.Errorf("Error in NewMessage!")
}
gotErr := message.Finish(tt.args.err)
if gotErr != tt.wants.err {
t.Errorf("ExpectedErr %s, while got %s", tt.wants.err, gotErr)
}
var mockMessage *jetStreamMsg
if message != nil {
mockMessage = message.Msg.(*jetStreamMsg)
}
if mockMessage.ackCalled != tt.wants.ackCalled {
t.Errorf("ExpectedAck %t, while got %t", tt.wants.ackCalled, mockMessage.ackCalled)
}
if mockMessage.nackCalled != tt.wants.nackCalled {
t.Errorf("ExpectedNack %t, while got %t", tt.wants.nackCalled, mockMessage.nackCalled)
}
})
}
}
2 changes: 1 addition & 1 deletion protocol/nats_jetstream/v3/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func WithConnection(conn *nats.Conn) ProtocolOption {
// WithJetStreamOptions sets jetstream options used in the protocol sender and receiver
func WithJetStreamOptions(jetStreamOpts []jetstream.JetStreamOpt) ProtocolOption {
return func(p *Protocol) error {
p.jetSteamOpts = jetStreamOpts
p.jetStreamOpts = jetStreamOpts
return nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/nats_jetstream/v3/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func TestWithJetStreamOptions(t *testing.T) {
wants: wants{
err: nil,
protocol: &Protocol{
jetSteamOpts: jetStreamOpts,
jetStreamOpts: jetStreamOpts,
},
},
},
Expand Down
6 changes: 3 additions & 3 deletions protocol/nats_jetstream/v3/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ type Protocol struct {
natsOpts []nats.Option

// jetstream options
jetSteamOpts []jetstream.JetStreamOpt
jetStream jetstream.JetStream
jetStreamOpts []jetstream.JetStreamOpt
jetStream jetstream.JetStream

// receiver
incoming chan msgErr
Expand Down Expand Up @@ -76,7 +76,7 @@ func New(ctx context.Context, opts ...ProtocolOption) (*Protocol, error) {
}
}

if p.jetStream, errConnection = jetstream.New(p.conn, p.jetSteamOpts...); errConnection != nil {
if p.jetStream, errConnection = jetstream.New(p.conn, p.jetStreamOpts...); errConnection != nil {
return nil, errConnection
}

Expand Down

0 comments on commit eb93b65

Please sign in to comment.