From 23de885fdb1158858adbbb7ba7ee5a330f915f10 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Oct 2024 16:51:25 +0100 Subject: [PATCH] Revert "Fix consumer start sequence when sequence not yet in stream" This reverts commit ec54164df357fb842ae2b636c56ad99f059bf418. See discussion nats-io/nats-server#6005. --- server/consumer.go | 16 +++++------- server/jetstream_test.go | 56 ---------------------------------------- 2 files changed, 6 insertions(+), 66 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 9ef8fa1b7a..849fb1c536 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4907,16 +4907,12 @@ func (o *consumer) selectStartingSeqNo() { o.sseq = o.cfg.OptStartSeq } - // Only clip the sseq if the OptStartSeq is not provided, otherwise - // it's possible that the stream just doesn't contain OptStartSeq yet. - if o.cfg.OptStartSeq == 0 { - if state.FirstSeq == 0 { - o.sseq = 1 - } else if o.sseq < state.FirstSeq { - o.sseq = state.FirstSeq - } else if o.sseq > state.LastSeq { - o.sseq = state.LastSeq + 1 - } + if state.FirstSeq == 0 { + o.sseq = 1 + } else if o.sseq < state.FirstSeq { + o.sseq = state.FirstSeq + } else if o.sseq > state.LastSeq { + o.sseq = state.LastSeq + 1 } } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index fe95d5009c..d69ddd29a4 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22836,62 +22836,6 @@ func TestJetStreamConsumerInfoNumPending(t *testing.T) { require_Equal(t, ci.NumPending, 100) } -func TestJetStreamConsumerStartSequenceNotInStream(t *testing.T) { - // This test is checking that we still correctly set the start - // sequence of a consumer if that start sequence doesn't appear - // in the stream yet. Previously this would have been clipped - // back to between the first and last seq from the stream state. - - s := RunBasicJetStreamServer(t) - defer s.Shutdown() - - nc, js := jsClientConnect(t, s) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"test"}, - }) - require_NoError(t, err) - - sub, err := js.PullSubscribe("test", "test_consumer", nats.StartSequence(10)) - require_NoError(t, err) - - stream, err := s.gacc.lookupStream("TEST") - require_NoError(t, err) - consumer := stream.lookupConsumer("test_consumer") - - func() { - consumer.mu.RLock() - defer consumer.mu.RUnlock() - - require_Equal(t, consumer.dseq, 1) - require_Equal(t, consumer.sseq, 10) - }() - - for i := 1; i <= 10; i++ { - _, err = js.Publish("test", []byte{byte(i)}) - require_NoError(t, err) - } - - msgs, err := sub.Fetch(1) - require_NoError(t, err) - require_Len(t, len(msgs), 1) - require_Equal(t, msgs[0].Data[0], 10) - - require_NoError(t, msgs[0].AckSync()) - - func() { - consumer.mu.RLock() - defer consumer.mu.RUnlock() - - require_Equal(t, consumer.dseq, 2) - require_Equal(t, consumer.adflr, 1) - require_Equal(t, consumer.sseq, 11) - require_Equal(t, consumer.asflr, 10) - }() -} - func TestJetStreamInterestStreamWithDuplicateMessages(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown()