
Fix data lost when configured multiple ledger directories #3329

Merged

Conversation

@hangc0276 (Contributor) commented Jun 12, 2022

Motivation

We found a case where the bookie can lose data even with fsync turned on for the journal.
Condition:

  • One journal disk, and turn on fsync for the journal
  • Configure two ledger disks, ledger1, and ledger2

Assume we write 100MB of data into one bookie: 70MB goes into ledger1's write cache and 30MB into ledger2's write cache. Ledger1's write cache fills up and triggers a flush. Flushing the write cache triggers a checkpoint, which marks the journal's lastMark position (the 100MB offset) and writes that position into both ledger1's and ledger2's lastMark files.

If the bookie now shuts down without flushing the write caches, for example when killed with kill -9, ledger2's write cache (30MB) never reaches the ledger disk, yet ledger2's lastMark file has already been updated to the 100MB offset.

When the bookie starts up, the journal replay position will be min(ledger1's lastMark, ledger2's lastMark), which is the 100MB offset. Ledger2's 30MB of data won't be replayed, so that data is lost.

Discussion thread:
https://lists.apache.org/thread/zz5vvv2yd80vqy22fv8wg5s2lqtkrzh9

Solutions

The root cause of this bug is that EntryLogger1 triggers a checkpoint when its write cache is full, updating both EntryLogger1's and EntryLogger2's lastMark positions. However, EntryLogger2's data may still be in its WriteCache, which can lead to data loss if the bookie is shut down with kill -9.

There are two solutions for this bug.

Update lastMark position individually.

  • When EntryLogger1 triggers a checkpoint, we only update EntryLogger1's lastMark position instead of also updating EntryLogger2's at the same time.
  • When SyncThread triggers a checkpoint, we update all the EntryLoggers' lastMark positions.
  • When deciding whether a journal file can be deleted, we should take the smallest lastMark position among all the writable EntryLoggers, and delete only the journal files below that position (see the sketch after this list).
  • When replaying the journal on bookie startup, we need to take the smallest lastMark position and replay the journal files from that position; otherwise, we will lose data.
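
As a rough illustration of the journal-deletion rule above, here is a minimal sketch. LogMark and LedgerDirsManager are the existing BookKeeper classes; readLastMarkFile is a helper written here under the assumption that a lastMark file is two longs, logFileId then offset:

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;

// Sketch only: a journal file may be deleted only if its id is below the
// smallest lastMark among all writable ledger directories.
static boolean isJournalDeletable(long journalId, LedgerDirsManager ledgerDirsManager)
        throws IOException {
    LogMark smallest = new LogMark(Long.MAX_VALUE, Long.MAX_VALUE);
    for (File dir : ledgerDirsManager.getWritableLedgerDirs()) {
        LogMark mark = readLastMarkFile(dir);
        if (smallest.compare(mark) > 0) {
            smallest = mark; // keep the minimum across directories
        }
    }
    return journalId < smallest.getLogFileId();
}

// Assumed helper: lastMark is a 16-byte file holding logFileId and offset.
static LogMark readLastMarkFile(File dir) throws IOException {
    ByteBuffer bb = ByteBuffer.wrap(Files.readAllBytes(new File(dir, "lastMark").toPath()));
    return new LogMark(bb.getLong(), bb.getLong());
}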

However, one case is hard to handle at the journal replay stage: when a ledger disk transitions from ReadOnly mode back to Writable mode, its lastMark position is stale. Replaying the journal from that old position can lead to a target-journal-file-not-found exception.

Only update lastMark position in SyncThread

There are two places that can trigger a checkpoint.

  • The scheduled tasks in SyncThread.doCheckpoint
  • The ledgerDir write-cache full and then flush

The second path is the root cause of the data loss when multiple ledger directories are configured.
We can turn off the lastMark update in that path and have only SyncThread update the lastMark position at checkpoint time when multiple ledger directories are configured.

This is the simplest way to fix this bug, but it has two drawbacks.

  • lastMark updates now depend on SyncThread's checkpoint interval. In Pulsar, the default interval is 60s, so a journal file can only expire after at least 60s.
  • Journal replay at bookie startup depends on the lastMark position, so the bookie may replay up to 60s worth of journal data before startup completes, which can slow down bookie startup.

IMO, compared to data loss, these two drawbacks are acceptable.

Changes

I chose the second solution to fix this bug.
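
Concretely, the fix guards the write-cache-full checkpoint completion behind a single-ledger-dir check. A simplified sketch of the resulting flush path, following the diff in this PR (not the exact merged code):

// DbLedgerStorage-style flush, simplified.
@Override
public void flush() throws IOException {
    Checkpoint cp = checkpointSource.newCheckpoint();
    checkpoint(cp); // flush this directory's write cache to the ledger disk

    // With a single ledger dir, the old behavior is still safe.
    // With multiple ledger dirs, do NOT advance lastMark here: another
    // directory's write cache may still hold unflushed data, so only
    // SyncThread's periodic checkpoint advances lastMark.
    if (singleLedgerDirs) {
        checkpointSource.checkpointComplete(cp, true);
    }
}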

@gaozhangmin (Contributor)

@hangc0276 I found that when SyncThread does a flush, there is a duplicate checkpointComplete invocation.

SyncThread flush:

// SyncThread#flush: the first checkpointComplete call
private void flush() {
    Checkpoint checkpoint = checkpointSource.newCheckpoint();
    try {
        ledgerStorage.flush();
    } catch (NoWritableLedgerDirException e) {
        log.error("No writeable ledger directories", e);
        dirsListener.allDisksFull(true);
        return;
    } catch (IOException e) {
        log.error("Exception flushing ledgers", e);
        return;
    }

    if (disableCheckpoint) {
        return;
    }

    log.info("Flush ledger storage at checkpoint {}.", checkpoint);
    try {
        checkpointSource.checkpointComplete(null, checkpoint, false);
    } catch (IOException e) {
        log.error("Exception marking checkpoint as complete", e);
        dirsListener.allDisksFull(true);
    }
}

// Ledger storage flush (likely SingleDirectoryDbLedgerStorage#flush), reached
// via ledgerStorage.flush() above, so checkpointComplete is invoked a second time
@Override
public void flush() throws IOException {
    Checkpoint cp = checkpointSource.newCheckpoint();
    checkpoint(cp);
    checkpointSource.checkpointComplete(ledgerDir, cp, true);
}

@eolivelli eolivelli changed the title Fix data lose when configured multiple ledger directories Fix data lost when configured multiple ledger directories Jun 13, 2022
@eolivelli (Contributor) left a comment

I think you are on the right track.
Good work!

I left some minor feedback.
Waiting for tests and for more reviews.

This change should be cherry-picked to all active branches;
it is a serious bug.

public void checkpointComplete(Checkpoint checkpoint,
                               boolean compact,
                               LedgerDirsManager ledgerDirsManager1) throws IOException {
Review comment (Contributor):

nit: ledgerDirsManager

@@ -603,7 +604,7 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException
             journalId >= markedLog.getLogFileId());
     // last log mark may be missed due to no sync up before
     // validate filtered log ids only when we have markedLogId
-    if (markedLog.getLogFileId() > 0) {
+    if (markedLog.getLogFileId() > 0 && markedLog.getLogFileId() != Long.MAX_VALUE) {
Review comment (Contributor):

I wonder if it would be better to have a constant for Long.MAX_VALUE

Review comment (Contributor):

A change should also be made in Journal.readLog, where curMark should be set to the minimum mark across the ledger dirs.

@@ -660,7 +660,7 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
     // Should data be fsynced on disk before triggering the callback
     private final boolean syncData;

-    private final LastLogMark lastLogMark = new LastLogMark(0, 0);
+    private final LastLogMark lastLogMark = new LastLogMark(Long.MAX_VALUE, Long.MAX_VALUE);
Review comment (Contributor):

We should make a public constant here; otherwise, magic numbers are easily forgotten.

Also, I think I understand why we are changing from 0 to MAX_VALUE, but... what is the impact?

Reply (Contributor Author):

When creating a Journal instance, it initializes lastLogMark by reading each ledger directory's lastMark file and taking the minimum position as the replay start point, so we should initialize lastLogMark with MAX_VALUE.

In fact, the original logic initialized lastLogMark with 0 and took the maximum position across all the ledger directories' lastMark files. IMO, that can lose data.
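
A sketch of that scan, mirroring the readLog-style logic (readLastMarkFile as in the earlier sketch):

// Start from +infinity so any real mark read from disk is smaller.
LogMark curMark = new LogMark(Long.MAX_VALUE, Long.MAX_VALUE);
for (File dir : ledgerDirsManager.getAllLedgerDirs()) {
    LogMark mark = readLastMarkFile(dir);
    // Keep the MINIMUM mark across all ledger directories. The old code
    // initialized curMark to (0, 0) and kept the maximum, which could skip
    // journal data that a lagging directory had not yet flushed.
    if (curMark.compare(mark) > 0) {
        curMark.setLogMark(mark.getLogFileId(), mark.getLogFileOffset());
    }
}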

Reply (Contributor Author):

Maybe @merlimat has more context on why lastLogMark was initialized to 0.

@@ -35,6 +35,6 @@ public static void forceFlush(BookieImpl b) throws IOException {
     CheckpointSourceList source = new CheckpointSourceList(b.journals);
     Checkpoint cp = source.newCheckpoint();
     b.ledgerStorage.flush();
-    source.checkpointComplete(cp, true);
+    source.checkpointComplete(cp, true, null);
Review comment (Contributor):

so null means "all"...
we should document this in the javadocs

@@ -272,7 +272,7 @@ void execute(Journal[] journals) throws Exception {
     for (Journal journal : journals) {
         Checkpoint cp = journal.newCheckpoint();
         try {
-            journal.checkpointComplete(cp, true);
+            journal.checkpointComplete(cp, true, null);
Review comment (Contributor):

What about adding a comment here?
null means that the checkpoint is not started by a specific LedgerDirsManager.

Reply (Contributor Author):

I have added Javadoc to the checkpointComplete interface method.

-    void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IOException;
+    void checkpointComplete(Checkpoint checkpoint,
+                            boolean compact,
+                            LedgerDirsManager ledgerDirsManager) throws IOException;
Review comment (Contributor):

We should add Javadocs here and explain why we have this ledgerDirsManager and when it may be null.
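
A possible Javadoc sketch (assumed wording, not necessarily the text that was merged):

/**
 * Tell the checkpoint source that the checkpoint is completed,
 * so journal files up to the checkpoint can be dropped.
 *
 * @param checkpoint        the checkpoint that has completed
 * @param compact           if true, compact the lastMark file
 * @param ledgerDirsManager the ledger dirs whose lastMark should advance;
 *                          null means the checkpoint was not started by a
 *                          specific LedgerDirsManager, and the lastMark of
 *                          all ledger directories is updated
 */
void checkpointComplete(Checkpoint checkpoint,
                        boolean compact,
                        LedgerDirsManager ledgerDirsManager) throws IOException;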

@nicoloboschi (Contributor) left a comment

It's better to add a proper test case to avoid future regressions.

@lordcheng10 (Contributor)

2. When bookie startup, read the minimal lastMark instead of the maximal lastMark as current last mark.

@hangc0276 How is this logic implemented? Do you need to modify the following logic? [screenshot of the referenced Journal code omitted]

@lordcheng10 (Contributor)

(Quoting @gaozhangmin's earlier comment on the duplicate checkpointComplete invocation; code omitted.)
I also think the checkpoint is duplicated here


@gaozhangmin (Contributor)

(Quoting @lordcheng10's question above about reading the minimal lastMark at startup.)

+1, changes should also be made at Journal.readLog

@hangc0276 (Contributor Author)

it's better to add proper test case to avoid future regressions

@nicoloboschi I have added the test to cover this change, please help take a look, thanks.

@hangc0276 (Contributor Author)

@merlimat @eolivelli @nicoloboschi @gaozhangmin @lordcheng10 I have updated the code and added a test to cover this change. Please help take a look, thanks.

@hangc0276 (Contributor Author)

rerun failure checks

(2 similar comments omitted)

@gaozhangmin (Contributor)

(Quoting the duplicate checkpointComplete discussion above; code omitted.)

@hangc0276 This problem is not going to be resolved here, right?

@hangc0276 (Contributor Author)

(Quoting @gaozhangmin's question above: "This problem is not going to be resolved here, right?")

@gaozhangmin Yes, we can use another PR to solve it.

@hangc0276 (Contributor Author)

rerun failure checks

(1 similar comment omitted)

@gaozhangmin (Contributor)

(Quoting the duplicate checkpointComplete discussion above; code omitted.)

I submitted PR #3353 to solve this issue. @hangc0276 PTAL.

@wenbingshen (Member) left a comment

Since we are now replaying the journal from the smallest logMark, could we pass the journal log-mark position of the current entry to the JournalScanner, compare it with the checkpoint position of the ledger disk holding the entry to be restored, and only restore entries whose logMark position is greater than the checkpoint? That would avoid rewriting data that has already been flushed to disk.

scanner.process(journalVersion, offset, recBuff);

if (!isPaddingRecord) {
    scanner.process(journalVersion, offset, recBuff, journalId, recLog.fc.position());
}
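
A hypothetical sketch of that idea (it was not adopted in this PR; lastMarkOfDirFor and restore are invented names):

// Inside a replay JournalScanner: restore an entry only if its journal
// position is beyond the checkpoint of the ledger disk that owns it.
@Override
public void process(int journalVersion, long offset, ByteBuffer entry,
                    long journalId, long journalPos) throws IOException {
    long ledgerId = entry.getLong(entry.position()); // ledgerId is the first field of a record
    LogMark entryMark = new LogMark(journalId, journalPos);
    LogMark flushedMark = lastMarkOfDirFor(ledgerId); // invented: lastMark of the dir holding ledgerId
    if (entryMark.compare(flushedMark) > 0) {
        restore(journalVersion, offset, entry); // invented: replay only unflushed entries
    }
}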

@@ -245,7 +246,8 @@ void readLog() {
         }
         bb.clear();
         mark.readLogMark(bb);
-        if (curMark.compare(mark) < 0) {
+        // get the minimum mark position from all the ledger directories to ensure no data loss.
+        if (curMark.compare(mark) > 0) {

Reply (Contributor Author):

That comparison would introduce complex logic:

  1. If the set of ledger directories expands or shrinks, the mapping from ledgerId to ledger directory (and its logMark) also changes. An entry could be checked against the wrong logMark file, which would lead to skipping unflushed entries.
  2. There are several storage implementations, such as DbLedgerStorage, SortedLedgerStorage, and InterleavedLedgerStorage; we would have to resolve the storage instance for each ledgerId and check the logMark position per implementation, which adds complex logic.
  3. The comparison would only save ledger-write throughput; we would still need to read the data out of the journal log file.

For the above reasons, I prefer to replay all entries in the journal log file starting from the min logMark position.

@aloyszhang (Contributor)

There are two places that can trigger a checkpoint:

  1. the scheduled task in SyncThread.doCheckpoint
  2. a ledgerDir's write cache filling up and then flushing
     The second is the root cause of the data loss.

If we remove the checkpointSource.checkpointComplete logic from flush, then at that point we will not delete journal files.

The scheduled task SyncThread.doCheckpoint will invoke checkpointSource.checkpointComplete, and it is safe to delete journal files there since we have already flushed the write caches of all ledger directories.

WDYT @hangc0276

@hangc0276 hangc0276 force-pushed the chenhang/fix_lost_data_on_multiple_ledgers branch from fbf5c77 to e2f673e on May 15, 2023 08:07
@hangc0276 (Contributor Author) commented May 15, 2023

(Quoting @aloyszhang's suggestion above.)

@aloyszhang Thanks for your suggestion. Yes, making SyncThread.doCheckpoint the only checkpoint endpoint is the simplest way to solve this problem. I have updated the PR description, please help take a look, thanks.

@aloyszhang (Contributor) left a comment

LGTM

@@ -872,6 +869,9 @@ int shutdown(int exitCode) {
     // Shutdown the EntryLogger which has the GarbageCollector Thread running
     ledgerStorage.shutdown();
+
+    // Shutdown Sync thread
+    syncThread.shutdown();
Review comment (Contributor):

This may throw exceptions when shutting down the syncThread, because its checkpoint calls into the ledgerStorage, and we have already shut down the ledgerStorage before this point.

Reply (Contributor Author):

Yes, you are right. I updated the code, please help take a look, thanks.
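
Presumably the fix reorders shutdown so the SyncThread stops while the ledger storage is still alive; a sketch under that assumption:

// Shutdown Sync thread first: its final checkpoint still needs a live
// ledger storage to flush through.
syncThread.shutdown();

// Only then shutdown the ledger storage (which also stops the
// GarbageCollector thread).
ledgerStorage.shutdown();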

@hangc0276 (Contributor Author)

@eolivelli @merlimat @dlg99 @zymap I have updated the code and need your eyes on this PR, thanks.

@eolivelli (Contributor) left a comment

+1

@@ -905,7 +907,9 @@ private void swapWriteCache() {
 public void flush() throws IOException {
     Checkpoint cp = checkpointSource.newCheckpoint();
     checkpoint(cp);
-    checkpointSource.checkpointComplete(cp, true);
+    if (singleLedgerDirs) {
Review comment (Contributor):

Please add a small comment with a quick description of the motivation for this condition

@eolivelli eolivelli merged commit 8a76703 into apache:master Jun 25, 2023
zymap pushed a commit that referenced this pull request Jun 26, 2023
hangc0276 added a commit to hangc0276/bookkeeper that referenced this pull request Jun 26, 2023
@hangc0276 hangc0276 mentioned this pull request Jun 27, 2023
zymap pushed a commit that referenced this pull request Dec 6, 2023
Ghatage pushed a commit to sijie/bookkeeper that referenced this pull request Jul 12, 2024