Skip to content

Commit

Permalink
[FIXED] Invalid value of max_bytes in pull request
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Jan 11, 2024
1 parent 6c4c9f4 commit d23679c
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,15 @@ func (s *pullSubscription) checkPending() {
if (s.pending.msgCount < s.consumeOpts.ThresholdMessages ||
(s.pending.byteCount < s.consumeOpts.ThresholdBytes && s.consumeOpts.MaxBytes != 0)) &&
atomic.LoadUint32(&s.fetchInProgress) == 0 {
var batchSize int

var batchSize, maxBytes int
if s.consumeOpts.MaxBytes == 0 {
// if using messages, calculate appropriate batch size
batchSize = s.consumeOpts.MaxMessages - s.pending.msgCount
} else {
// if using bytes, use the max value
batchSize = s.consumeOpts.MaxMessages
maxBytes = s.consumeOpts.MaxBytes - s.pending.byteCount
}
if s.consumeOpts.StopAfter > 0 {
batchSize = min(batchSize, s.consumeOpts.StopAfter-s.delivered-s.pending.msgCount)
Expand All @@ -435,7 +437,7 @@ func (s *pullSubscription) checkPending() {
s.fetchNext <- &pullRequest{
Expires: s.consumeOpts.Expires,
Batch: batchSize,
MaxBytes: s.consumeOpts.MaxBytes - s.pending.byteCount,
MaxBytes: maxBytes,
Heartbeat: s.consumeOpts.Heartbeat,
}

Expand Down

0 comments on commit d23679c

Please sign in to comment.