Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8653] Fix index service upload last file when broker shutdown and fetcher check in tiered storage #8654

Merged
merged 3 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,15 @@ public boolean fetchFromCurrentStore(String topic, int queueId, long offset, int
}

// determine whether tiered storage path conditions are met
if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)
&& !next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
return true;
if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_DISK)) {
// return true to read from tiered storage if the CommitLog is empty
if (next != null && next.getCommitLog() != null &&
next.getCommitLog().getMinOffset() < 0L) {
return true;
}
if (!next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
return true;
}
}

if (storageLevel.check(MessageStoreConfig.TieredStorageLevel.NOT_IN_MEM)
Expand All @@ -208,10 +214,10 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String group, String
}

if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
log.trace("GetMessageAsync from current store, " +
log.trace("GetMessageAsync from remote store, " +
"topic: {}, queue: {}, offset: {}, maxCount: {}", topic, queueId, offset, maxMsgNums);
} else {
log.trace("GetMessageAsync from remote store, " +
log.trace("GetMessageAsync from next store, " +
"topic: {}, queue: {}, offset: {}, maxCount: {}", topic, queueId, offset, maxMsgNums);
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
Expand Down Expand Up @@ -271,23 +269,23 @@ public CompletableFuture<List<IndexItem>> queryAsync(
public void forceUpload() {
try {
readWriteLock.writeLock().lock();
if (this.currentWriteFile == null) {
log.warn("IndexStoreService no need force upload current write file");
return;
}
// note: current file has been shutdown before
IndexStoreFile lastFile = new IndexStoreFile(storeConfig, currentWriteFile.getTimestamp());
if (this.doCompactThenUploadFile(lastFile)) {
this.setCompactTimestamp(lastFile.getTimestamp());
} else {
throw new TieredStoreException(
TieredStoreErrorCode.UNKNOWN, "IndexStoreService force compact current file error");
while (true) {
Map.Entry<Long, IndexFile> entry =
this.timeStoreTable.higherEntry(this.compactTimestamp.get());
if (entry == null) {
break;
}
if (this.doCompactThenUploadFile(entry.getValue())) {
this.setCompactTimestamp(entry.getValue().getTimestamp());
// The total number of files will not too much, prevent io too fast.
TimeUnit.MILLISECONDS.sleep(50);
}
}
} catch (Exception e) {
log.error("IndexStoreService force upload error", e);
throw new RuntimeException(e);
} finally {
readWriteLock.writeLock().lock();
readWriteLock.writeLock().unlock();
}
}

Expand Down Expand Up @@ -393,19 +391,13 @@ protected IndexFile getNextSealedFile() {
@Override
public void shutdown() {
super.shutdown();
readWriteLock.writeLock().lock();
try {
for (Map.Entry<Long /* timestamp */, IndexFile> entry : timeStoreTable.entrySet()) {
entry.getValue().shutdown();
}
if (!autoCreateNewFile) {
this.forceUpload();
// Wait index service upload then clear time store table
while (!this.timeStoreTable.isEmpty()) {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.timeStoreTable.clear();
} catch (Exception e) {
log.error("IndexStoreService shutdown error", e);
} finally {
readWriteLock.writeLock().unlock();
}
}

Expand All @@ -424,6 +416,18 @@ public void run() {
}
this.waitForRunning(TimeUnit.SECONDS.toMillis(10));
}
readWriteLock.writeLock().lock();
try {
if (autoCreateNewFile) {
this.forceUpload();
}
this.timeStoreTable.forEach((timestamp, file) -> file.shutdown());
this.timeStoreTable.clear();
} catch (Exception e) {
log.error("IndexStoreService shutdown error", e);
} finally {
readWriteLock.writeLock().unlock();
}
log.info(this.getServiceName() + " service shutdown");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void doConvertOldFormatTest() throws IOException {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
ConcurrentSkipListMap<Long, IndexFile> timeStoreTable = indexService.getTimeStoreTable();
Assert.assertEquals(1, timeStoreTable.size());
Assert.assertEquals(2, timeStoreTable.size());
Assert.assertEquals(Long.valueOf(timestamp), timeStoreTable.firstKey());
mappedFile.destroy(10 * 1000);
}
Expand Down Expand Up @@ -232,7 +232,7 @@ public void restartServiceTest() throws InterruptedException {
indexService = new IndexStoreService(fileAllocator, filePath);
indexService.start();
Assert.assertEquals(timestamp, indexService.getTimeStoreTable().firstKey().longValue());
Assert.assertEquals(2, indexService.getTimeStoreTable().size());
Assert.assertEquals(4, indexService.getTimeStoreTable().size());
Assert.assertEquals(IndexFile.IndexStatusEnum.UPLOAD,
indexService.getTimeStoreTable().firstEntry().getValue().getFileStatus());
}
Expand Down
Loading