diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 88facd0b31e..ea2e4d215a8 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -815,10 +815,11 @@ loop: // If time range is greater than 24 hours, limit the range: to - (to-24h) from := w.from to := w.to - nextWorkTo := to + + exceeds24h := false if batch.To-batch.From > uint32(oneDayDuration.Seconds()) { from = to - uint32(oneDayDuration.Seconds()) - nextWorkTo = from + exceeds24h = true } cursor, envelopesCount, err := messageRequester.SendMessagesRequestForTopics(queryCtx, storenodeID, from, to, w.cursor, w.pubsubTopic, w.contentTopics, w.limit, true, processEnvelopes) @@ -843,21 +844,31 @@ loop: // Check the cursor after calling `shouldProcessNextPage`. // The app might use process the fetched envelopes in the callback for own needs. - if cursor == nil { + // If from/to does not exceed 24h and no cursor was returned, we have already + // requested the entire time range + if cursor == nil && !exceeds24h { return } logger.Debug("processBatch producer - creating work (cursor)") - workWg.Add(1) - workCh <- work{ + newWork := work{ pubsubTopic: w.pubsubTopic, contentTopics: w.contentTopics, cursor: cursor, limit: nextPageLimit, from: w.from, - to: nextWorkTo, + to: w.to, } + + // If from/to has exceeded the 24h, but there are no more records within the current + // 24h range, then we update the `to` for the new work to not include it. + if cursor == nil && exceeds24h { + newWork.to = from + } + + workWg.Add(1) + workCh <- newWork }(w) case err := <-errCh: logger.Debug("processBatch - received error", zap.Error(err))