Skip to content

Commit

Permalink
ensure every record in wal is only applied once when restart (#7920)
Browse files Browse the repository at this point in the history
close #7915
  • Loading branch information
lidezhu authored Aug 10, 2023
1 parent e6c1c64 commit 8427604
Show file tree
Hide file tree
Showing 13 changed files with 202 additions and 53 deletions.
13 changes: 7 additions & 6 deletions dbms/src/Storages/Page/V3/LogFile/LogFilename.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,29 @@ LogFilename LogFilename::parseFrom(const String & parent_path, const String & fi
if (!startsWith(filename, LOG_FILE_PREFIX_TEMP) && !startsWith(filename, LOG_FILE_PREFIX_NORMAL))
{
LOG_INFO(log, "Ignore not log file [dir={}] [file={}]", parent_path, filename);
return {LogFileStage::Invalid, 0, 0, 0, ""};
return {LogFileStage::Invalid, 0, 0, 0, 0, ""};
}
Strings ss;
boost::split(ss, filename, boost::is_any_of("_"));
if (ss.size() != 3)
if (ss.size() != 3 && ss.size() != 4)
{
LOG_INFO(log, "Ignore unrecognized log file [dir={}] [file={}]", parent_path, filename);
return {LogFileStage::Invalid, 0, 0, 0, ""};
return {LogFileStage::Invalid, 0, 0, 0, 0, ""};
}

String err_msg;
try
{
Format::LogNumberType log_num = std::stoull(ss[1]);
Format::LogNumberType level_num = std::stoull(ss[2]);
UInt64 snap_seq = (ss.size() == 4) ? std::stoull(ss[3]) : 0;
if (ss[0] == LOG_FILE_PREFIX_TEMP)
{
return {LogFileStage::Temporary, log_num, level_num, bytes, parent_path};
return {LogFileStage::Temporary, log_num, level_num, snap_seq, bytes, parent_path};
}
else if (ss[0] == LOG_FILE_PREFIX_NORMAL)
{
return {LogFileStage::Normal, log_num, level_num, bytes, parent_path};
return {LogFileStage::Normal, log_num, level_num, snap_seq, bytes, parent_path};
}
}
catch (std::invalid_argument & e)
Expand All @@ -62,7 +63,7 @@ LogFilename LogFilename::parseFrom(const String & parent_path, const String & fi
err_msg = e.what();
}
LOG_INFO(log, "Ignore unrecognized log file [dir={}] [file={}] [err={}]", parent_path, filename, err_msg);
return {LogFileStage::Invalid, 0, 0, 0, ""};
return {LogFileStage::Invalid, 0, 0, 0, 0, ""};
}

} // namespace DB::PS::V3
7 changes: 5 additions & 2 deletions dbms/src/Storages/Page/V3/LogFile/LogFilename.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct LogFilename
const LogFileStage stage;
const Format::LogNumberType log_num;
const Format::LogNumberType level_num;
UInt64 snap_seq; // used to memorize the max seq in checkpoint file
const size_t bytes_on_disk;
const String parent_path;

Expand All @@ -44,11 +45,13 @@ struct LogFilename
inline String filename(LogFileStage file_stage) const
{
assert(file_stage != LogFileStage::Invalid);
auto suffix = (snap_seq == 0) ? "" : fmt::format("_{}", snap_seq);
return fmt::format(
"{}_{}_{}",
"{}_{}_{}{}",
((file_stage == LogFileStage::Temporary) ? LOG_FILE_PREFIX_TEMP : LOG_FILE_PREFIX_NORMAL),
log_num,
level_num);
level_num,
suffix);
}

inline String fullname(LogFileStage file_stage) const
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1835,6 +1835,7 @@ bool PageDirectory<Trait>::tryDumpSnapshot(const WriteLimiterPtr & write_limiter
{
auto identifier = fmt::format("{}.dump", wal->name());
auto snap = createSnapshot(identifier);
SYNC_FOR("after_PageDirectory::create_snap_for_dump");

// Only apply compact logs when files snapshot is valid
auto files_snap = wal->tryGetFilesSnapshot(
Expand All @@ -1853,12 +1854,12 @@ bool PageDirectory<Trait>::tryDumpSnapshot(const WriteLimiterPtr & write_limiter
files_snap.dump_elapsed_ms = watch.elapsedMilliseconds();
if constexpr (std::is_same_v<Trait, u128::PageDirectoryTrait>)
{
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeTo(edit), write_limiter);
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeTo(edit), snap->sequence, write_limiter);
return done_any_io;
}
else if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeInCompressedFormTo(edit), write_limiter);
bool done_any_io = wal->saveSnapshot(std::move(files_snap), Trait::Serializer::serializeInCompressedFormTo(edit), snap->sequence, write_limiter);
return done_any_io;
}
}
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -500,15 +500,16 @@ class PageDirectory

size_t copyCheckpointInfoFromEdit(const PageEntriesEdit & edit);

