From 952cb0a878973c867b4a94c33799e1283e8d5fa6 Mon Sep 17 00:00:00 2001 From: jiaqizho Date: Fri, 20 May 2022 01:12:39 +0800 Subject: [PATCH] Fix ref a del id failed in mix mode. (#4933) close pingcap/tiflash#4923 --- .../DeltaMerge/ColumnFile/ColumnFileBig.cpp | 10 +- .../DeltaMerge/ColumnFile/ColumnFileBig.h | 12 +- .../Delta/ColumnFilePersistedSet.cpp | 9 +- .../DeltaMerge/Delta/ColumnFilePersistedSet.h | 4 +- .../Storages/DeltaMerge/Delta/MemTableSet.cpp | 9 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 12 +- dbms/src/Storages/DeltaMerge/File/DMFile.cpp | 6 +- dbms/src/Storages/DeltaMerge/File/DMFile.h | 18 ++- dbms/src/Storages/DeltaMerge/Segment.cpp | 21 +-- .../Storages/DeltaMerge/StableValueSpace.cpp | 12 +- dbms/src/Storages/DeltaMerge/StoragePool.cpp | 102 ++++++++++++++- dbms/src/Storages/DeltaMerge/StoragePool.h | 1 + .../DeltaMerge/tests/gtest_dm_file.cpp | 4 +- dbms/src/Storages/Page/PageStorage.cpp | 120 +++++++++++------- dbms/src/Storages/Page/PageStorage.h | 3 +- .../Page/V2/VersionSet/PageEntriesView.h | 6 +- dbms/src/Storages/Page/V3/PageDirectory.h | 1 + dbms/src/Storages/Page/V3/PageStorageImpl.cpp | 4 +- .../Page/V3/tests/gtest_page_storage.cpp | 28 ++++ .../V3/tests/gtest_page_storage_mix_mode.cpp | 79 ++++++++++++ 20 files changed, 358 insertions(+), 103 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 306f9470f5d..3328f60b15d 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -56,7 +56,7 @@ ColumnFileBig::getReader(const DMContext & context, const StorageSnapshotPtr & / void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) const { - writeIntBinary(file->refId(), buf); + writeIntBinary(file->pageId(), buf); writeIntBinary(valid_rows, buf); writeIntBinary(valid_bytes, buf); } @@ -65,17 +65,17 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(DMContext & context, / const RowKeyRange & segment_range, ReadBuffer & buf) { - UInt64 file_ref_id; + UInt64 file_page_id; size_t valid_rows, valid_bytes; - readIntBinary(file_ref_id, buf); + readIntBinary(file_page_id, buf); readIntBinary(valid_rows, buf); readIntBinary(valid_bytes, buf); - auto file_id = context.storage_pool.dataReader()->getNormalPageId(file_ref_id); + auto file_id = context.storage_pool.dataReader()->getNormalPageId(file_page_id); auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id); - auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, file_ref_id, file_parent_path, DMFile::ReadMetaMode::all()); + auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, file_page_id, file_parent_path, DMFile::ReadMetaMode::all()); auto * dp_file = new ColumnFileBig(dmfile, valid_rows, valid_bytes, segment_range); return std::shared_ptr(dp_file); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h index ced90112392..dba7eca7247 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h @@ -33,8 +33,8 @@ class ColumnFileBig : public ColumnFilePersisted private: DMFilePtr file; - size_t valid_rows; - size_t valid_bytes; + size_t valid_rows = 0; + size_t valid_bytes = 0; RowKeyRange segment_range; @@ -67,17 +67,17 @@ class ColumnFileBig : public ColumnFilePersisted auto getFile() const { return file; } - PageId getDataPageId() { return file->refId(); } + PageId getDataPageId() { return file->pageId(); } size_t getRows() const override { return valid_rows; } size_t getBytes() const override { return valid_bytes; }; void removeData(WriteBatches & wbs) const override { - // Here we remove the ref id instead of file_id. - // Because a dmfile could be used in serveral places, and only after all ref_ids are removed, + // Here we remove the data id instead of file_id. + // Because a dmfile could be used in several places, and only after all page ids are removed, // then the file_id got removed. - wbs.removed_data.delPage(file->refId()); + wbs.removed_data.delPage(file->pageId()); } ColumnFileReaderPtr diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp index 289caf5816c..33ef262d557 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.cpp @@ -216,11 +216,14 @@ ColumnFilePersisteds ColumnFilePersistedSet::checkHeadAndCloneTail(DMContext & c else if (auto * b_file = column_file->tryToBigFile(); b_file) { auto delegator = context.path_pool.getStableDiskDelegator(); - auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto new_page_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + // Note that the file id may has already been mark as deleted. We must + // create a reference to the page id itself instead of create a reference + // to the file id. + wbs.data.putRefPage(new_page_id, b_file->getDataPageId()); auto file_id = b_file->getFile()->fileId(); - wbs.data.putRefPage(new_ref_id, file_id); auto file_parent_path = delegator.getDTFilePath(file_id); - auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* ref_id= */ new_ref_id, file_parent_path, DMFile::ReadMetaMode::all()); + auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* page_id= */ new_page_id, file_parent_path, DMFile::ReadMetaMode::all()); auto new_big_file = b_file->cloneWith(context, new_file, target_range); cloned_tail.push_back(new_big_file); diff --git a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h index 5fcd9b8c618..1580ae747da 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/ColumnFilePersistedSet.h @@ -57,8 +57,8 @@ class ColumnFilePersistedSet : public std::enable_shared_from_this persisted_files_count; - std::atomic persisted_files_level_count; + std::atomic persisted_files_count = 0; + std::atomic persisted_files_level_count = 0; std::atomic rows = 0; std::atomic bytes = 0; diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp index bab8f352cad..d339b699d8d 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp @@ -104,11 +104,14 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange else if (auto * f = column_file->tryToBigFile(); f) { auto delegator = context.path_pool.getStableDiskDelegator(); - auto new_ref_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto new_page_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + // Note that the file id may has already been mark as deleted. We must + // create a reference to the page id itself instead of create a reference + // to the file id. + wbs.data.putRefPage(new_page_id, f->getDataPageId()); auto file_id = f->getFile()->fileId(); - wbs.data.putRefPage(new_ref_id, file_id); auto file_parent_path = delegator.getDTFilePath(file_id); - auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* ref_id= */ new_ref_id, file_parent_path, DMFile::ReadMetaMode::all()); + auto new_file = DMFile::restore(context.db_context.getFileProvider(), file_id, /* page_id= */ new_page_id, file_parent_path, DMFile::ReadMetaMode::all()); auto new_column_file = f->cloneWith(context, new_file, target_range); cloned_column_files.push_back(new_column_file); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 5583b0039b7..997db601d1e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -314,8 +314,8 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) if (valid_ids.count(id)) continue; - // Note that ref_id is useless here. - auto dmfile = DMFile::restore(global_context.getFileProvider(), id, /* ref_id= */ 0, path, DMFile::ReadMetaMode::none()); + // Note that page_id is useless here. + auto dmfile = DMFile::restore(global_context.getFileProvider(), id, /* page_id= */ 0, path, DMFile::ReadMetaMode::none()); if (dmfile->canGC()) { delegate.removeDTFile(dmfile->fileId()); @@ -820,14 +820,14 @@ void DeltaMergeStore::ingestFiles( /// Generate DMFile instance with a new ref_id pointed to the file_id. auto file_id = file->fileId(); const auto & file_parent_path = file->parentPath(); - auto ref_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); + auto page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); - auto ref_file = DMFile::restore(file_provider, file_id, ref_id, file_parent_path, DMFile::ReadMetaMode::all()); + auto ref_file = DMFile::restore(file_provider, file_id, page_id, file_parent_path, DMFile::ReadMetaMode::all()); auto column_file = std::make_shared(*dm_context, ref_file, segment_range); if (column_file->getRows() != 0) { column_files.emplace_back(std::move(column_file)); - wbs.data.putRefPage(ref_id, file_id); + wbs.data.putRefPage(page_id, file->pageId()); } } @@ -2343,7 +2343,7 @@ void DeltaMergeStore::restoreStableFiles() { for (const auto & file_id : DMFile::listAllInPath(file_provider, root_path, options)) { - auto dmfile = DMFile::restore(file_provider, file_id, /* ref_id= */ 0, root_path, DMFile::ReadMetaMode::diskSizeOnly()); + auto dmfile = DMFile::restore(file_provider, file_id, /* page_id= */ 0, root_path, DMFile::ReadMetaMode::diskSizeOnly()); path_delegate.addDTFile(file_id, dmfile->getBytesOnDisk(), root_path); } } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp index d40a6bf5bab..fe984ad519f 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp @@ -146,7 +146,7 @@ DMFilePtr DMFile::create(UInt64 file_id, const String & parent_path, bool single DMFilePtr DMFile::restore( const FileProviderPtr & file_provider, UInt64 file_id, - UInt64 ref_id, + UInt64 page_id, const String & parent_path, const ReadMetaMode & read_meta_mode) { @@ -154,7 +154,7 @@ DMFilePtr DMFile::restore( bool single_file_mode = Poco::File(path).isFile(); DMFilePtr dmfile(new DMFile( file_id, - ref_id, + page_id, parent_path, single_file_mode ? Mode::SINGLE_FILE : Mode::FOLDER, Status::READABLE, @@ -556,7 +556,7 @@ void DMFile::readMetadata(const FileProviderPtr & file_provider, const ReadMetaM DB::readIntBinary(footer.sub_file_num, buf); // initialize sub file state buf.seek(footer.sub_file_stat_offset, SEEK_SET); - SubFileStat sub_file_stat; + SubFileStat sub_file_stat{}; for (UInt32 i = 0; i < footer.sub_file_num; i++) { String name; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index e74048d0196..48d4071d595 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -141,7 +141,9 @@ class DMFile : private boost::noncopyable struct SubFileStat { - SubFileStat() = default; + SubFileStat() + : SubFileStat(0, 0) + {} SubFileStat(UInt64 offset_, UInt64 size_) : offset{offset_} , size{size_} @@ -195,7 +197,7 @@ class DMFile : private boost::noncopyable static DMFilePtr restore( const FileProviderPtr & file_provider, UInt64 file_id, - UInt64 ref_id, + UInt64 page_id, const String & parent_path, const ReadMetaMode & read_meta_mode); @@ -216,8 +218,10 @@ class DMFile : private boost::noncopyable void enableGC(); void remove(const FileProviderPtr & file_provider); + // The ID for locating DTFile on disk UInt64 fileId() const { return file_id; } - UInt64 refId() const { return ref_id; } + // The PageID for locating this object in the StoragePool.data + UInt64 pageId() const { return page_id; } String path() const; @@ -289,14 +293,14 @@ class DMFile : private boost::noncopyable private: DMFile(UInt64 file_id_, - UInt64 ref_id_, + UInt64 page_id_, String parent_path_, Mode mode_, Status status_, Poco::Logger * log_, DMConfigurationOpt configuration_ = std::nullopt) : file_id(file_id_) - , ref_id(ref_id_) + , page_id(page_id_) , parent_path(std::move(parent_path_)) , mode(mode_) , status(status_) @@ -395,8 +399,10 @@ class DMFile : private boost::noncopyable void initializeIndices(); private: + // The id to construct the file path on disk. UInt64 file_id; - UInt64 ref_id; // It is a reference to file_id, could be the same. + // It is the page_id that represent this file in the PageStorage. It could be the same as file id. + UInt64 page_id; String parent_path; PackStats pack_stats; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index e8cde4ba0c6..9e195b2b25d 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -919,27 +919,30 @@ std::optional Segment::prepareSplitLogical(DMContext & dm_co auto delegate = dm_context.path_pool.getStableDiskDelegator(); for (const auto & dmfile : segment_snap->stable->getDMFiles()) { - auto ori_ref_id = dmfile->refId(); + auto ori_page_id = dmfile->pageId(); auto file_id = dmfile->fileId(); auto file_parent_path = delegate.getDTFilePath(file_id); - auto my_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); - auto other_dmfile_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); + auto my_dmfile_page_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); + auto other_dmfile_page_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); - wbs.data.putRefPage(my_dmfile_id, file_id); - wbs.data.putRefPage(other_dmfile_id, file_id); - wbs.removed_data.delPage(ori_ref_id); + // Note that the file id may has already been mark as deleted. We must + // create a reference to the page id itself instead of create a reference + // to the file id. + wbs.data.putRefPage(my_dmfile_page_id, ori_page_id); + wbs.data.putRefPage(other_dmfile_page_id, ori_page_id); + wbs.removed_data.delPage(ori_page_id); auto my_dmfile = DMFile::restore( dm_context.db_context.getFileProvider(), file_id, - /* ref_id= */ my_dmfile_id, + /* page_id= */ my_dmfile_page_id, file_parent_path, DMFile::ReadMetaMode::all()); auto other_dmfile = DMFile::restore( dm_context.db_context.getFileProvider(), file_id, - /* ref_id= */ other_dmfile_id, + /* page_id= */ other_dmfile_page_id, file_parent_path, DMFile::ReadMetaMode::all()); @@ -1059,7 +1062,7 @@ std::optional Segment::prepareSplitPhysical(DMContext & dm_c { // Here we should remove the ref id instead of file_id. // Because a dmfile could be used by several segments, and only after all ref_ids are removed, then the file_id removed. - wbs.removed_data.delPage(file->refId()); + wbs.removed_data.delPage(file->pageId()); } LOG_FMT_INFO(log, "Segment [{}] prepare split physical done", segment_id); diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 752a112b565..ed97bd8f421 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -80,7 +80,7 @@ void StableValueSpace::saveMeta(WriteBatch & meta_wb) writeIntBinary(valid_bytes, buf); writeIntBinary(static_cast(files.size()), buf); for (auto & f : files) - writeIntBinary(f->refId(), buf); + writeIntBinary(f->pageId(), buf); auto data_size = buf.count(); // Must be called before tryGetReadBuffer. meta_wb.putPage(id, 0, buf.tryGetReadBuffer(), data_size); @@ -100,15 +100,15 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageId id) readIntBinary(valid_rows, buf); readIntBinary(valid_bytes, buf); readIntBinary(size, buf); - UInt64 ref_id; + UInt64 page_id; for (size_t i = 0; i < size; ++i) { - readIntBinary(ref_id, buf); + readIntBinary(page_id, buf); - auto file_id = context.storage_pool.dataReader()->getNormalPageId(ref_id); + auto file_id = context.storage_pool.dataReader()->getNormalPageId(page_id); auto file_parent_path = context.path_pool.getStableDiskDelegator().getDTFilePath(file_id); - auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, ref_id, file_parent_path, DMFile::ReadMetaMode::all()); + auto dmfile = DMFile::restore(context.db_context.getFileProvider(), file_id, page_id, file_parent_path, DMFile::ReadMetaMode::all()); stable->files.push_back(dmfile); } @@ -168,7 +168,7 @@ void StableValueSpace::recordRemovePacksPages(WriteBatches & wbs) const { // Here we should remove the ref id instead of file_id. // Because a dmfile could be used by several segments, and only after all ref_ids are removed, then the file_id removed. - wbs.removed_data.delPage(file->refId()); + wbs.removed_data.delPage(file->pageId()); } } diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.cpp b/dbms/src/Storages/DeltaMerge/StoragePool.cpp index b94cc3c1735..fbb631064a7 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.cpp +++ b/dbms/src/Storages/DeltaMerge/StoragePool.cpp @@ -18,6 +18,10 @@ #include #include #include +#include +#include +#include +#include #include @@ -244,6 +248,8 @@ StoragePool::StoragePool(Context & global_ctx, NamespaceId ns_id_, StoragePathPo void StoragePool::forceTransformMetaV2toV3() { + if (unlikely(run_mode != PageStorageRunMode::MIX_MODE)) + throw Exception(fmt::format("Transform meta must run under mix mode [run_mode={}]", static_cast(run_mode))); assert(meta_storage_v2 != nullptr); assert(meta_storage_v3 != nullptr); auto meta_transform_storage_writer = std::make_shared(run_mode, meta_storage_v2, meta_storage_v3); @@ -288,6 +294,80 @@ void StoragePool::forceTransformMetaV2toV3() meta_transform_storage_writer->writeIntoV2(std::move(write_batch_del_v2), nullptr); } +static inline DB::PS::V2::PageEntriesVersionSetWithDelta::Snapshot * +toV2ConcreteSnapshot(const DB::PageStorage::SnapshotPtr & ptr) +{ + return dynamic_cast(ptr.get()); +} + +void StoragePool::forceTransformDataV2toV3() +{ + if (unlikely(run_mode != PageStorageRunMode::MIX_MODE)) + throw Exception(fmt::format("Transform meta must run under mix mode [run_mode={}]", static_cast(run_mode))); + assert(data_storage_v2 != nullptr); + assert(data_storage_v3 != nullptr); + auto data_transform_storage_writer = std::make_shared(run_mode, data_storage_v2, data_storage_v3); + + auto snapshot = data_storage_v2->getSnapshot("transformDataV2toV3"); + auto * v2_snap = toV2ConcreteSnapshot(snapshot); + if (!snapshot || !v2_snap) + { + throw Exception("Can not allocate snapshot from pool.data v2", ErrorCodes::LOGICAL_ERROR); + } + + // Example + // 100 -> 100 + // 102 -> 100 + // 105 -> 100 + // 200 -> 200 + // 305 -> 300 + // Migration steps: + // collect v2 valid page id: 100, 102, 105, 200, 305 + // v3 put external 100, 200, 300; put ref 102, 105, 305 + // mark some id as deleted: v3 del 300 + // v2 delete 100, 102, 105, 200, 305 + + // The page ids that can be accessed by DeltaTree + const auto all_page_ids = v2_snap->view.validPageIds(); + + WriteBatch write_batch_transform{ns_id}; + WriteBatch write_batch_del_v2{ns_id}; + + std::set created_dt_file_id; + for (const auto page_id : all_page_ids) + { + // resolve the page_id into dtfile id + const auto resolved_file_id = v2_snap->view.resolveRefId(page_id); + if (auto ins_result = created_dt_file_id.insert(resolved_file_id); /*created=*/ins_result.second) + { + // first see this file id, migrate to v3 + write_batch_transform.putExternal(resolved_file_id, 0); + } + // migrate the reference for v3 + if (page_id != resolved_file_id) + { + write_batch_transform.putRefPage(page_id, resolved_file_id); + } + // record del for V2 + write_batch_del_v2.delPage(page_id); + } + // If the file id is not existed in `all_page_ids`, it means the file id + // itself has been deleted. + for (const auto dt_file_id : created_dt_file_id) + { + if (all_page_ids.count(dt_file_id) == 0) + { + write_batch_transform.delPage(dt_file_id); + } + } + + // Will rewrite into V3. + data_transform_storage_writer->writeIntoV3(std::move(write_batch_transform), nullptr); + + // DEL must call after rewrite. + data_transform_storage_writer->writeIntoV2(std::move(write_batch_del_v2), nullptr); +} + PageStorageRunMode StoragePool::restore() { switch (run_mode) @@ -324,10 +404,10 @@ PageStorageRunMode StoragePool::restore() // However, the pages on meta V2 can not be deleted. As the pages in meta are small, we perform a forceTransformMetaV2toV3 to convert pages before all. if (const auto & meta_remain_pages = meta_storage_v2->getNumberOfPages(); meta_remain_pages != 0) { - LOG_FMT_INFO(logger, "Current meta transform to V3 begin, [ns_id={}] [pages_before_transform={}]", ns_id, meta_remain_pages); + LOG_FMT_INFO(logger, "Current pool.meta transform to V3 begin [ns_id={}] [pages_before_transform={}]", ns_id, meta_remain_pages); forceTransformMetaV2toV3(); const auto & meta_remain_pages_after_transform = meta_storage_v2->getNumberOfPages(); - LOG_FMT_INFO(logger, "Current meta transform to V3 finished. [ns_id={}] [done={}] [pages_before_transform={}], [pages_after_transform={}]", // + LOG_FMT_INFO(logger, "Current pool.meta transform to V3 finished [ns_id={}] [done={}] [pages_before_transform={}], [pages_after_transform={}]", // ns_id, meta_remain_pages_after_transform == 0, meta_remain_pages, @@ -335,7 +415,23 @@ PageStorageRunMode StoragePool::restore() } else { - LOG_FMT_INFO(logger, "Current meta translate already done before restored.[ns_id={}] ", ns_id); + LOG_FMT_INFO(logger, "Current pool.meta translate already done before restored [ns_id={}] ", ns_id); + } + + if (const auto & data_remain_pages = data_storage_v2->getNumberOfPages(); data_remain_pages != 0) + { + LOG_FMT_INFO(logger, "Current pool.data transform to V3 begin [ns_id={}] [pages_before_transform={}]", ns_id, data_remain_pages); + forceTransformDataV2toV3(); + const auto & data_remain_pages_after_transform = data_storage_v2->getNumberOfPages(); + LOG_FMT_INFO(logger, "Current pool.data transform to V3 finished [ns_id={}] [done={}] [pages_before_transform={}], [pages_after_transform={}]", // + ns_id, + data_remain_pages_after_transform == 0, + data_remain_pages, + data_remain_pages_after_transform); + } + else + { + LOG_FMT_INFO(logger, "Current pool.data translate already done before restored [ns_id={}] ", ns_id); } // Check number of valid pages in v2 diff --git a/dbms/src/Storages/DeltaMerge/StoragePool.h b/dbms/src/Storages/DeltaMerge/StoragePool.h index f106ac725e4..7ba6dd85995 100644 --- a/dbms/src/Storages/DeltaMerge/StoragePool.h +++ b/dbms/src/Storages/DeltaMerge/StoragePool.h @@ -160,6 +160,7 @@ class StoragePool : private boost::noncopyable bool doV2Gc(const Settings & settings); void forceTransformMetaV2toV3(); + void forceTransformDataV2toV3(); #ifndef DBMS_PUBLIC_GTEST private: diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index 3ddf318509f..dfd4419fe38 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -135,10 +135,10 @@ class DMFile_Test DMFilePtr restoreDMFile() { auto file_id = dm_file->fileId(); - auto ref_id = dm_file->refId(); + auto page_id = dm_file->pageId(); auto parent_path = dm_file->parentPath(); auto file_provider = dbContext().getFileProvider(); - return DMFile::restore(file_provider, file_id, ref_id, parent_path, DMFile::ReadMetaMode::all()); + return DMFile::restore(file_provider, file_id, page_id, parent_path, DMFile::ReadMetaMode::all()); } diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index 4a8dca05a71..6e1addae093 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -475,6 +475,8 @@ void PageWriter::writeIntoMixMode(WriteBatch && write_batch, WriteLimiterPtr wri // We need hold mem from V2 pages after write. std::list mem_holders; + std::set page_ids_before_ref; + for (const auto & write : write_batch.getWrites()) { switch (write.type) @@ -483,6 +485,7 @@ void PageWriter::writeIntoMixMode(WriteBatch && write_batch, WriteLimiterPtr wri case WriteBatch::WriteType::PUT: case WriteBatch::WriteType::PUT_EXTERNAL: { + page_ids_before_ref.insert(write.page_id); break; } // Both need del in v2 and v3 @@ -493,57 +496,84 @@ void PageWriter::writeIntoMixMode(WriteBatch && write_batch, WriteLimiterPtr wri } case WriteBatch::WriteType::REF: { + // 1. Try to resolve normal page id PageId resolved_page_id = storage_v3->getNormalPageId(ns_id, write.ori_page_id, /*snapshot*/ nullptr, false); - // If the normal id is not found in v3, read from v2 and create a new put + ref - if (resolved_page_id == INVALID_PAGE_ID) + + // If the origin id is found in V3, then just apply the ref to v3 + if (resolved_page_id != INVALID_PAGE_ID) + { + break; + } + + // 2. Check ori_page_id in current writebatch + if (page_ids_before_ref.count(write.ori_page_id) > 0) { - const auto & entry_for_put = storage_v2->getEntry(ns_id, write.ori_page_id, /*snapshot*/ {}); - if (entry_for_put.isValid()) - { - auto page_for_put = storage_v2->read(ns_id, write.ori_page_id); - - // Keep the mem hold, no need create new one. - mem_holders.emplace_back(page_for_put.mem_holder); - assert(entry_for_put.size == page_for_put.data.size()); - - // Page with fields - if (!entry_for_put.field_offsets.empty()) - { - wb_for_put_v3.putPage(write.ori_page_id, // - entry_for_put.tag, - std::make_shared(page_for_put.data.begin(), page_for_put.data.size()), - page_for_put.data.size(), - Page::fieldOffsetsToSizes(entry_for_put.field_offsets, entry_for_put.size)); - } - else - { // Normal page with fields - wb_for_put_v3.putPage(write.ori_page_id, // - entry_for_put.tag, - std::make_shared(page_for_put.data.begin(), - page_for_put.data.size()), - page_for_put.data.size()); - } - - LOG_FMT_INFO( - Logger::get("PageWriter"), - "Can't find the origin page in v3, migrate a new being ref page into V3 [page_id={}] [origin_id={}] [field_offsets={}]", - write.page_id, - write.ori_page_id, - entry_for_put.field_offsets.size()); - } - else - { - throw Exception(fmt::format("Can't find origin entry in V2 and V3, [ns_id={}, ori_page_id={}]", - ns_id, - write.ori_page_id), - ErrorCodes::LOGICAL_ERROR); - } + break; } - // else V3 found the origin one. - // Then do nothing. + + // Else the normal id is not found in v3, read from v2 and create a new put + ref + + // 3. Check ori_page_id in V2 + const auto & entry_for_put = storage_v2->getEntry(ns_id, write.ori_page_id, /*snapshot*/ {}); + + // If we can't find origin id in V3, must exist in V2. + if (!entry_for_put.isValid()) + { + throw Exception(fmt::format("Can't find origin entry in V2 and V3, [ns_id={}, ori_page_id={}]", + ns_id, + write.ori_page_id), + ErrorCodes::LOGICAL_ERROR); + } + + if (entry_for_put.size == 0) + { + // If the origin page size is 0. + // That means origin page in V2 is a external page id. + // Should not run into here after we introduce `StoragePool::forceTransformDataV2toV3` + throw Exception(fmt::format( + "Can't find the origin page in v3. Origin page in v2 size is 0, meaning it's a external id." + "Migrate a new being ref page into V3 [page_id={}] [origin_id={}]", + write.page_id, + write.ori_page_id, + entry_for_put.field_offsets.size()), + ErrorCodes::LOGICAL_ERROR); + } + + // Else find out origin page is a normal page in V2 + auto page_for_put = storage_v2->read(ns_id, write.ori_page_id); + + // Keep the mem holder for later write + mem_holders.emplace_back(page_for_put.mem_holder); + assert(entry_for_put.size == page_for_put.data.size()); + + // Page with fields + if (!entry_for_put.field_offsets.empty()) + { + wb_for_put_v3.putPage(write.ori_page_id, // + entry_for_put.tag, + std::make_shared(page_for_put.data.begin(), page_for_put.data.size()), + page_for_put.data.size(), + Page::fieldOffsetsToSizes(entry_for_put.field_offsets, entry_for_put.size)); + } + else + { // Normal page without fields + wb_for_put_v3.putPage(write.ori_page_id, // + entry_for_put.tag, + std::make_shared(page_for_put.data.begin(), + page_for_put.data.size()), + page_for_put.data.size()); + } + + LOG_FMT_INFO( + Logger::get("PageWriter"), + "Can't find the origin page in v3, migrate a new being ref page into V3 [page_id={}] [origin_id={}] [field_offsets={}]", + write.page_id, + write.ori_page_id, + entry_for_put.field_offsets.size()); + break; } default: diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index 06f3be5d1f7..481888bdf33 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -393,9 +393,10 @@ class PageWriter : private boost::noncopyable // Only used for META and KVStore write del. void writeIntoV2(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const; -private: + // Only used for DATA transform data void writeIntoV3(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const; +private: void writeIntoMixMode(WriteBatch && write_batch, WriteLimiterPtr write_limiter) const; // A wrap of getSettings only used for `RegionPersister::gc` diff --git a/dbms/src/Storages/Page/V2/VersionSet/PageEntriesView.h b/dbms/src/Storages/Page/V2/VersionSet/PageEntriesView.h index 6a099626108..fe51c629663 100644 --- a/dbms/src/Storages/Page/V2/VersionSet/PageEntriesView.h +++ b/dbms/src/Storages/Page/V2/VersionSet/PageEntriesView.h @@ -14,6 +14,10 @@ #pragma once +#include +#include +#include + #include namespace DB::PS::V2 @@ -52,9 +56,9 @@ class PageEntriesView size_t numPages() const; size_t numNormalPages() const; -private: PageId resolveRefId(PageId page_id) const; +private: friend class DeltaVersionEditAcceptor; }; diff --git a/dbms/src/Storages/Page/V3/PageDirectory.h b/dbms/src/Storages/Page/V3/PageDirectory.h index 635cf04bfe6..14833245c7a 100644 --- a/dbms/src/Storages/Page/V3/PageDirectory.h +++ b/dbms/src/Storages/Page/V3/PageDirectory.h @@ -326,6 +326,7 @@ class PageDirectory { return getNormalPageId(page_id, toConcreteSnapshot(snap), throw_on_not_exist); } + #ifndef NDEBUG // Just for tests, refactor them out later PageIDAndEntryV3 get(PageId page_id, const PageDirectorySnapshotPtr & snap) const diff --git a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp index 6966f794de9..8aa9f92675c 100644 --- a/dbms/src/Storages/Page/V3/PageStorageImpl.cpp +++ b/dbms/src/Storages/Page/V3/PageStorageImpl.cpp @@ -254,8 +254,8 @@ void PageStorageImpl::traverseImpl(const std::functiongetAllPageIds(); for (const auto & valid_page : page_ids) { - const auto & page_entries = page_directory->get(valid_page, snapshot); - acceptor(blob_store.read(page_entries)); + const auto & page_id_and_entry = page_directory->get(valid_page, snapshot); + acceptor(blob_store.read(page_id_and_entry)); } } diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp index b0c2625466d..91dfcaac6a8 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage.cpp @@ -1301,5 +1301,33 @@ try } CATCH + +TEST_F(PageStorageTest, putExternalAfterRestore) +try +{ + { + WriteBatch batch; + batch.putExternal(1999, 0); + page_storage->write(std::move(batch)); + } + + page_storage = reopenWithConfig(config); + + auto alive_ids = page_storage->getAliveExternalPageIds(TEST_NAMESPACE_ID); + ASSERT_EQ(alive_ids.size(), 1); + ASSERT_EQ(*alive_ids.begin(), 1999); + + { + WriteBatch batch; + batch.putExternal(1999, 0); + page_storage->write(std::move(batch)); + } + + alive_ids = page_storage->getAliveExternalPageIds(TEST_NAMESPACE_ID); + ASSERT_EQ(alive_ids.size(), 1); + ASSERT_EQ(*alive_ids.begin(), 1999); +} +CATCH + } // namespace PS::V3::tests } // namespace DB diff --git a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp index d3fdafe57e8..98d84989dd9 100644 --- a/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp +++ b/dbms/src/Storages/Page/V3/tests/gtest_page_storage_mix_mode.cpp @@ -19,6 +19,7 @@ #include #include #include +#include namespace DB { @@ -490,5 +491,83 @@ try } CATCH + +TEST_F(PageStorageMixedTest, MockDTIngest) +try +{ + { + WriteBatch batch; + batch.putExternal(100, 0); + page_writer_v2->write(std::move(batch), nullptr); + } + + ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::MIX_MODE); + { + // create dmf_1999 + // ingest to segment, create ref 2001 -> 1999 + // after ingest done, del 1999 + WriteBatch batch; + batch.putExternal(1999, 0); + batch.putRefPage(2001, 1999); + batch.delPage(1999); + ASSERT_NO_THROW(page_writer_mix->write(std::move(batch), nullptr)); + } + + { + // mock that create ref by dtfile id, should fail + WriteBatch batch; + batch.putRefPage(2012, 1999); + ASSERT_ANY_THROW(page_writer_mix->write(std::move(batch), nullptr)); + } + + { + // mock that create ref by page id of dtfile, should be ok + WriteBatch batch; + batch.putRefPage(2012, 2001); + ASSERT_NO_THROW(page_writer_mix->write(std::move(batch), nullptr)); + } + + // check 2012 -> 2001 => 2021 -> 1999 + ASSERT_EQ(page_reader_mix->getNormalPageId(2012), 1999); + + { + // Revert v3 + WriteBatch batch; + batch.delPage(2012); + batch.delPage(2001); + page_writer_mix->write(std::move(batch), nullptr); + } +} +CATCH + + +TEST_F(PageStorageMixedTest, RefV2External) +try +{ + { + WriteBatch batch; + batch.putExternal(100, 0); + batch.putRefPage(101, 100); + batch.delPage(100); + page_writer_v2->write(std::move(batch), nullptr); + } + + ASSERT_EQ(reloadMixedStoragePool(), PageStorageRunMode::MIX_MODE); + { + WriteBatch batch; + batch.putRefPage(102, 101); + // Should not run into this case after we introduce `StoragePool::forceTransformDataV2toV3` + ASSERT_ANY_THROW(page_writer_mix->write(std::move(batch), nullptr);); + } + { + // Revert v3 + WriteBatch batch; + batch.delPage(102); + page_writer_mix->write(std::move(batch), nullptr); + } +} +CATCH + + } // namespace PS::V3::tests } // namespace DB