diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 99d586ae236..9a25f85a6b8 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -460,6 +460,9 @@ public synchronized void shutdown() { if (flatFileStore != null) { flatFileStore.shutdown(); } + if (indexService != null) { + indexService.shutdown(); + } if (storeExecutor != null) { storeExecutor.shutdown(); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java index b9ba80d08d4..0c20a1cfb4f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java @@ -90,6 +90,7 @@ public void recoverFileSize() { public void initOffset(long offset) { if (this.fileSegmentTable.isEmpty()) { FileSegment fileSegment = fileSegmentFactory.createSegment(fileType, filePath, offset); + fileSegment.initPosition(fileSegment.getSize()); this.flushFileSegmentMeta(fileSegment); this.fileSegmentTable.add(fileSegment); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java index 180399332e4..f9604b43e6f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java @@ -287,7 +287,9 @@ protected CompletableFuture> queryAsyncFromUnsealedFile( buffer.position(this.getItemPosition(slotValue)); buffer.get(bytes); IndexItem indexItem = new IndexItem(bytes); - if (hashCode == indexItem.getHashCode()) { + long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get(); + if (hashCode == indexItem.getHashCode() && + beginTime <= storeTimestamp && storeTimestamp <= endTime) { result.add(indexItem); if (result.size() > maxCount) { break;