Skip to content

Commit

Permalink
When checking interest state, make sure to take into account filtered…
Browse files Browse the repository at this point in the history
… consumers when acking msgs.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored and wallyqs committed Jul 10, 2024
1 parent 7c645cf commit 4b1f83f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
7 changes: 5 additions & 2 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5508,10 +5508,13 @@ func (o *consumer) checkStateForInterestStream() error {
o.mu.RUnlock()

// If we have pending, we will need to walk through to delivered in case we missed any of those acks as well.
if state != nil && len(state.Pending) > 0 {
if state != nil && len(state.Pending) > 0 && state.AckFloor.Stream > 0 {
for seq := state.AckFloor.Stream + 1; seq <= state.Delivered.Stream; seq++ {
if _, ok := state.Pending[seq]; !ok {
mset.ackMsg(o, seq)
// Want to call needAck since it is filter aware.
if o.needAck(seq, _EMPTY_) {
mset.ackMsg(o, seq)
}
}
}
}
Expand Down
50 changes: 49 additions & 1 deletion server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7322,7 +7322,55 @@ func TestJetStreamClusterCompressedStreamMessages(t *testing.T) {
}
}

// https://github.com/nats-io/nats-server/issues/5612
func TestJetStreamClusterWorkQueueLosingMessagesOnConsumerDelete(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3F", 3)
defer c.shutdown()

s := c.randomServer()
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"},
Retention: nats.WorkQueuePolicy,
Replicas: 3,
})
require_NoError(t, err)

msg := []byte("test alskdjalksdjalskdjaksdjlaksjdlkajsdlakjsdlakjsdlakjdlakjsdlaksjdlj")
for _, subj := range []string{"2", "5", "7", "9"} {
for i := 0; i < 10; i++ {
js.Publish(subj, msg)
}
}

cfg := &nats.ConsumerConfig{
Name: "test",
FilterSubjects: []string{"6", "7", "8", "9", "10"},
DeliverSubject: "bob",
AckPolicy: nats.AckExplicitPolicy,
AckWait: time.Minute,
MaxAckPending: 1,
}

_, err = nc.SubscribeSync("bob")
require_NoError(t, err)

for i := 0; i < 5; i++ {
_, err = js.AddConsumer("TEST", cfg)
require_NoError(t, err)
time.Sleep(time.Second)
js.DeleteConsumer("TEST", "test")
}

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 40)
}

//
// DO NOT ADD NEW TESTS IN THIS FILE
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
//

0 comments on commit 4b1f83f

Please sign in to comment.