Skip to content

Commit

Permalink
[ISSUE #8653] Fix index service upload last file when broker shutdown…
Browse files Browse the repository at this point in the history
… and fetcher check in tiered storage (#8654)

* [ISSUE #8653] Fix index service upload last file when broker shutdown and fetcher check in tiered storage

* [ISSUE #8653] Fix index service upload last file when broker shutdown and fetcher check in tiered storage

* remove unused import
  • Loading branch information
lizhimins authored Sep 9, 2024
1 parent e7eda2c commit 587e976
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,15 @@ public boolean fetchFromCurrentStore(String topic, int queueId, long offset, int
}

// determine whether tiered storage path conditions are met
if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)
&& !next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
return true;
if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)) {
// return true to read from tiered storage if the CommitLog is empty
if (next != null && next.getCommitLog() != null &&
next.getCommitLog().getMinOffset() < 0L) {
return true;
}
if (!next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
return true;
}
}

if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_MEM)
Expand All @@ -208,10 +214,10 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String group, String
}

if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
log.trace("GetMessageAsync from current store, " +
log.trace("GetMessageAsync from remote store, " +
"topic: {}, queue: {}, offset: {}, maxCount: {}", topic, queueId, offset, maxMsgNums);
} else {
log.trace("GetMessageAsync from remote store, " +
log.trace("GetMessageAsync from next store, " +
"topic: {}, queue: {}, offset: {}, maxCount: {}", topic, queueId, offset, maxMsgNums);
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
Expand Down Expand Up @@ -271,23 +269,23 @@ public CompletableFuture<List<IndexItem>> queryAsync(
public void forceUpload() {
try {
readWriteLock.writeLock().lock();
if (this.currentWriteFile == null) {
log.warn("IndexStoreService no need force upload current write file");
return;
}
// note: current file has been shutdown before
IndexStoreFile lastFile = new IndexStoreFile(storeConfig, currentWriteFile.getTimestamp());
if (this.doCompactThenUploadFile(lastFile)) {
this.setCompactTimestamp(lastFile.getTimestamp());
} else {
throw new TieredStoreException(
TieredStoreErrorCode.UNKNOWN, "IndexStoreService force compact current file error");
while (true) {
Map.Entry<Long, IndexFile> entry =
this.timeStoreTable.higherEntry(this.compactTimestamp.get());
if (entry == null) {
break;
}
if (this.doCompactThenUploadFile(entry.getValue())) {
this.setCompactTimestamp(entry.getValue().getTimestamp());
// The total number of files will not too much, prevent io too fast.
TimeUnit.MILLISECONDS.sleep(50);
}
}
} catch (Exception e) {
log.error("IndexStoreService force upload error", e);
throw new RuntimeException(e);
} finally {
readWriteLock.writeLock().lock();
readWriteLock.writeLock().unlock();
}
}

Expand Down Expand Up @@ -393,19 +391,13 @@ protected IndexFile getNextSealedFile() {
@Override
public void shutdown() {
super.shutdown();
readWriteLock.writeLock().lock();
try {
for (Map.Entry<Long /* timestamp */, IndexFile> entry : timeStoreTable.entrySet()) {
entry.getValue().shutdown();
}
if (!autoCreateNewFile) {
this.forceUpload();
// Wait index service upload then clear time store table
while (!this.timeStoreTable.isEmpty()) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.timeStoreTable.clear();
} catch (Exception e) {
log.error("IndexStoreService shutdown error", e);
} finally {
readWriteLock.writeLock().unlock();
}
}

Expand All @@ -424,6 +416,18 @@ public void run() {
}
this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
}
readWriteLock.writeLock().lock();
try {
if (autoCreateNewFile) {
this.forceUpload();
}
this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
this.timeStoreTable.clear();
} catch (Exception e) {
log.error("IndexStoreService shutdown error", e);
} finally {
readWriteLock.writeLock().unlock();
}
log.info(this.getServiceName() + " service shutdown");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void doConvertOldFormatTest() throws IOException {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
ConcurrentSkipListMap<Long, IndexFile> timeStoreTable = indexService.getTimeStoreTable();
Assert.assertEquals(1, timeStoreTable.size());
Assert.assertEquals(2, timeStoreTable.size());
Assert.assertEquals(Long.valueOf(timestamp), timeStoreTable.firstKey());
mappedFile.destroy(10 * 1000);
}
Expand Down Expand Up @@ -232,7 +232,7 @@ public void restartServiceTest() throws InterruptedException {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue());
Assert.assertEquals(2, indexService.getTimeStoreTable().size());
Assert.assertEquals(4, indexService.getTimeStoreTable().size());
Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD,
indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus());
}
Expand Down

0 comments on commit 587e976

Please sign in to comment.