// Perform a GC for in-memory entries and return the removed entries.
// If `return_removed_entries` is false, then just return an empty set.
struct InMemGCOption
{
// if true collect the removed entries and return
// If true, gcInMemEntries will return the removed entries.
// If false, just return an empty set to reduce the memory
// and CPU overhead.
bool need_removed_entries = true;
// collect the valid size of remote ids if not nullptr
RemoteFileValidSizes * remote_valid_sizes = nullptr;
};
// Perform a GC for in-memory entries
PageEntries gcInMemEntries(const InMemGCOption & options);

// Get the external id that is not deleted or being ref by another id by
Expand Down
21 changes: 14 additions & 7 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ PageDirectoryFactory<Trait>::dangerouslyCreateFromEditWithoutWAL(const String &
{
PageDirectoryPtr dir = std::make_unique<typename Trait::PageDirectory>(std::move(storage_name), nullptr);

loadEdit(dir, edit);
loadEdit(dir, edit, /*force_apply*/ true);
// Reset the `sequence` to the maximum of persisted.
dir->sequence = max_applied_ver.sequence;

Expand All @@ -114,7 +114,7 @@ PageDirectoryFactory<Trait>::createFromEditForTest(const String & storage_name,
r.version.sequence = ++mock_sequence;
}

loadEdit(dir, edit);
loadEdit(dir, edit, /*force_apply*/ true);
// Reset the `sequence` to the maximum of persisted.
dir->sequence = max_applied_ver.sequence;
RUNTIME_CHECK(dir->sequence, mock_sequence);
Expand Down Expand Up @@ -150,10 +150,14 @@ PageDirectoryFactory<Trait>::createFromEditForTest(const String & storage_name,
}

template <typename Trait>
void PageDirectoryFactory<Trait>::loadEdit(const PageDirectoryPtr & dir, const PageEntriesEdit & edit)
void PageDirectoryFactory<Trait>::loadEdit(const PageDirectoryPtr & dir, const PageEntriesEdit & edit, bool force_apply, UInt64 filter_seq)
{
for (const auto & r : edit.getRecords())
{
bool do_apply = force_apply || (filter_seq < r.version.sequence);
if (!do_apply)
continue;

if (max_applied_ver < r.version)
max_applied_ver = r.version;

Expand Down Expand Up @@ -295,9 +299,10 @@ template <typename Trait>
void PageDirectoryFactory<Trait>::loadFromDisk(const PageDirectoryPtr & dir, WALStoreReaderPtr && reader)
{
DataFileIdSet data_file_ids;
auto checkpoint_snap_seq = reader->getSnapSeqForCheckpoint();
while (reader->remained())
{
auto record = reader->next();
auto [from_checkpoint, record] = reader->next();
if (!record)
{
// TODO: Handle error, some error could be ignored.
Expand All @@ -308,16 +313,18 @@ void PageDirectoryFactory<Trait>::loadFromDisk(const PageDirectoryPtr & dir, WAL
break;
}

// apply the edit read
// The edits in later log files may have some overlap with the first checkpoint file.
// But we want to just apply each edit exactly once.
// So we will skip edits in later log files if they are already applied.
if constexpr (std::is_same_v<Trait, u128::FactoryTrait>)
{
auto edit = Trait::Serializer::deserializeFrom(record.value(), nullptr);
loadEdit(dir, edit);
loadEdit(dir, edit, from_checkpoint, checkpoint_snap_seq);
}
else if constexpr (std::is_same_v<Trait, universal::FactoryTrait>)
{
auto edit = Trait::Serializer::deserializeFrom(record.value(), &data_file_ids);
loadEdit(dir, edit);
loadEdit(dir, edit, from_checkpoint, checkpoint_snap_seq);
}
else
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/PageDirectoryFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class PageDirectoryFactory

private:
void loadFromDisk(const PageDirectoryPtr & dir, WALStoreReaderPtr && reader);
void loadEdit(const PageDirectoryPtr & dir, const PageEntriesEdit & edit);
void loadEdit(const PageDirectoryPtr & dir, const PageEntriesEdit & edit, bool force_apply, UInt64 filter_seq = 0);
static void applyRecord(
const PageDirectoryPtr & dir,
const typename PageEntriesEdit::EditRecord & r);
Expand Down
31 changes: 20 additions & 11 deletions dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ LogFilenameSet WALStoreReader::listAllFiles(
break;
}
case LogFileStage::Temporary:
[[fallthrough]];
case LogFileStage::Invalid:
{
// TODO: clean
break;
}
case LogFileStage::Invalid:
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown logfile name, parent_path={} filename={}", parent_path, file.filename);
}
}
}
}
Expand Down Expand Up @@ -213,7 +214,8 @@ WALStoreReader::WALStoreReader(String storage_name,
const ReadLimiterPtr & read_limiter_)
: provider(provider_)
, read_limiter(read_limiter_)
, checkpoint_read_done(!checkpoint.has_value())
, checkpoint_reader_created(!checkpoint.has_value())
, reading_checkpoint_file(false)
, checkpoint_file(checkpoint)
, files_to_read(std::move(files_))
, next_reading_file(files_to_read.begin())
Expand All @@ -228,12 +230,17 @@ bool WALStoreReader::remained() const

if (!reader->isEOF())
return true;
if (checkpoint_read_done && next_reading_file != files_to_read.end())
if (checkpoint_reader_created && next_reading_file != files_to_read.end())
return true;
return false;
}

std::optional<String> WALStoreReader::next()
UInt64 WALStoreReader::getSnapSeqForCheckpoint() const
{
return checkpoint_file.has_value() ? checkpoint_file->snap_seq : 0;
}

std::pair<bool, std::optional<String>> WALStoreReader::next()
{
bool ok = false;
String record;
Expand All @@ -242,34 +249,36 @@ std::optional<String> WALStoreReader::next()
std::tie(ok, record) = reader->readRecord();
if (ok)
{
return record;
return std::make_pair(reading_checkpoint_file, record);
}

// Roll to read the next file
if (bool next_file = openNextFile(); !next_file)
{
// No more file to be read.
return std::nullopt;
return std::make_pair(false, std::nullopt);
}
} while (true);
}

bool WALStoreReader::openNextFile()
{
if (checkpoint_read_done && next_reading_file == files_to_read.end())
if (checkpoint_reader_created && next_reading_file == files_to_read.end())
{
return false;
}

if (!checkpoint_read_done)
if (!checkpoint_reader_created)
{
reader = createLogReader(*checkpoint_file, provider, &reporter, recovery_mode, read_limiter, logger);
checkpoint_read_done = true;
checkpoint_reader_created = true;
reading_checkpoint_file = true;
}
else
{
reader = createLogReader(*next_reading_file, provider, &reporter, recovery_mode, read_limiter, logger);
++next_reading_file;
reading_checkpoint_file = false;
}
return true;
}
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Storages/Page/V3/WAL/WALReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ class WALStoreReader

bool remained() const;

std::optional<String> next();
UInt64 getSnapSeqForCheckpoint() const;

// std::pair<from_checkpoint, record>
std::pair<bool, std::optional<String>> next();

void throwIfError() const
{
Expand Down Expand Up @@ -122,7 +125,8 @@ class WALStoreReader
ReportCollector reporter;
const ReadLimiterPtr read_limiter;

bool checkpoint_read_done;
bool checkpoint_reader_created;
bool reading_checkpoint_file;
const std::optional<LogFilename> checkpoint_file;
const LogFilenameSet files_to_read;
LogFilenameSet::const_iterator next_reading_file;
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Storages/Page/V3/WALStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,15 @@ Format::LogNumberType WALStore::rollToNewLogWriter(const std::lock_guard<std::mu
{
// Roll to a new log file
auto log_num = last_log_num++;
auto [new_log_file, filename] = createLogWriter({log_num, 0}, false);
auto [new_log_file, filename] = createLogWriter({log_num, 0}, /*snap_sequence*/ 0, false);
UNUSED(filename);
log_file.swap(new_log_file);
return log_num;
}

std::tuple<std::unique_ptr<LogWriter>, LogFilename> WALStore::createLogWriter(
const std::pair<Format::LogNumberType, Format::LogNumberType> & new_log_lvl,
UInt64 snap_sequence,
bool temp_file)
{
String path;
Expand All @@ -133,6 +134,7 @@ std::tuple<std::unique_ptr<LogWriter>, LogFilename> WALStore::createLogWriter(
(temp_file ? LogFileStage::Temporary : LogFileStage::Normal),
new_log_lvl.first,
new_log_lvl.second,
snap_sequence,
0,
path};
auto filename = log_filename.filename(log_filename.stage);
Expand Down Expand Up @@ -217,6 +219,7 @@ WALStore::FilesSnapshot WALStore::tryGetFilesSnapshot(size_t max_persisted_log_f
bool WALStore::saveSnapshot(
FilesSnapshot && files_snap,
String && serialized_snap,
UInt64 snap_sequence,
const WriteLimiterPtr & write_limiter)
{
if (files_snap.persisted_log_files.empty())
Expand All @@ -227,7 +230,7 @@ bool WALStore::saveSnapshot(
// Use {largest_log_num, 1} to save the `edit`
const auto log_num = files_snap.persisted_log_files.rbegin()->log_num;
// Create a temporary file for saving directory snapshot
auto [compact_log, log_filename] = createLogWriter({log_num, 1}, /*temp_file*/ true);
auto [compact_log, log_filename] = createLogWriter({log_num, 1}, snap_sequence, /*temp_file*/ true);

// TODO: split the snap into multiple records in LogFile so that the memory
// consumption could be more smooth.
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/V3/WALStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class WALStore
bool saveSnapshot(
FilesSnapshot && files_snap,
String && serialized_snap,
UInt64 snap_sequence,
const WriteLimiterPtr & write_limiter = nullptr);

const String & name() { return storage_name; }
Expand All @@ -113,6 +114,7 @@ class WALStore
std::tuple<std::unique_ptr<LogWriter>, LogFilename>
createLogWriter(
const std::pair<Format::LogNumberType, Format::LogNumberType> & new_log_lvl,
UInt64 snap_sequence,
bool temp_file);

Format::LogNumberType rollToNewLogWriter(const std::lock_guard<std::mutex> &);
Expand Down
Loading

0 comments on commit 8427604

Please sign in to comment.