Skip to content

Commit

Permalink
Make sure to not run away when acking from the middle of pending.
Browse files Browse the repository at this point in the history
Also optimize loop for interest based streams to ack msgs based on actual floor of pending.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison authored and wallyqs committed May 20, 2024
1 parent 571dbf9 commit 57e780f
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 12 deletions.
32 changes: 24 additions & 8 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2748,7 +2748,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
return
}

var sagap uint64
var sgap, floor uint64
var needSignal bool

switch o.cfg.AckPolicy {
Expand Down Expand Up @@ -2792,12 +2792,29 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
if o.maxp > 0 && len(o.pending) >= o.maxp {
needSignal = true
}
sagap = sseq - o.asflr
sgap = sseq - o.asflr
floor = sgap // start at same and set lower as we go.
o.adflr, o.asflr = dseq, sseq
for seq := sseq; seq > sseq-sagap && len(o.pending) > 0; seq-- {

remove := func(seq uint64) {
delete(o.pending, seq)
delete(o.rdc, seq)
o.removeFromRedeliverQueue(seq)
if seq < floor {
floor = seq
}
}
// Determine if smarter to walk all of pending vs the sequence range.
if sgap > uint64(len(o.pending)) {
for seq := range o.pending {
if seq <= sseq {
remove(seq)
}
}
} else {
for seq := sseq; seq > sseq-sgap && len(o.pending) > 0; seq-- {
remove(seq)
}
}
case AckNone:
// FIXME(dlc) - This is error but do we care?
Expand All @@ -2808,20 +2825,19 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
// Update underlying store.
o.updateAcks(dseq, sseq, reply)

clustered := o.node != nil

// In case retention changes for a stream, this ought to have been updated
// using the consumer lock to avoid a race.
retention := o.retention
clustered := o.node != nil
o.mu.Unlock()

// Let the owning stream know if we are interest or workqueue retention based.
// If this consumer is clustered this will be handled by processReplicatedAck
// after the ack has propagated.
if !clustered && mset != nil && retention != LimitsPolicy {
if sagap > 1 {
// FIXME(dlc) - This is very inefficient, will need to fix.
for seq := sseq; seq > sseq-sagap; seq-- {
if sgap > 1 {
// FIXME(dlc) - This can very inefficient, will need to fix.
for seq := sseq; seq >= floor; seq-- {
mset.ackMsg(o, seq)
}
} else {
Expand Down
13 changes: 10 additions & 3 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8616,9 +8616,16 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
sgap := sseq - o.state.AckFloor.Stream
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
for seq := sseq; seq > sseq-sgap && len(o.state.Pending) > 0; seq-- {
delete(o.state.Pending, seq)
if len(o.state.Redelivered) > 0 {
if sgap > uint64(len(o.state.Pending)) {
for seq := range o.state.Pending {
if seq <= sseq {
delete(o.state.Pending, seq)
delete(o.state.Redelivered, seq)
}
}
} else {
for seq := sseq; seq > sseq-sgap && len(o.state.Pending) > 0; seq-- {
delete(o.state.Pending, seq)
delete(o.state.Redelivered, seq)
}
}
Expand Down
67 changes: 66 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22523,7 +22523,7 @@ func TestJetStreamAckAllWithLargeFirstSequenceAndNoAckFloor(t *testing.T) {
js.Publish("foo.bar", []byte("hello"))
}

ss, err := js.PullSubscribe("foo.*", "TEST", nats.AckAll())
ss, err := js.PullSubscribe("foo.*", "C1", nats.AckAll())
require_NoError(t, err)
msgs, err := ss.Fetch(10, nats.MaxWait(100*time.Millisecond))
require_NoError(t, err)
Expand All @@ -22542,4 +22542,69 @@ func TestJetStreamAckAllWithLargeFirstSequenceAndNoAckFloor(t *testing.T) {

_, err = js.StreamInfo("TEST", nats.MaxWait(250*time.Millisecond))
require_NoError(t, err)

// Now make sure that if we ack in the middle, meaning we still have ack pending,
// that we do the right thing as well.
ss, err = js.PullSubscribe("foo.*", "C2", nats.AckAll())
require_NoError(t, err)
msgs, err = ss.Fetch(10, nats.MaxWait(100*time.Millisecond))
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

start = time.Now()
msgs[5].AckSync()
if elapsed := time.Since(start); elapsed > 250*time.Millisecond {
t.Fatalf("AckSync took too long %v", elapsed)
}

// Make sure next fetch works right away with low timeout.
msgs, err = ss.Fetch(10, nats.MaxWait(100*time.Millisecond))
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

_, err = js.StreamInfo("TEST", nats.MaxWait(250*time.Millisecond))
require_NoError(t, err)
}

func TestJetStreamAckAllWithLargeFirstSequenceAndNoAckFloorWithInterestPolicy(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API requests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.>"},
Retention: nats.InterestPolicy,
})
require_NoError(t, err)

// Set first sequence to something very big here. This shows the issue with AckAll the
// first time it is called and existing ack floor is 0.
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 10_000_000_000})
require_NoError(t, err)

ss, err := js.PullSubscribe("foo.*", "C1", nats.AckAll())
require_NoError(t, err)

// Now add in 100 msgs
for i := 0; i < 100; i++ {
js.Publish("foo.bar", []byte("hello"))
}

msgs, err := ss.Fetch(10, nats.MaxWait(100*time.Millisecond))
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

start := time.Now()
msgs[5].AckSync()
if elapsed := time.Since(start); elapsed > 250*time.Millisecond {
t.Fatalf("AckSync took too long %v", elapsed)
}

// We are testing for run away loops acking messages in the stream that are not there.
_, err = js.StreamInfo("TEST", nats.MaxWait(100*time.Millisecond))
require_NoError(t, err)
}

0 comments on commit 57e780f

Please sign in to comment.