From dba26219766e91b35b9f79181efb4feb082bf2f9 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Wed, 24 Jul 2024 14:28:51 +0800 Subject: [PATCH] [ISSUE #8438] Fix broker return two messages when query message and index service bug --- .../org/apache/rocketmq/tieredstore/TieredMessageStore.java | 3 +++ .../apache/rocketmq/tieredstore/file/FlatAppendFile.java | 1 + .../apache/rocketmq/tieredstore/index/IndexStoreFile.java | 6 +++++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 99d586ae236..9a25f85a6b8 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -460,6 +460,9 @@ public synchronized void shutdown() { if (flatFileStore != null) { flatFileStore.shutdown(); } + if (indexService != null) { + indexService.shutdown(); + } if (storeExecutor != null) { storeExecutor.shutdown(); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java index b9ba80d08d4..0c20a1cfb4f 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatAppendFile.java @@ -90,6 +90,7 @@ public void recoverFileSize() { public void initOffset(long offset) { if (this.fileSegmentTable.isEmpty()) { FileSegment fileSegment = fileSegmentFactory.createSegment(fileType, filePath, offset); + fileSegment.initPosition(fileSegment.getSize()); this.flushFileSegmentMeta(fileSegment); this.fileSegmentTable.add(fileSegment); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java index 180399332e4..1a07e81ac62 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java @@ -287,8 +287,12 @@ protected CompletableFuture> queryAsyncFromUnsealedFile( buffer.position(this.getItemPosition(slotValue)); buffer.get(bytes); IndexItem indexItem = new IndexItem(bytes); + long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get(); if (hashCode == indexItem.getHashCode()) { - result.add(indexItem); + if (hashCode == indexItem.getHashCode() && + beginTime <= storeTimestamp && storeTimestamp <= endTime) { + result.add(indexItem); + } if (result.size() > maxCount) { break; }