Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(store): fix end offset of fetch result #725

Merged
merged 1 commit into from
Nov 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,14 @@ public CompletableFuture<PopResult> popRetry(StoreContext context, long consumer
return retryStreamIdFuture.thenCompose(retryStreamId -> pop(context, consumerGroupId, retryStreamId, offset, PopOperation.PopOperationType.POP_RETRY, filter, batchSize, invisibleDuration));
}

record FetchResult(List<FlatMessageExt> messageList, long endOffset) {
public int size() {
return messageList.size();
}
}

@WithSpan(kind = SpanKind.SERVER)
private CompletableFuture<List<FlatMessageExt>> fetchMessages(StoreContext context, @SpanAttribute long streamId,
private CompletableFuture<FetchResult> fetchMessages(StoreContext context, @SpanAttribute long streamId,
@SpanAttribute long offset, @SpanAttribute int batchSize) {
long startOffset = streamStore.startOffset(streamId);
if (offset < startOffset) {
Expand All @@ -395,13 +401,14 @@ private CompletableFuture<List<FlatMessageExt>> fetchMessages(StoreContext conte

long confirmOffset = streamStore.confirmOffset(streamId);
if (offset >= confirmOffset) {
return CompletableFuture.completedFuture(Collections.emptyList());
return CompletableFuture.completedFuture(new FetchResult(Collections.emptyList(), confirmOffset));
}

if (offset + batchSize > confirmOffset) {
batchSize = (int) (confirmOffset - offset);
}

long finalOffset = offset;
return streamStore.fetch(context, streamId, offset, batchSize)
.thenApply(fetchResult -> {
AtomicLong fetchBytes = new AtomicLong();
Expand All @@ -422,7 +429,7 @@ private CompletableFuture<List<FlatMessageExt>> fetchMessages(StoreContext conte

context.span().ifPresent(span -> span.setAttribute("fetchBytes", fetchBytes.get()));

return resultList;
return new FetchResult(resultList, finalOffset + resultList.size());
});
}

Expand All @@ -436,23 +443,23 @@ private CompletableFuture<FilterFetchResult> fetchAndFilterMessages(StoreContext
return fetchMessages(context, streamId, offset, fetchBatchSize)
.thenCompose(fetchResult -> {
// Add filter result to message list.
List<FlatMessageExt> matchedMessageList = filter.doFilter(fetchResult);
List<FlatMessageExt> matchedMessageList = filter.doFilter(fetchResult.messageList());
// Update end offset
int index = batchSize - result.size();
if (matchedMessageList.size() > index) {
FlatMessageExt messageExt = matchedMessageList.get(index);
result.setEndOffset(messageExt.offset());
result.addMessageList(matchedMessageList.subList(0, index));
} else {
result.setEndOffset(offset + fetchResult.size());
result.setEndOffset(fetchResult.endOffset());
result.addMessageList(matchedMessageList);
}
// If not enough messages after applying filter, fetch more messages.
boolean needToFetch = result.size() < batchSize;
boolean hasMoreMessages = fetchResult.size() >= fetchBatchSize;

int newFetchCount = fetchCount + fetchResult.size();
long newFetchBytes = fetchBytes + fetchResult.stream()
long newFetchBytes = fetchBytes + fetchResult.messageList().stream()
.map(messageExt -> (long) messageExt.message().getByteBuffer().limit())
.reduce(0L, Long::sum);
boolean notExceedLimit = newFetchCount < config.maxFetchCount() &&
Expand Down
Loading