From 6c4c9f473df2d89d88ef444f7d1ba50ef1f669d9 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Wed, 10 Jan 2024 23:43:03 +0100 Subject: [PATCH 1/2] [FIXED] Invalid checkPending logic Signed-off-by: Piotr Piotrowski --- jetstream/jetstream_test.go | 181 ++++++++++++++++++++++++++++++++++++ jetstream/pull.go | 15 ++- 2 files changed, 192 insertions(+), 4 deletions(-) diff --git a/jetstream/jetstream_test.go b/jetstream/jetstream_test.go index 93e2026d9..baa4a8fb4 100644 --- a/jetstream/jetstream_test.go +++ b/jetstream/jetstream_test.go @@ -274,3 +274,184 @@ func TestRetryWithBackoff(t *testing.T) { }) } } + +func TestPullConsumer_checkPending(t *testing.T) { + tests := []struct { + name string + givenSub *pullSubscription + shouldSend bool + expectedPullRequest *pullRequest + }{ + { + name: "msgs threshold not reached, bytes not set, no pull request", + givenSub: &pullSubscription{ + pending: pendingMsgs{ + msgCount: 10, + }, + consumeOpts: &consumeOpts{ + ThresholdMessages: 5, + MaxMessages: 10, + }, + fetchInProgress: 0, + }, + shouldSend: false, + }, + { + name: "pending msgs below threshold, send pull request", + givenSub: &pullSubscription{ + pending: pendingMsgs{ + msgCount: 4, + }, + consumeOpts: &consumeOpts{ + ThresholdMessages: 5, + MaxMessages: 10, + }, + fetchInProgress: 0, + }, + shouldSend: true, + expectedPullRequest: &pullRequest{ + Batch: 6, + MaxBytes: 0, + }, + }, + { + name: "pending msgs below threshold but PR in progress", + givenSub: &pullSubscription{ + pending: pendingMsgs{ + msgCount: 4, + }, + consumeOpts: &consumeOpts{ + ThresholdMessages: 5, + MaxMessages: 10, + }, + fetchInProgress: 1, + }, + shouldSend: false, + }, + { + name: "pending bytes below threshold, send pull request", + givenSub: &pullSubscription{ + pending: pendingMsgs{ + byteCount: 400, + msgCount: 1000000, // msgs count should be ignored + }, + consumeOpts: &consumeOpts{ + MaxMessages: 1000000, + ThresholdBytes: 500, + MaxBytes: 1000, + }, + fetchInProgress: 0, + }, + shouldSend: true, + expectedPullRequest: &pullRequest{ + Batch: 1000000, + MaxBytes: 600, + }, + }, + { + name: "pending bytes above threshold, no pull request", + givenSub: &pullSubscription{ + pending: pendingMsgs{ + byteCount: 600, + }, + consumeOpts: &consumeOpts{ + ThresholdBytes: 500, + MaxBytes: 1000, + }, + fetchInProgress: 0, + }, + shouldSend: false, + }, + { + name: "pending bytes below threshold, fetch in progress, no pull request", + givenSub: &pullSubscription{ + pending: pendingMsgs{ + byteCount: 400, + }, + consumeOpts: &consumeOpts{ + ThresholdBytes: 500, + MaxBytes: 1000, + }, + fetchInProgress: 1, + }, + shouldSend: false, + }, + { + name: "StopAfter set, pending msgs below StopAfter, send pull request", + givenSub: &pullSubscription{ + pending: pendingMsgs{ + msgCount: 4, + }, + consumeOpts: &consumeOpts{ + ThresholdMessages: 5, + MaxMessages: 10, + StopAfter: 8, + }, + fetchInProgress: 0, + delivered: 2, + }, + shouldSend: true, + expectedPullRequest: &pullRequest{ + Batch: 2, // StopAfter (8) - delivered (2) - pending (4) + MaxBytes: 0, + }, + }, + { + name: "StopAfter set, pending msgs equal to StopAfter, no pull request", + givenSub: &pullSubscription{ + pending: pendingMsgs{ + msgCount: 6, + }, + consumeOpts: &consumeOpts{ + ThresholdMessages: 5, + MaxMessages: 10, + StopAfter: 6, + }, + fetchInProgress: 0, + delivered: 0, + }, + shouldSend: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + prChan := make(chan *pullRequest, 1) + test.givenSub.fetchNext = prChan + errs := make(chan error, 1) + ok := make(chan struct{}, 1) + go func() { + if test.shouldSend { + select { + case pr := <-prChan: + if *pr != *test.expectedPullRequest { + errs <- fmt.Errorf("Invalid pull request; want: %#v; got: %#v", test.expectedPullRequest, pr) + return + } + ok <- struct{}{} + case <-time.After(1 * time.Second): + errs <- fmt.Errorf("Timeout") + return + } + } else { + select { + case <-prChan: + errs <- fmt.Errorf("Unexpected pull request") + case <-time.After(100 * time.Millisecond): + ok <- struct{}{} + return + } + } + }() + + test.givenSub.checkPending() + select { + case <-ok: + // ok + case err := <-errs: + t.Fatal(err) + } + + }) + } +} diff --git a/jetstream/pull.go b/jetstream/pull.go index 3ea7092b0..fb7c0527b 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -417,10 +417,17 @@ func (s *pullSubscription) incrementDeliveredMsgs() { // the buffer to trigger a new pull request. // lock should be held before calling this method func (s *pullSubscription) checkPending() { - if s.pending.msgCount < s.consumeOpts.ThresholdMessages || - (s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0) && - atomic.LoadUint32(&s.fetchInProgress) == 1 { - batchSize := s.consumeOpts.MaxMessages - s.pending.msgCount + if (s.pending.msgCount < s.consumeOpts.ThresholdMessages || + (s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0)) && + atomic.LoadUint32(&s.fetchInProgress) == 0 { + var batchSize int + if s.consumeOpts.MaxBytes == 0 { + // if using messages, calculate appropriate batch size + batchSize = s.consumeOpts.MaxMessages - s.pending.msgCount + } else { + // if using bytes, use the max value + batchSize = s.consumeOpts.MaxMessages + } if s.consumeOpts.StopAfter > 0 { batchSize = min(batchSize, s.consumeOpts.StopAfter-s.delivered-s.pending.msgCount) } From 53f6d253a2aee39c1e96741477056db6cc5f60b0 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Thu, 11 Jan 2024 09:11:49 +0100 Subject: [PATCH 2/2] [FIXED] Invalid value of max_bytes in pull request Signed-off-by: Piotr Piotrowski --- jetstream/jetstream_test.go | 3 ++- jetstream/pull.go | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/jetstream/jetstream_test.go b/jetstream/jetstream_test.go index baa4a8fb4..62af4ef02 100644 --- a/jetstream/jetstream_test.go +++ b/jetstream/jetstream_test.go @@ -300,7 +300,8 @@ func TestPullConsumer_checkPending(t *testing.T) { name: "pending msgs below threshold, send pull request", givenSub: &pullSubscription{ pending: pendingMsgs{ - msgCount: 4, + msgCount: 4, + byteCount: 400, // byte count should be ignored }, consumeOpts: &consumeOpts{ ThresholdMessages: 5, diff --git a/jetstream/pull.go b/jetstream/pull.go index fb7c0527b..50d1fbb43 100644 --- a/jetstream/pull.go +++ b/jetstream/pull.go @@ -420,13 +420,15 @@ func (s *pullSubscription) checkPending() { if (s.pending.msgCount < s.consumeOpts.ThresholdMessages || (s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0)) && atomic.LoadUint32(&s.fetchInProgress) == 0 { - var batchSize int + + var batchSize, maxBytes int if s.consumeOpts.MaxBytes == 0 { // if using messages, calculate appropriate batch size batchSize = s.consumeOpts.MaxMessages - s.pending.msgCount } else { // if using bytes, use the max value batchSize = s.consumeOpts.MaxMessages + maxBytes = s.consumeOpts.MaxBytes - s.pending.byteCount } if s.consumeOpts.StopAfter > 0 { batchSize = min(batchSize, s.consumeOpts.StopAfter-s.delivered-s.pending.msgCount) @@ -435,7 +437,7 @@ func (s *pullSubscription) checkPending() { s.fetchNext <- &pullRequest{ Expires: s.consumeOpts.Expires, Batch: batchSize, - MaxBytes: s.consumeOpts.MaxBytes - s.pending.byteCount, + MaxBytes: maxBytes, Heartbeat: s.consumeOpts.Heartbeat, }