From 4916a4fe945cb265798d68c165a0dcccde93b584 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 10 Jul 2024 13:12:30 -0700 Subject: [PATCH] When checking interest state, make sure to take into account filtered consumers when acking msgs. Signed-off-by: Derek Collison --- server/consumer.go | 7 +++-- server/jetstream_cluster_2_test.go | 50 +++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 85a5fe90d07..308fa0e6bed 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -5610,10 +5610,13 @@ func (o *consumer) checkStateForInterestStream() error { o.mu.RUnlock() // If we have pending, we will need to walk through to delivered in case we missed any of those acks as well. - if state != nil && len(state.Pending) > 0 { + if state != nil && len(state.Pending) > 0 && state.AckFloor.Stream > 0 { for seq := state.AckFloor.Stream + 1; seq <= state.Delivered.Stream; seq++ { if _, ok := state.Pending[seq]; !ok { - mset.ackMsg(o, seq) + // Want to call needAck since it is filter aware. + if o.needAck(seq, _EMPTY_) { + mset.ackMsg(o, seq) + } } } } diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index c551155ab69..b5dd2c16f4f 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -7322,7 +7322,55 @@ func TestJetStreamClusterCompressedStreamMessages(t *testing.T) { } } +// https://github.com/nats-io/nats-server/issues/5612 +func TestJetStreamClusterWorkQueueLosingMessagesOnConsumerDelete(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3F", 3) + defer c.shutdown() + + s := c.randomServer() + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + msg := []byte("test alskdjalksdjalskdjaksdjlaksjdlkajsdlakjsdlakjsdlakjdlakjsdlaksjdlj") + for _, subj := range []string{"2", "5", "7", "9"} { + for i := 0; i < 10; i++ { + js.Publish(subj, msg) + } + } + + cfg := &nats.ConsumerConfig{ + Name: "test", + FilterSubjects: []string{"6", "7", "8", "9", "10"}, + DeliverSubject: "bob", + AckPolicy: nats.AckExplicitPolicy, + AckWait: time.Minute, + MaxAckPending: 1, + } + + _, err = nc.SubscribeSync("bob") + require_NoError(t, err) + + for i := 0; i < 5; i++ { + _, err = js.AddConsumer("TEST", cfg) + require_NoError(t, err) + time.Sleep(time.Second) + js.DeleteConsumer("TEST", "test") + } + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 40) +} + // -// DO NOT ADD NEW TESTS IN THIS FILE +// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times) // Add at the end of jetstream_cluster__test.go, with being the highest value. //