From e25d205a2b6de1cba8f7bdcb7a63e74d47e2922e Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 28 Oct 2020 11:53:31 +0800 Subject: [PATCH 1/8] RegionPersister run in compatibility mode Signed-off-by: JaySon-Huang --- dbms/src/Interpreters/Context.cpp | 17 ++- dbms/src/Interpreters/Context.h | 7 +- dbms/src/Interpreters/IDAsPathUpgrader.cpp | 14 +- dbms/src/Server/Server.cpp | 71 ++++++--- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 2 +- .../DeltaMerge/tests/gtest_dm_file.cpp | 6 +- .../DeltaMerge/tests/gtest_dm_segment.cpp | 4 +- .../tests/gtest_dm_segment_common_handle.cpp | 2 +- dbms/src/Storages/Page/PageFile.cpp | 6 +- dbms/src/Storages/Page/PageFile.h | 2 +- dbms/src/Storages/Page/PageStorage.cpp | 27 ++++ dbms/src/Storages/Page/PageStorage.h | 2 + .../Page/tests/gtest_data_compactor.cpp | 2 +- .../Page/tests/gtest_legacy_compactor.cpp | 2 +- .../Page/tests/gtest_page_storage.cpp | 2 +- .../tests/gtest_page_storage_multi_paths.cpp | 2 +- .../gtest_page_storage_multi_writers.cpp | 2 +- dbms/src/Storages/PathPool.cpp | 139 +++++++++++++++++- dbms/src/Storages/PathPool.h | 58 +++++++- dbms/src/Storages/Transaction/KVStore.cpp | 4 +- dbms/src/Storages/Transaction/KVStore.h | 2 +- .../Storages/Transaction/RegionPersister.cpp | 118 ++++++++++++--- .../Storages/Transaction/RegionPersister.h | 19 ++- dbms/src/Storages/Transaction/TMTContext.cpp | 4 +- dbms/src/Storages/Transaction/TMTContext.h | 4 +- dbms/src/Storages/tests/gtest_path_pool.cpp | 58 +++++++- dbms/src/test_utils/TiflashTestBasic.h | 15 +- 27 files changed, 488 insertions(+), 103 deletions(-) diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 5f5bda1ee04..78dcf2ac844 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -124,7 +124,7 @@ struct ContextShared String tmp_path; /// The path to the temporary files that occur when processing the request. String flags_path; /// Path to the directory with some control flags for server maintenance. String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function. - PathPool extra_paths; /// The extra data directories. Some Storage Engine like DeltaMerge will store the main data in them if specified. + PathPool path_pool; /// The data directories. RegionPersister and some Storage Engine like DeltaMerge will use this to manage data placement on disks. ConfigurationPtr config; /// Global configuration settings. Databases databases; /// List of databases and tables in them. @@ -515,10 +515,10 @@ String Context::getUserFilesPath() const return shared->user_files_path; } -PathPool & Context::getExtraPaths() const +PathPool & Context::getPathPool() const { auto lock = getLock(); - return shared->extra_paths; + return shared->path_pool; } void Context::setPath(const String & path) @@ -555,13 +555,17 @@ void Context::setUserFilesPath(const String & path) shared->user_files_path = path; } -void Context::setExtraPaths(const Strings & main_data_paths, +void Context::setPathPool( // + const Strings & main_data_paths, const Strings & latest_data_paths, + const Strings & kvstore_paths, + bool enable_raft_storage_compatibility_mode, PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_) { auto lock = getLock(); - shared->extra_paths = PathPool(main_data_paths, latest_data_paths, global_capacity_, file_provider_); + shared->path_pool = PathPool( + main_data_paths, latest_data_paths, kvstore_paths, global_capacity_, file_provider_, enable_raft_storage_compatibility_mode); } void Context::setConfig(const ConfigurationPtr & config) @@ -1449,7 +1453,6 @@ DDLWorker & Context::getDDLWorker() const void Context::createTMTContext(const std::vector & pd_addrs, const std::unordered_set & ignore_databases, - const std::string & kvstore_path, ::TiDB::StorageEngine engine, bool disable_bg_flush, pingcap::ClusterConfig cluster_config) @@ -1457,7 +1460,7 @@ void Context::createTMTContext(const std::vector & pd_addrs, auto lock = getLock(); if (shared->tmt_context) throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR); - shared->tmt_context = std::make_shared(*this, pd_addrs, ignore_databases, kvstore_path, engine, disable_bg_flush, cluster_config); + shared->tmt_context = std::make_shared(*this, pd_addrs, ignore_databases, engine, disable_bg_flush, cluster_config); } void Context::initializePathCapacityMetric(const std::vector & all_path, size_t capacity_quota) diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 235d8090394..71d8e3fbd63 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -167,15 +167,17 @@ class Context String getTemporaryPath() const; String getFlagsPath() const; String getUserFilesPath() const; - PathPool & getExtraPaths() const; + PathPool & getPathPool() const; void setPath(const String & path); void setTemporaryPath(const String & path); void setFlagsPath(const String & path); void setUserFilesPath(const String & path); - void setExtraPaths(const Strings & main_data_paths, + void setPathPool(const Strings & main_data_paths, const Strings & latest_data_paths, + const Strings & kvstore_paths, + bool enable_raft_storage_compatibility_mode, PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider); @@ -392,7 +394,6 @@ class Context void createTMTContext(const std::vector & pd_addrs, const std::unordered_set & ignore_databases, - const std::string & kvstore_path, ::TiDB::StorageEngine engine, bool disable_bg_tasks, pingcap::ClusterConfig cluster_config = {}); diff --git a/dbms/src/Interpreters/IDAsPathUpgrader.cpp b/dbms/src/Interpreters/IDAsPathUpgrader.cpp index c4f110004f9..0eb17a55df7 100644 --- a/dbms/src/Interpreters/IDAsPathUpgrader.cpp +++ b/dbms/src/Interpreters/IDAsPathUpgrader.cpp @@ -371,7 +371,7 @@ void IDAsPathUpgrader::DatabaseDiskInfo::renameToTmpDirectories(const Context & true); // Rename database data dir for multi-paths - auto root_pool = ctx.getExtraPaths(); + auto root_pool = ctx.getPathPool(); for (const auto & extra_path : root_pool.listPaths()) renamePath( // doGetExtraDirectory(extra_path, /*escape*/ true, /*tmp*/ false), // @@ -481,7 +481,7 @@ static void dropAbsentDatabase( const String old_data_dir_not_escaped = db_info.getDataDirectory(root_path, false); tryRemoveDirectory(old_data_dir_not_escaped, log, true); - const auto & data_extra_paths = context.getExtraPaths(); + const auto & data_extra_paths = context.getPathPool(); for (const auto & extra_root_path : data_extra_paths.listPaths()) { tryRemoveDirectory(db_info.getExtraDirectory(extra_root_path), log, true); @@ -562,7 +562,7 @@ void IDAsPathUpgrader::fixNotEscapedDirectories() if (Poco::File dir(escaped_db_data_dir); !dir.exists()) dir.createDirectory(); - const auto & data_extra_paths = global_context.getExtraPaths(); + const auto & data_extra_paths = global_context.getPathPool(); for (const auto & extra_root_path : data_extra_paths.listPaths()) { auto escaped_extra_dir = db_info.getExtraDirectory(extra_root_path, /*escape=*/true); @@ -611,7 +611,7 @@ void IDAsPathUpgrader::fixNotEscapedDirectories() } // Fix extra path. - const auto & data_extra_paths = global_context.getExtraPaths(); + const auto & data_extra_paths = global_context.getPathPool(); for (const auto & extra_root_path : data_extra_paths.listPaths()) { // It was created by old PathPool, both database name and table name are not escaped. @@ -631,7 +631,7 @@ void IDAsPathUpgrader::fixNotEscapedDirectories() // clean not escaped database dir created by old PathPool const String not_escaped_data_dir = db_info.getDataDirectory(root_path, /*escape*/ false); tryRemoveDirectory(not_escaped_data_dir, log, true); - const auto & data_extra_paths = global_context.getExtraPaths(); + const auto & data_extra_paths = global_context.getPathPool(); for (const auto & extra_root_path : data_extra_paths.listPaths()) { auto not_escaped_extra_data_dir = db_info.getExtraDirectory(extra_root_path, /*escape*/ false); @@ -735,7 +735,7 @@ void IDAsPathUpgrader::renameDatabase(const String & db_name, const DatabaseDisk // Remove old data dir const String old_data_dir = db_info.getDataDirectory(root_path); tryRemoveDirectory(old_data_dir, log); - const auto & data_extra_paths = global_context.getExtraPaths(); + const auto & data_extra_paths = global_context.getPathPool(); for (const auto & extra_root_path : data_extra_paths.listPaths()) { tryRemoveDirectory(db_info.getExtraDirectory(extra_root_path), log); @@ -762,7 +762,7 @@ void IDAsPathUpgrader::renameTable( { // Rename data path for multi disk - auto data_extra_paths = global_context.getExtraPaths(); + auto data_extra_paths = global_context.getPathPool(); for (const auto & extra_root_path : data_extra_paths.listPaths()) { auto old_tbl_extra_data_path = table.getExtraDirectory(extra_root_path, db_info); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 6c41df71a87..b129ad40399 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -92,6 +92,19 @@ static std::string getCanonicalPath(std::string path) return path; } +static Strings parseMultiplePaths(String s, const String & logging_prefix, Poco::Logger * log) +{ + Poco::trimInPlace(s); + Strings res; + Poco::StringTokenizer string_tokens(s, ","); + for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) + { + res.emplace_back(getCanonicalPath(std::string(*it))); + LOG_INFO(log, logging_prefix << " data candidate path: " << std::string(*it)); + } + return res; +} + void Server::uninitialize() { logger().information("shutting down"); @@ -152,23 +165,24 @@ struct TiFlashRaftConfig { const std::string engine_key = "engine"; const std::string engine_value = "tiflash"; - std::vector pd_addrs; + Strings pd_addrs; std::unordered_set ignore_databases{"system"}; - std::string kvstore_path; + Strings kvstore_path; // Actually it is "flash.service_addr" std::string flash_server_addr; + bool enable_storage_compatibility_mode = true; static const TiDB::StorageEngine DEFAULT_ENGINE = TiDB::StorageEngine::DT; bool disable_bg_flush = false; TiDB::StorageEngine engine = DEFAULT_ENGINE; public: - TiFlashRaftConfig(const std::string & path, Poco::Util::LayeredConfiguration & config, Poco::Logger * log); + TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Util::LayeredConfiguration & config, Poco::Logger * log); }; /// Load raft related configs. -TiFlashRaftConfig::TiFlashRaftConfig(const std::string & path, Poco::Util::LayeredConfiguration & config, Poco::Logger * log) - : ignore_databases{"system"}, kvstore_path{path + "kvstore/"} +TiFlashRaftConfig::TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Util::LayeredConfiguration & config, Poco::Logger * log) + : ignore_databases{"system"}, kvstore_path{} { flash_server_addr = config.getString("flash.service_addr", "0.0.0.0:3930"); @@ -210,7 +224,17 @@ TiFlashRaftConfig::TiFlashRaftConfig(const std::string & path, Poco::Util::Layer if (config.has("raft.kvstore_path")) { - kvstore_path = config.getString("raft.kvstore_path"); + kvstore_path = parseMultiplePaths(config.getString("raft.kvstore_path"), "Raft", log); + if (kvstore_path.empty()) + { + LOG_INFO(log, "The configuration \"raft.kvstore_path\" is empty, generate the paths from \"latest_data_path\""); + for (const auto & s : latest_data_paths) + { + String path = Poco::Path{s + "/kvstore"}.toString(); + LOG_INFO(log, "Raft data candidate path: " << path); + kvstore_path.emplace_back(std::move(path)); + } + } } if (config.has("raft.storage_engine")) @@ -245,6 +269,11 @@ TiFlashRaftConfig::TiFlashRaftConfig(const std::string & path, Poco::Util::Layer ErrorCodes::INVALID_CONFIG_PARAMETER); disable_bg_flush = true; } + + if (config.has("raft.enable_storage_compatibility_mode")) + { + enable_storage_compatibility_mode = config.getBool("raft.enable_storage_compatibility_mode"); + } } } @@ -383,6 +412,11 @@ int Server::main(const std::vector & /*args*/) } /// ===== Paths related configuration initialized start ===== /// + /// Note that theses global variables should be initialized by the following order: + // 1. capacity + // 2. path pool + // 3. TMTContext + // TODO: remove this configuration left by ClickHouse std::vector all_fast_path; @@ -428,19 +462,7 @@ int Server::main(const std::vector & /*args*/) Strings main_data_paths, latest_data_paths; if (config().has("main_data_path")) { - auto parse_multiple_paths = [&log](String s, const String & logging_prefix) -> Strings { - Poco::trimInPlace(s); - Strings res; - Poco::StringTokenizer string_tokens(s, ","); - for (auto it = string_tokens.begin(); it != string_tokens.end(); it++) - { - res.emplace_back(getCanonicalPath(std::string(*it))); - LOG_INFO(log, logging_prefix << " data candidate path: " << std::string(*it)); - } - return res; - }; - - main_data_paths = parse_multiple_paths(config().getString("main_data_path"), "Main"); + main_data_paths = parseMultiplePaths(config().getString("main_data_path"), "Main", log); if (main_data_paths.empty()) { String error_msg @@ -450,7 +472,7 @@ int Server::main(const std::vector & /*args*/) } if (config().has("latest_data_path")) - latest_data_paths = parse_multiple_paths(config().getString("latest_data_path"), "Latest"); + latest_data_paths = parseMultiplePaths(config().getString("latest_data_path"), "Latest", log); if (latest_data_paths.empty()) { LOG_INFO(log, "The configuration \"latest_data_paths\" is empty, use the same paths of \"main_data_path\""); @@ -516,10 +538,12 @@ int Server::main(const std::vector & /*args*/) } global_context->initializePathCapacityMetric(all_normal_path, capacity); - global_context->setExtraPaths(main_data_paths, latest_data_paths, global_context->getPathCapacity(), global_context->getFileProvider()); - const std::string path = all_normal_path[0]; - TiFlashRaftConfig raft_config(path, config(), log); + TiFlashRaftConfig raft_config(latest_data_paths, config(), log); + global_context->setPathPool(main_data_paths, latest_data_paths, raft_config.kvstore_path, // + raft_config.enable_storage_compatibility_mode, // + global_context->getPathCapacity(), global_context->getFileProvider()); + // Use pd address to define which default_database we use by defauly. // For mock test, we use "default". For deployed with pd/tidb/tikv use "system", which is always exist in TiFlash. std::string default_database = config().getString("default_database", raft_config.pd_addrs.empty() ? "default" : "system"); @@ -763,7 +787,6 @@ int Server::main(const std::vector & /*args*/) auto cluster_config = getClusterConfig(security_config, raft_config); global_context->createTMTContext(raft_config.pd_addrs, raft_config.ignore_databases, - raft_config.kvstore_path, raft_config.engine, raft_config.disable_bg_flush, cluster_config); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 819ac58045e..393cb828f74 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -120,7 +120,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context, size_t rowkey_column_size_, const Settings & settings_) : global_context(db_context.getGlobalContext()), - path_pool(global_context.getExtraPaths().withTable(db_name_, table_name_, data_path_contains_database_name)), + path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)), settings(settings_), storage_pool(db_name_ + "." + table_name_, path_pool, global_context, db_context.getSettingsRef()), db_name(db_name_), diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index 26a89d1b516..d9f0daa938b 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -41,7 +41,7 @@ class DMFile_Test : public ::testing::Test auto & ctx = DMTestEnv::getContext(); auto settings = DB::Settings(); - path_pool = std::make_unique(ctx.getExtraPaths().withTable("test", "t1", false)); + path_pool = std::make_unique(ctx.getPathPool().withTable("test", "t1", false)); storage_pool = std::make_unique("test.t1", *path_pool, ctx, settings); dm_file = DMFile::create(0, parent_path); db_context = std::make_unique(DMTestEnv::getContext(settings)); @@ -65,7 +65,7 @@ class DMFile_Test : public ::testing::Test *table_columns_ = *cols; auto & ctx = DMTestEnv::getContext(); - *path_pool = ctx.getExtraPaths().withTable("test", "t1", false); + *path_pool = ctx.getPathPool().withTable("test", "t1", false); dm_context = std::make_unique( // *db_context, *path_pool, @@ -917,7 +917,7 @@ class DMFile_Clustered_Index_Test : public ::testing::Test auto settings = DB::Settings(); auto & ctx = DMTestEnv::getContext(); - path_pool = std::make_unique(ctx.getExtraPaths().withTable("test", "t", false)); + path_pool = std::make_unique(ctx.getPathPool().withTable("test", "t", false)); storage_pool = std::make_unique("test.t1", *path_pool, ctx, settings); dm_file = DMFile::create(0, path); db_context = std::make_unique(DMTestEnv::getContext(settings)); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 6a54821133f..35e6d3d6007 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -34,7 +34,7 @@ class Segment_test : public ::testing::Test void SetUp() override { db_context = std::make_unique(DMTestEnv::getContext(DB::Settings())); - storage_path_pool = std::make_unique(db_context->getExtraPaths().withTable("test", "t1", false)); + storage_path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t1", false)); storage_path_pool->drop(true); table_columns_ = std::make_shared(); dropDataOnDisk(); @@ -47,7 +47,7 @@ class Segment_test : public ::testing::Test SegmentPtr reload(const ColumnDefinesPtr & pre_define_columns = {}, DB::Settings && db_settings = DB::Settings()) { *db_context = DMTestEnv::getContext(db_settings); - storage_path_pool = std::make_unique(db_context->getExtraPaths().withTable("test", "t1", false)); + storage_path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t1", false)); storage_pool = std::make_unique("test.t1", *storage_path_pool, *db_context, db_context->getSettingsRef()); storage_pool->restore(); ColumnDefinesPtr cols = (!pre_define_columns) ? DMTestEnv::getDefaultColumns() : pre_define_columns; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp index e76469e3a42..0e2e34e872a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp @@ -46,7 +46,7 @@ class Segment_Common_Handle_test : public ::testing::Test { *db_context = DMTestEnv::getContext(db_settings); auto & ctx = DMTestEnv::getContext(); - path_pool = std::make_unique(ctx.getExtraPaths().withTable("test", "t", false)); + path_pool = std::make_unique(ctx.getPathPool().withTable("test", "t", false)); storage_pool = std::make_unique("test.t1", *path_pool, ctx, db_context->getSettingsRef()); storage_pool->restore(); ColumnDefinesPtr cols = (!pre_define_columns) ? DMTestEnv::getDefaultColumns(is_common_handle) : pre_define_columns; diff --git a/dbms/src/Storages/Page/PageFile.cpp b/dbms/src/Storages/Page/PageFile.cpp index e7db27fe1c1..edc5e49f74e 100644 --- a/dbms/src/Storages/Page/PageFile.cpp +++ b/dbms/src/Storages/Page/PageFile.cpp @@ -254,7 +254,7 @@ bool PageFile::MetaMergingReader::hasNext() const return (status == Status::Uninitialized) || (status == Status::Opened && meta_file_offset < meta_size); } -void PageFile::MetaMergingReader::moveNext() +void PageFile::MetaMergingReader::moveNext(PageFile::Version * v) { curr_edit.clear(); curr_write_batch_sequence = 0; @@ -297,6 +297,10 @@ void PageFile::MetaMergingReader::moveNext() throw Exception("PageFile binary version not match, unknown version: " + DB::toString(binary_version), ErrorCodes::LOGICAL_ERROR); } + // return the binary_version if `v` is not null + if (unlikely(v != nullptr)) + *v = binary_version; + // check the checksum of WriteBatch const auto wb_bytes_without_checksum = wb_bytes - sizeof(PageMetaFormat::Checksum); const auto wb_checksum = PageUtil::get(wb_start_pos + wb_bytes_without_checksum); diff --git a/dbms/src/Storages/Page/PageFile.h b/dbms/src/Storages/Page/PageFile.h index 0637f3bb7b6..082fbc48abf 100644 --- a/dbms/src/Storages/Page/PageFile.h +++ b/dbms/src/Storages/Page/PageFile.h @@ -132,7 +132,7 @@ class PageFile : public Allocator public: bool hasNext() const; - void moveNext(); + void moveNext(PageFile::Version * v = nullptr); PageEntriesEdit getEdits() { return std::move(curr_edit); } diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index ace4844d837..5c25509aa80 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -60,6 +60,33 @@ bool PageStorage::StatisticsInfo::equals(const StatisticsInfo & rhs) return puts == rhs.puts && refs == rhs.refs && deletes == rhs.deletes && upserts == rhs.upserts; } +PageFile::Version PageStorage::getMinDataVersion(const FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator) +{ + Poco::Logger * log = &Poco::Logger::get("PageStorage::getMinDataVersion"); + ListPageFilesOption option; + option.ignore_checkpoint = true; + option.ignore_legacy = true; + option.remove_tmp_files = false; + auto page_files = listAllPageFiles(file_provider, delegator, log, option); + if (page_files.empty()) + return PageFile::CURRENT_VERSION; + + // Simply check the first PageFile is good enough + auto reader = const_cast(*page_files.begin()).createMetaMergingReader(); + + PageFile::Version min_binary_version = PageFile::CURRENT_VERSION, temp_version; + reader->moveNext(&temp_version); + min_binary_version = std::min(min_binary_version, temp_version); + while (reader->hasNext()) + { + // Continue to read the binary version of next WriteBatch. + reader->moveNext(&temp_version); + min_binary_version = std::min(min_binary_version, temp_version); + } + LOG_TRACE(log, "getMinDataVersion done from " + reader->toString()); + return min_binary_version; +} + PageFileSet PageStorage::listAllPageFiles(const FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator, Poco::Logger * page_file_log, diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index 05ea1e96394..af7b1513274 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -155,6 +155,8 @@ class PageStorage Poco::Logger * page_file_log, const ListPageFilesOption & option = ListPageFilesOption()); + static PageFile::Version getMinDataVersion(const FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator); + private: WriterPtr checkAndRenewWriter(PageFile & page_file, const String & parent_path_hint, diff --git a/dbms/src/Storages/Page/tests/gtest_data_compactor.cpp b/dbms/src/Storages/Page/tests/gtest_data_compactor.cpp index 24667719505..cbf0d5526a4 100644 --- a/dbms/src/Storages/Page/tests/gtest_data_compactor.cpp +++ b/dbms/src/Storages/Page/tests/gtest_data_compactor.cpp @@ -37,7 +37,7 @@ try auto & ctx = TiFlashTestEnv::getContext(DB::Settings(), test_paths); const FileProviderPtr file_provider = ctx.getFileProvider(); - auto pool = ctx.getExtraPaths().withTable("test", "t", false); + auto pool = ctx.getPathPool().withTable("test", "t", false); auto delegate = pool.getPSDiskDelegatorMulti("log"); PageStorage storage("data_compact_test", delegate, config, file_provider); diff --git a/dbms/src/Storages/Page/tests/gtest_legacy_compactor.cpp b/dbms/src/Storages/Page/tests/gtest_legacy_compactor.cpp index acdb2e2155c..b42ca264b65 100644 --- a/dbms/src/Storages/Page/tests/gtest_legacy_compactor.cpp +++ b/dbms/src/Storages/Page/tests/gtest_legacy_compactor.cpp @@ -167,7 +167,7 @@ try auto & ctx = TiFlashTestEnv::getContext(); const FileProviderPtr file_provider = ctx.getFileProvider(); - StoragePathPool spool = ctx.getExtraPaths().withTable("test", "t", false); + StoragePathPool spool = ctx.getPathPool().withTable("test", "t", false); auto delegator = spool.getPSDiskDelegatorSingle("meta"); PageStorage storage("compact_test", delegator, PageStorage::Config{}, file_provider); diff --git a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp index 5f1f19e5221..9021815427f 100644 --- a/dbms/src/Storages/Page/tests/gtest_page_storage.cpp +++ b/dbms/src/Storages/Page/tests/gtest_page_storage.cpp @@ -44,7 +44,7 @@ class PageStorage_test : public ::testing::Test { // drop dir if exists auto & ctx = TiFlashTestEnv::getContext(); - path_pool = std::make_unique(ctx.getExtraPaths().withTable("test", "t1", false)); + path_pool = std::make_unique(ctx.getPathPool().withTable("test", "t1", false)); for (const auto & p : path_pool->getPSDiskDelegatorSingle("log")->listPaths()) { if (Poco::File file(p); file.exists()) diff --git a/dbms/src/Storages/Page/tests/gtest_page_storage_multi_paths.cpp b/dbms/src/Storages/Page/tests/gtest_page_storage_multi_paths.cpp index 95e9cd132d1..6078c19960f 100644 --- a/dbms/src/Storages/Page/tests/gtest_page_storage_multi_paths.cpp +++ b/dbms/src/Storages/Page/tests/gtest_page_storage_multi_paths.cpp @@ -92,7 +92,7 @@ try size_t number_of_paths = GetParam(); auto all_paths = getMultiTestPaths(number_of_paths); auto capacity = std::make_shared(all_paths, 0); - StoragePathPool pool = PathPool(all_paths, all_paths, capacity, file_provider).withTable("test", "table", false); + StoragePathPool pool = PathPool(all_paths, all_paths, Strings{}, capacity, file_provider).withTable("test", "table", false); storage = std::make_shared("test.table", pool.getPSDiskDelegatorMulti("log"), config, file_provider); storage->restore(); diff --git a/dbms/src/Storages/Page/tests/gtest_page_storage_multi_writers.cpp b/dbms/src/Storages/Page/tests/gtest_page_storage_multi_writers.cpp index 611ba46e854..08e0058d961 100644 --- a/dbms/src/Storages/Page/tests/gtest_page_storage_multi_writers.cpp +++ b/dbms/src/Storages/Page/tests/gtest_page_storage_multi_writers.cpp @@ -65,7 +65,7 @@ class PageStorageMultiWriters_test : public ::testing::Test std::shared_ptr reopenWithConfig(const PageStorage::Config & config_) { - auto spool = TiFlashTestEnv::getContext().getExtraPaths().withTable("test", "t", false); + auto spool = TiFlashTestEnv::getContext().getPathPool().withTable("test", "t", false); auto delegator = spool.getPSDiskDelegatorSingle("log"); auto storage = std::make_shared("test.t", delegator, config_, file_provider); storage->restore(); diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index 836db3cb44a..424a41b9044 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -24,13 +24,29 @@ extern const int LOGICAL_ERROR; } // Constructor to be used during initialization -PathPool::PathPool(const Strings & main_data_paths_, const Strings & latest_data_paths_, // - PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_) +PathPool::PathPool(const Strings & main_data_paths_, const Strings & latest_data_paths_, const Strings & kvstore_paths_, // + PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_, bool enable_raft_storage_compatibility_mode_) : main_data_paths(main_data_paths_), latest_data_paths(latest_data_paths_), + kvstore_paths(kvstore_paths_), + enable_raft_storage_compatibility_mode(enable_raft_storage_compatibility_mode_), global_capacity(global_capacity_), - file_provider(file_provider_) -{} + file_provider(file_provider_), + log(&Poco::Logger::get("PathPool")) +{ + if (kvstore_paths.empty()) + { + // Set default path generated from latest_data_paths + for (const auto & s : latest_data_paths) + { + // Get a normalized path without trailing '/' + auto p = Poco::Path{s + "/kvstore"}.toString(); + if (p.back() == '/') + p.erase(p.begin() + p.size() - 1); + kvstore_paths.emplace_back(std::move(p)); + } + } +} StoragePathPool PathPool::withTable(const String & database_, const String & table_, bool path_need_database_name_) const { @@ -50,6 +66,8 @@ Strings PathPool::listPaths() const return paths; } +PSDiskDelegatorPtr PathPool::getPSDiskDelegatorRaft() { return std::make_shared(*this); } + //========================================================================================== // StoragePathPool //========================================================================================== @@ -482,5 +500,118 @@ void PSDiskDelegatorSingle::removePageFile(const PageFileIdAndLevel & /*id_lvl*/ pool.global_capacity->freeUsedSize(pool.latest_path_infos[0].path, file_size); } +//========================================================================================== +// Raft data +//========================================================================================== +PSDiskDelegatorRaft::PSDiskDelegatorRaft(PathPool & pool_) : pool(pool_) +{ + for (const auto & s : pool.kvstore_paths) + { + RaftPathInfo info; + // Get a normalized path without trailing '/' + info.path = s; + if (info.path.back() == '/') + info.path.erase(info.path.begin() + info.path.size() - 1); + info.total_size = 0; + raft_path_infos.emplace_back(info); + } +} + +size_t PSDiskDelegatorRaft::numPaths() const { return raft_path_infos.size(); } + +String PSDiskDelegatorRaft::defaultPath() const { return raft_path_infos[0].path; } + +Strings PSDiskDelegatorRaft::listPaths() const { return pool.kvstore_paths; } + +String PSDiskDelegatorRaft::choosePath(const PageFileIdAndLevel & id_lvl) +{ + std::lock_guard lock{mutex}; + /// If id exists in page_path_map, just return the same path + if (auto iter = page_path_map.find(id_lvl); iter != page_path_map.end()) + { + return raft_path_infos[iter->second].path; + } + + // Else choose path randomly + UInt64 total_size = 0; + for (size_t i = 0; i < raft_path_infos.size(); ++i) + total_size += raft_path_infos[i].total_size; + if (total_size == 0) + { + LOG_DEBUG(pool.log, "PSDiskDelegatorRaft no data currently. Choose path 0 for Raft."); + return raft_path_infos[0].path; + } + + std::vector ratio; + for (size_t i = 0; i < raft_path_infos.size(); ++i) + { + ratio.push_back((double)(total_size - raft_path_infos[i].total_size) / ((raft_path_infos.size() - 1) * total_size)); + } + double rand_number = (double)rand() / RAND_MAX; + double ratio_sum = 0; + for (size_t i = 0; i < ratio.size(); i++) + { + ratio_sum += ratio[i]; + if ((rand_number < ratio_sum) || (i == ratio.size() - 1)) + { + LOG_DEBUG(pool.log, "PSDiskDelegatorRaft choose path " + toString(i) + " for Raft"); + return raft_path_infos[0].path; + } + } + throw Exception("Should not reach here", ErrorCodes::LOGICAL_ERROR); +} + +size_t PSDiskDelegatorRaft::addPageFileUsedSize( + const PageFileIdAndLevel & id_lvl, size_t size_to_add, const String & pf_parent_path, bool need_insert_location) +{ + // Get a normalized path without trailing '/' + String upper_path = Poco::Path(pf_parent_path).toString(); + if (upper_path.back() == '/') + upper_path.erase(upper_path.begin() + upper_path.size() - 1); + UInt32 index = UINT32_MAX; + for (size_t i = 0; i < raft_path_infos.size(); i++) + { + if (raft_path_infos[i].path == upper_path) + { + index = i; + break; + } + } + if (unlikely(index == UINT32_MAX)) + throw Exception("Unrecognized path " + upper_path); + + { + std::lock_guard lock{mutex}; + if (need_insert_location) + page_path_map[id_lvl] = index; + raft_path_infos[index].total_size += size_to_add; + } + + // update global used size + pool.global_capacity->addUsedSize(upper_path, size_to_add); + return index; +} + +String PSDiskDelegatorRaft::getPageFilePath(const PageFileIdAndLevel & id_lvl) const +{ + std::lock_guard lock{mutex}; + auto iter = page_path_map.find(id_lvl); + if (likely(iter != page_path_map.end())) + return raft_path_infos[iter->second].path; + throw Exception("Can not find path for PageFile [id=" + toString(id_lvl.first) + "_" + toString(id_lvl.second) + "]"); +} + +void PSDiskDelegatorRaft::removePageFile(const PageFileIdAndLevel & id_lvl, size_t file_size) +{ + std::lock_guard lock{mutex}; + auto iter = page_path_map.find(id_lvl); + if (unlikely(iter == page_path_map.end())) + return; + auto index = iter->second; + raft_path_infos[index].total_size -= file_size; + page_path_map.erase(iter); + + pool.global_capacity->freeUsedSize(raft_path_infos[index].path, file_size); +} } // namespace DB diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index 0419f959ffd..dbf24bec430 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -31,8 +31,7 @@ class PSDiskDelegator; using PSDiskDelegatorPtr = std::shared_ptr; class PSDiskDelegatorMulti; class PSDiskDelegatorSingle; -// TODO: support multi-paths for Raft data -// using PSDiskDelegatorRaft = PSDiskDelegatorSingle; +class PSDiskDelegatorRaft; /// A class to manage global paths. @@ -42,12 +41,22 @@ class PathPool PathPool() = default; // Constructor to be used during initialization - PathPool(const Strings & main_data_paths, const Strings & latest_data_paths, PathCapacityMetricsPtr global_capacity_, - FileProviderPtr file_provider_); + PathPool( // + const Strings & main_data_paths, const Strings & latest_data_paths, // + const Strings & kvstore_paths, // + PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_, // + bool enable_raft_storage_compatibility_mode_ = false); // Constructor to create PathPool for one Storage StoragePathPool withTable(const String & database_, const String & table_, bool path_need_database_name_) const; + bool isRaftStorageCapatibilityModeEnabled() const { return enable_raft_storage_compatibility_mode; } + + // Generate a delegator for managing the paths of `RegionPersister`. + // Those paths are generated from `kvstore_paths`. + // User should keep the pointer to track the PageFileID -> path index mapping. + PSDiskDelegatorPtr getPSDiskDelegatorRaft(); + public: /// Methods for the root PathPool /// Strings listPaths() const; @@ -61,9 +70,15 @@ class PathPool } }; using PageFilePathMap = std::unordered_map; + + friend class PSDiskDelegatorRaft; + private: Strings main_data_paths; Strings latest_data_paths; + Strings kvstore_paths; + + bool enable_raft_storage_compatibility_mode; PathCapacityMetricsPtr global_capacity; @@ -165,6 +180,41 @@ class PSDiskDelegatorSingle : public PSDiskDelegator const String path_prefix; }; +class PSDiskDelegatorRaft : public PSDiskDelegator +{ +public: + PSDiskDelegatorRaft(PathPool & pool_); + + size_t numPaths() const override; + + String defaultPath() const override; + + Strings listPaths() const override; + + String choosePath(const PageFileIdAndLevel & id_lvl) override; + + size_t addPageFileUsedSize( + const PageFileIdAndLevel & id_lvl, size_t size_to_add, const String & pf_parent_path, bool need_insert_location) override; + + String getPageFilePath(const PageFileIdAndLevel & id_lvl) const override; + + void removePageFile(const PageFileIdAndLevel & id_lvl, size_t file_size) override; + +private: + struct RaftPathInfo + { + String path; + size_t total_size; // total used bytes + }; + using RaftPathInfos = std::vector; + + PathPool & pool; + mutable std::mutex mutex; + RaftPathInfos raft_path_infos; + // PageFileID -> path index + PathPool::PageFilePathMap page_path_map; +}; + /// A class to manage paths for the specified storage. class StoragePathPool { diff --git a/dbms/src/Storages/Transaction/KVStore.cpp b/dbms/src/Storages/Transaction/KVStore.cpp index b26e27a910c..9a63735873d 100644 --- a/dbms/src/Storages/Transaction/KVStore.cpp +++ b/dbms/src/Storages/Transaction/KVStore.cpp @@ -19,8 +19,8 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -KVStore::KVStore(const std::string & data_dir, const FileProviderPtr & file_provider) - : region_persister(data_dir, region_manager, file_provider), +KVStore::KVStore(Context & context) + : region_persister(context, region_manager), raft_cmd_res(std::make_unique()), log(&Logger::get("KVStore")) {} diff --git a/dbms/src/Storages/Transaction/KVStore.h b/dbms/src/Storages/Transaction/KVStore.h index 9cfc71930e0..813991926b3 100644 --- a/dbms/src/Storages/Transaction/KVStore.h +++ b/dbms/src/Storages/Transaction/KVStore.h @@ -40,7 +40,7 @@ enum class TiFlashApplyRes : uint32_t; class KVStore final : private boost::noncopyable { public: - KVStore(const std::string & data_dir, const FileProviderPtr & file_provider); + KVStore(Context & context); void restore(const IndexReaderCreateFunc & index_reader_create); RegionPtr getRegion(const RegionID region_id) const; diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index 4b3956cc10e..64040da67bf 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -1,4 +1,8 @@ #include +#include +#include +#include +#include #include #include #include @@ -13,9 +17,18 @@ extern const int LOGICAL_ERROR; void RegionPersister::drop(RegionID region_id, const RegionTaskLock &) { - stable::WriteBatch wb; - wb.delPage(region_id); - page_storage.write(std::move(wb)); + if (page_storage) + { + WriteBatch wb; + wb.delPage(region_id); + page_storage->write(std::move(wb)); + } + else + { + DB::stable::WriteBatch wb; + wb.delPage(region_id); + stable_page_storage->write(std::move(wb)); + } } void RegionPersister::computeRegionWriteBuffer(const Region & region, RegionCacheWriteElement & region_write_buffer) @@ -29,7 +42,6 @@ void RegionPersister::computeRegionWriteBuffer(const Region & region, RegionCach LOG_WARNING(&Logger::get("RegionPersister"), "Persisting big region: " << region.toString() << " with data info: " << region.dataInfo() << ", serialized size " << region_size); - //throw Exception("Region is too big to persist", ErrorCodes::LOGICAL_ERROR); } } @@ -55,9 +67,18 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c std::lock_guard lock(mutex); - auto cache = page_storage.getEntry(region_id); - if (cache.isValid() && cache.tag > applied_index) - return; + if (page_storage) + { + auto entry = page_storage->getEntry(region_id); + if (entry.isValid() && entry.tag > applied_index) + return; + } + else + { + auto entry = stable_page_storage->getEntry(region_id); + if (entry.isValid() && entry.tag > applied_index) + return; + } if (region.isPendingRemove()) { @@ -65,30 +86,85 @@ void RegionPersister::doPersist(RegionCacheWriteElement & region_write_buffer, c return; } - stable::WriteBatch wb; auto read_buf = buffer.tryGetReadBuffer(); - wb.putPage(region_id, applied_index, read_buf, region_size); - page_storage.write(std::move(wb)); + if (page_storage) + { + WriteBatch wb; + wb.putPage(region_id, applied_index, read_buf, region_size); + page_storage->write(std::move(wb)); + } + else + { + DB::stable::WriteBatch wb; + wb.putPage(region_id, applied_index, read_buf, region_size); + stable_page_storage->write(std::move(wb)); + } } +RegionPersister::RegionPersister(Context & global_context_, const RegionManager & region_manager_) + : global_context(global_context_), region_manager(region_manager_), log(&Logger::get("RegionPersister")) +{} + RegionMap RegionPersister::restore(IndexReaderCreateFunc * func) { - // FIXME: if we use DB::PageStorage, we should call `restore` - // page_storage.restore(); + auto & path_pool = global_context.getPathPool(); + auto delegator = path_pool.getPSDiskDelegatorRaft(); + // If there is no PageFile with basic version binary format, use the latest version of PageStorage. + auto detect_binary_version = PageStorage::getMinDataVersion(global_context.getFileProvider(), delegator); + bool run_in_compatibility_mode = path_pool.isRaftStorageCapatibilityModeEnabled() && (detect_binary_version == PageFile::VERSION_BASE); + if (!run_in_compatibility_mode) + { + LOG_INFO(log, "RegionPersister running in normal mode"); + DB::PageStorage::Config config; + config.num_write_slots = 4; // extend write slots to 4 at least + page_storage = std::make_unique( // + "RegionPersister", + std::move(delegator), + config, + global_context.getFileProvider(), + global_context.getTiFlashMetrics()); + page_storage->restore(); + } + else + { + LOG_INFO(log, "RegionPersister running in compatibility mode"); + stable_page_storage = std::make_unique( + "RegionPersister", delegator->defaultPath(), DB::stable::PageStorage::Config(), global_context.getFileProvider()); + } RegionMap regions; - auto acceptor = [&](const stable::Page & page) { - ReadBufferFromMemory buf(page.data.begin(), page.data.size()); - auto region = Region::deserialize(buf, func); - if (page.page_id != region->id()) - throw Exception("region id and page id not match!", ErrorCodes::LOGICAL_ERROR); - regions.emplace(page.page_id, region); - }; - page_storage.traverse(acceptor); + if (page_storage) + { + auto acceptor = [&](const Page & page) { + ReadBufferFromMemory buf(page.data.begin(), page.data.size()); + auto region = Region::deserialize(buf, func); + if (page.page_id != region->id()) + throw Exception("region id and page id not match!", ErrorCodes::LOGICAL_ERROR); + regions.emplace(page.page_id, region); + }; + page_storage->traverse(acceptor); + } + else + { + auto acceptor = [&](const DB::stable::Page & page) { + ReadBufferFromMemory buf(page.data.begin(), page.data.size()); + auto region = Region::deserialize(buf, func); + if (page.page_id != region->id()) + throw Exception("region id and page id not match!", ErrorCodes::LOGICAL_ERROR); + regions.emplace(page.page_id, region); + }; + stable_page_storage->traverse(acceptor); + } return regions; } -bool RegionPersister::gc() { return page_storage.gc(); } +bool RegionPersister::gc() +{ + if (page_storage) + return page_storage->gc(); + else + return stable_page_storage->gc(); +} } // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionPersister.h b/dbms/src/Storages/Transaction/RegionPersister.h index 15eb06eb1f6..d4038121acb 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.h +++ b/dbms/src/Storages/Transaction/RegionPersister.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include #include @@ -8,6 +8,13 @@ namespace DB { +class Context; +class PageStorage; +namespace stable +{ +class PageStorage; +} + class Region; using RegionPtr = std::shared_ptr; using RegionMap = std::unordered_map; @@ -18,10 +25,7 @@ class RegionManager; class RegionPersister final : private boost::noncopyable { public: - RegionPersister( - const std::string & storage_path, const RegionManager & region_manager_, const FileProviderPtr & file_provider, const stable::PageStorage::Config & config = {}) - : page_storage("RegionPersister", storage_path, config, file_provider), region_manager(region_manager_), log(&Logger::get("RegionPersister")) - {} + RegionPersister(Context & global_context_, const RegionManager & region_manager_); void drop(RegionID region_id, const RegionTaskLock &); void persist(const Region & region); @@ -37,7 +41,10 @@ class RegionPersister final : private boost::noncopyable void doPersist(const Region & region, const RegionTaskLock * lock); private: - DB::stable::PageStorage page_storage; + Context & global_context; + std::shared_ptr page_storage; + std::shared_ptr stable_page_storage; + const RegionManager & region_manager; std::mutex mutex; Logger * log; diff --git a/dbms/src/Storages/Transaction/TMTContext.cpp b/dbms/src/Storages/Transaction/TMTContext.cpp index a0e921c686e..007dd560da1 100644 --- a/dbms/src/Storages/Transaction/TMTContext.cpp +++ b/dbms/src/Storages/Transaction/TMTContext.cpp @@ -14,10 +14,10 @@ namespace DB { TMTContext::TMTContext(Context & context_, const std::vector & addrs, - const std::unordered_set & ignore_databases_, const std::string & kvstore_path, ::TiDB::StorageEngine engine_, + const std::unordered_set & ignore_databases_, ::TiDB::StorageEngine engine_, bool disable_bg_flush_, const pingcap::ClusterConfig & cluster_config) : context(context_), - kvstore(std::make_shared(kvstore_path, context.getFileProvider())), + kvstore(std::make_shared(context)), region_table(context), background_service(nullptr), cluster(addrs.size() == 0 ? std::make_shared() : std::make_shared(addrs, cluster_config)), diff --git a/dbms/src/Storages/Transaction/TMTContext.h b/dbms/src/Storages/Transaction/TMTContext.h index d9f09f61fa9..9485734ba61 100644 --- a/dbms/src/Storages/Transaction/TMTContext.h +++ b/dbms/src/Storages/Transaction/TMTContext.h @@ -45,8 +45,8 @@ class TMTContext : private boost::noncopyable // TODO: get flusher args from config file explicit TMTContext(Context & context, const std::vector & addrs, - const std::unordered_set & ignore_databases_, const std::string & kv_store_path, TiDB::StorageEngine engine_, - bool disable_bg_flush_, const pingcap::ClusterConfig & cluster_config); + const std::unordered_set & ignore_databases_, TiDB::StorageEngine engine_, bool disable_bg_flush_, + const pingcap::ClusterConfig & cluster_config); SchemaSyncerPtr getSchemaSyncer() const; void setSchemaSyncer(SchemaSyncerPtr); diff --git a/dbms/src/Storages/tests/gtest_path_pool.cpp b/dbms/src/Storages/tests/gtest_path_pool.cpp index cb1edec2f7a..b3b861b2bba 100644 --- a/dbms/src/Storages/tests/gtest_path_pool.cpp +++ b/dbms/src/Storages/tests/gtest_path_pool.cpp @@ -34,7 +34,7 @@ try Strings paths = getMultiTestPaths(); auto & ctx = TiFlashTestEnv::getContext(); - PathPool pool(paths, paths, ctx.getPathCapacity(), ctx.getFileProvider()); + PathPool pool(paths, paths, Strings{}, ctx.getPathCapacity(), ctx.getFileProvider()); auto spool = pool.withTable("test", "t", false); // Stable delegate @@ -98,6 +98,33 @@ try } EXPECT_EQ(delegate->numPaths(), 1UL); + size_t bytes_written = 200; + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) + { + PageFileIdAndLevel id{i, 0}; + auto chosen = delegate->choosePath(id); + ASSERT_NE(std::find(res.begin(), res.end(), chosen), res.end()); + delegate->addPageFileUsedSize(id, bytes_written, chosen, true); + auto path_get = delegate->getPageFilePath(id); + ASSERT_EQ(path_get, chosen); + } + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) + { + PageFileIdAndLevel id{i, 0}; + delegate->removePageFile(id, bytes_written); + } + } + // Raft delegate + { + auto delegate = pool.getPSDiskDelegatorRaft(); + auto res = delegate->listPaths(); + EXPECT_EQ(res.size(), paths.size()); + for (size_t i = 0; i < res.size(); ++i) + { + EXPECT_EQ(res[i], paths[i] + "/kvstore"); + } + EXPECT_EQ(delegate->numPaths(), res.size()); + size_t bytes_written = 200; for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) { @@ -124,7 +151,7 @@ try Strings latest_paths(paths.begin(), paths.begin() + 1); auto & ctx = TiFlashTestEnv::getContext(); - PathPool pool(paths, latest_paths, ctx.getPathCapacity(), ctx.getFileProvider()); + PathPool pool(paths, latest_paths, Strings{}, ctx.getPathCapacity(), ctx.getFileProvider()); auto spool = pool.withTable("test", "t", false); // Stable delegate { @@ -187,6 +214,33 @@ try } EXPECT_EQ(delegate->numPaths(), 1UL); + size_t bytes_written = 200; + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) + { + PageFileIdAndLevel id{i, 0}; + auto chosen = delegate->choosePath(id); + ASSERT_NE(std::find(res.begin(), res.end(), chosen), res.end()); + delegate->addPageFileUsedSize(id, bytes_written, chosen, true); + auto path_get = delegate->getPageFilePath(id); + ASSERT_EQ(path_get, chosen); + } + for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) + { + PageFileIdAndLevel id{i, 0}; + delegate->removePageFile(id, bytes_written); + } + } + // Raft delegate + { + auto delegate = pool.getPSDiskDelegatorRaft(); + auto res = delegate->listPaths(); + EXPECT_EQ(res.size(), latest_paths.size()); + for (size_t i = 0; i < res.size(); ++i) + { + EXPECT_EQ(res[i], latest_paths[i] + "/kvstore"); + } + EXPECT_EQ(delegate->numPaths(), res.size()); + size_t bytes_written = 200; for (size_t i = 0; i < TEST_NUMBER_FOR_CHOOSE; ++i) { diff --git a/dbms/src/test_utils/TiflashTestBasic.h b/dbms/src/test_utils/TiflashTestBasic.h index ea8452f463a..0a172fb0363 100644 --- a/dbms/src/test_utils/TiflashTestBasic.h +++ b/dbms/src/test_utils/TiflashTestBasic.h @@ -69,7 +69,7 @@ class TiFlashTestEnv public: static String getTemporaryPath() { return Poco::Path("./tmp/").absolute().toString(); } - static std::pair getExtraPaths(const Strings & testdata_path = {}) + static std::pair getPathPool(const Strings & testdata_path = {}) { Strings result; if (!testdata_path.empty()) @@ -125,6 +125,8 @@ class TiFlashTestEnv try { context.getTMTContext(); + auto paths = getPathPool(testdata_path); + context.setPathPool(paths.first, paths.second, Strings{}, false, context.getPathCapacity(), context.getFileProvider()); } catch (Exception & e) { @@ -136,17 +138,22 @@ class TiFlashTestEnv KeyManagerPtr key_manager = std::make_shared(false); context.initializeFileProvider(key_manager, false); + // Theses global variables should be initialized by the following order + // 1. capacity + // 2. path pool + // 3. TMTContext + // FIXME: These paths are only set at the first time if (testdata_path.empty()) testdata_path.emplace_back(getTemporaryPath()); context.initializePathCapacityMetric(testdata_path, 0); - context.createTMTContext({}, {"default"}, root_path + "/kvstore", TiDB::StorageEngine::TMT, false); + auto paths = getPathPool(testdata_path); + context.setPathPool(paths.first, paths.second, Strings{}, false, context.getPathCapacity(), context.getFileProvider()); + context.createTMTContext({}, {"default"}, TiDB::StorageEngine::TMT, false); context.getTMTContext().restore(); } context.getSettingsRef() = settings; - auto paths = getExtraPaths(testdata_path); - context.setExtraPaths(paths.first, paths.second, context.getPathCapacity(), context.getFileProvider()); return context; } }; From 554d4ed05fe77a719871bbc3bf89a08473ae2998 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 4 Nov 2020 17:59:39 +0800 Subject: [PATCH 2/8] Port some tests --- dbms/src/Storages/Page/PageStorage.cpp | 1 + dbms/src/Storages/Page/PageStorage.h | 4 +- dbms/src/Storages/Transaction/RegionManager.h | 3 + .../Storages/Transaction/RegionPersister.cpp | 70 +++- .../Storages/Transaction/RegionPersister.h | 3 +- .../Storages/Transaction/tests/CMakeLists.txt | 3 - .../tests/gtest_region_persister.cpp | 394 ++++++++++++++++++ .../Transaction/tests/region_helper.h | 54 +-- .../Transaction/tests/region_persister.cpp | 330 --------------- dbms/src/test_utils/TiflashTestBasic.h | 2 +- 10 files changed, 479 insertions(+), 385 deletions(-) create mode 100644 dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp delete mode 100644 dbms/src/Storages/Transaction/tests/region_persister.cpp diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index 5c25509aa80..943e31d4b9f 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index af7b1513274..d3b114f1021 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -1,10 +1,8 @@ #pragma once -#include #include #include #include -#include #include #include @@ -19,6 +17,8 @@ namespace DB { +class FileProvider; +using FileProviderPtr = std::shared_ptr; class TiFlashMetrics; using TiFlashMetricsPtr = std::shared_ptr; class PathCapacityMetrics; diff --git a/dbms/src/Storages/Transaction/RegionManager.h b/dbms/src/Storages/Transaction/RegionManager.h index 9570d9ff3d7..dcc2dcaabb1 100644 --- a/dbms/src/Storages/Transaction/RegionManager.h +++ b/dbms/src/Storages/Transaction/RegionManager.h @@ -2,6 +2,9 @@ #include +#include +#include + namespace DB { diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index 64040da67bf..658aa88ddd5 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -105,31 +105,55 @@ RegionPersister::RegionPersister(Context & global_context_, const RegionManager : global_context(global_context_), region_manager(region_manager_), log(&Logger::get("RegionPersister")) {} -RegionMap RegionPersister::restore(IndexReaderCreateFunc * func) +namespace +{ +DB::stable::PageStorage::Config getStablePSConfig(const PageStorage::Config & config) +{ + DB::stable::PageStorage::Config c; + c.sync_on_write = config.sync_on_write; + c.file_roll_size = config.file_roll_size; + c.file_max_size = config.file_max_size; + c.file_small_size = config.file_max_size; + + c.merge_hint_low_used_rate = config.merge_hint_low_used_rate; + c.merge_hint_low_used_file_total_size = config.merge_hint_low_used_file_total_size; + c.merge_hint_low_used_file_num = config.merge_hint_low_used_file_num; + c.gc_compact_legacy_min_num = config.gc_compact_legacy_min_num; + + c.version_set_config.compact_hint_delta_deletions = config.version_set_config.compact_hint_delta_deletions; + c.version_set_config.compact_hint_delta_entries = config.version_set_config.compact_hint_delta_entries; + return c; +} +} // namespace + +RegionMap RegionPersister::restore(IndexReaderCreateFunc * func, PageStorage::Config config) { - auto & path_pool = global_context.getPathPool(); - auto delegator = path_pool.getPSDiskDelegatorRaft(); - // If there is no PageFile with basic version binary format, use the latest version of PageStorage. - auto detect_binary_version = PageStorage::getMinDataVersion(global_context.getFileProvider(), delegator); - bool run_in_compatibility_mode = path_pool.isRaftStorageCapatibilityModeEnabled() && (detect_binary_version == PageFile::VERSION_BASE); - if (!run_in_compatibility_mode) - { - LOG_INFO(log, "RegionPersister running in normal mode"); - DB::PageStorage::Config config; - config.num_write_slots = 4; // extend write slots to 4 at least - page_storage = std::make_unique( // - "RegionPersister", - std::move(delegator), - config, - global_context.getFileProvider(), - global_context.getTiFlashMetrics()); - page_storage->restore(); - } - else { - LOG_INFO(log, "RegionPersister running in compatibility mode"); - stable_page_storage = std::make_unique( - "RegionPersister", delegator->defaultPath(), DB::stable::PageStorage::Config(), global_context.getFileProvider()); + auto & path_pool = global_context.getPathPool(); + auto delegator = path_pool.getPSDiskDelegatorRaft(); + // If there is no PageFile with basic version binary format, use the latest version of PageStorage. + auto detect_binary_version = PageStorage::getMinDataVersion(global_context.getFileProvider(), delegator); + bool run_in_compatibility_mode + = path_pool.isRaftStorageCapatibilityModeEnabled() && (detect_binary_version == PageFile::VERSION_BASE); + if (!run_in_compatibility_mode) + { + config.num_write_slots = 4; // extend write slots to 4 at least + LOG_INFO(log, "RegionPersister running in normal mode"); + page_storage = std::make_unique( // + "RegionPersister", + std::move(delegator), + config, + global_context.getFileProvider(), + global_context.getTiFlashMetrics()); + page_storage->restore(); + } + else + { + LOG_INFO(log, "RegionPersister running in compatibility mode"); + auto c = getStablePSConfig(config); + stable_page_storage = std::make_unique( // + "RegionPersister", delegator->defaultPath(), c, global_context.getFileProvider()); + } } RegionMap regions; diff --git a/dbms/src/Storages/Transaction/RegionPersister.h b/dbms/src/Storages/Transaction/RegionPersister.h index d4038121acb..50eda9e6186 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.h +++ b/dbms/src/Storages/Transaction/RegionPersister.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -30,7 +31,7 @@ class RegionPersister final : private boost::noncopyable void drop(RegionID region_id, const RegionTaskLock &); void persist(const Region & region); void persist(const Region & region, const RegionTaskLock & lock); - RegionMap restore(IndexReaderCreateFunc * func = nullptr); + RegionMap restore(IndexReaderCreateFunc * func = nullptr, PageStorage::Config config = {}); bool gc(); using RegionCacheWriteElement = std::tuple; diff --git a/dbms/src/Storages/Transaction/tests/CMakeLists.txt b/dbms/src/Storages/Transaction/tests/CMakeLists.txt index f170ede8402..e9a20b18701 100644 --- a/dbms/src/Storages/Transaction/tests/CMakeLists.txt +++ b/dbms/src/Storages/Transaction/tests/CMakeLists.txt @@ -1,8 +1,5 @@ include_directories (${CMAKE_CURRENT_BINARY_DIR}) -add_executable (region_persister region_persister.cpp) -target_link_libraries (region_persister dbms) - add_executable (region_test region.cpp) target_link_libraries (region_test dbms) diff --git a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp new file mode 100644 index 00000000000..6275880a83d --- /dev/null +++ b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp @@ -0,0 +1,394 @@ +#include + +#define private public +#include +#undef private + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "region_helper.h" + +namespace DB::tests +{ + +class RegionPersister_test : public ::testing::Test +{ +public: + RegionPersister_test() : dir_path(TiFlashTestEnv::getTemporaryPath() + "/region_persister_tmp") {} + + static void SetUpTestCase() { TiFlashTestEnv::setupLogger(); } + + void SetUp() override { dropFiles(); } + + void dropFiles() + { + // cleanup + Poco::File file(dir_path); + if (file.exists()) + file.remove(true); + file.createDirectories(); + } + + void runTest(const String & path, bool sync_on_write); + void testFunc(const String & path, const PageStorage::Config & config, int region_num, bool is_gc, bool clean_up); + +protected: + String dir_path; + + DB::Timestamp tso = 0; +}; + +static ::testing::AssertionResult PeerCompare( // + const char * lhs_expr, + const char * rhs_expr, + const metapb::Peer & lhs, + const metapb::Peer & rhs) +{ + if (lhs.id() == rhs.id() && lhs.is_learner() == rhs.is_learner()) + return ::testing::AssertionSuccess(); + else + return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.ShortDebugString(), rhs.ShortDebugString(), false); +} +#define ASSERT_PEER_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::tests::PeerCompare, val1, val2) + +static ::testing::AssertionResult RegionCompare( // + const char * lhs_expr, + const char * rhs_expr, + const DB::Region & lhs, + const DB::Region & rhs) +{ + if (lhs == rhs) + return ::testing::AssertionSuccess(); + else + return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toString(), rhs.toString(), false); +} +#define ASSERT_REGION_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::tests::RegionCompare, val1, val2) + +TEST_F(RegionPersister_test, peer) +try +{ + auto peer = createPeer(100, true); + auto path = dir_path + "/peer.test"; + WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); + auto size = writeBinary2(peer, write_buf); + write_buf.next(); + write_buf.sync(); + ASSERT_EQ(size, (size_t)Poco::File(path).getSize()); + + ReadBufferFromFile read_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); + auto new_peer = readPeer(read_buf); + ASSERT_PEER_EQ(new_peer, peer); +} +CATCH + +TEST_F(RegionPersister_test, region_info) +try +{ + auto region_info = createRegionInfo(233, "", ""); + auto path = dir_path + "/region_info.test"; + WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); + auto size = writeBinary2(region_info, write_buf); + write_buf.next(); + write_buf.sync(); + ASSERT_EQ(size, (size_t)Poco::File(path).getSize()); + + ReadBufferFromFile read_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); + auto new_region_info = readRegion(read_buf); + ASSERT_EQ(new_region_info.id(), region_info.id()); + ASSERT_EQ(new_region_info.start_key(), region_info.start_key()); + ASSERT_EQ(new_region_info.end_key(), region_info.end_key()); + ASSERT_EQ(new_region_info.peers_size(), region_info.peers_size()); + for (int i = 0; i < new_region_info.peers_size(); ++i) + ASSERT_PEER_EQ(new_region_info.peers(i), region_info.peers(i)); +} +CATCH + +TEST_F(RegionPersister_test, region_meta) +try +{ + RegionMeta meta = createRegionMeta(888, 66); + auto path = dir_path + "/meta.test"; + WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); + auto size = std::get<0>(meta.serialize(write_buf)); + write_buf.next(); + write_buf.sync(); + ASSERT_EQ(size, (size_t)Poco::File(path).getSize()); + + ReadBufferFromFile read_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); + auto new_meta = RegionMeta::deserialize(read_buf); + ASSERT_EQ(new_meta, meta); +} +CATCH + +TEST_F(RegionPersister_test, region) +try +{ + TableID table_id = 100; + auto region = std::make_shared(createRegionMeta(1001, table_id)); + TiKVKey key = RecordKVFormat::genKey(table_id, 323, 9983); + region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + + auto path = dir_path + "/region.test"; + WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); + size_t region_ser_size = std::get<0>(region->serialize(write_buf)); + write_buf.next(); + write_buf.sync(); + ASSERT_EQ(region_ser_size, (size_t)Poco::File(path).getSize()); + + ReadBufferFromFile read_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); + auto new_region = Region::deserialize(read_buf); + ASSERT_REGION_EQ(*new_region, *region); +} +CATCH + +TEST_F(RegionPersister_test, region_stat) +try +{ + RegionPtr region = nullptr; + TableID table_id = 100; + { + raft_serverpb::RaftApplyState apply_state; + apply_state.set_applied_index(6671); + apply_state.mutable_truncated_state()->set_index(6672); + apply_state.mutable_truncated_state()->set_term(6673); + + region = std::make_shared(createRegionMeta(1001, table_id, std::make_optional(apply_state))); + } + + TiKVKey key = RecordKVFormat::genKey(table_id, 323, 9983); + region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + + auto path = dir_path + "/region_state.test"; + WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); + size_t region_ser_size = std::get<0>(region->serialize(write_buf)); + write_buf.next(); + + ASSERT_EQ(region_ser_size, (size_t)Poco::File(path).getSize()); + ReadBufferFromFile read_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); + auto new_region = Region::deserialize(read_buf); + ASSERT_EQ(*new_region, *region); +} +CATCH + +TEST_F(RegionPersister_test, persister) +try +{ + RegionManager region_manager; + + std::string path = dir_path + "/broken_file"; + + auto & ctx = TiFlashTestEnv::getContext(DB::Settings(), + Strings{ + path, + }); + + size_t region_num = 100; + RegionMap regions; + TableID table_id = 100; + { + UInt64 diff = 0; + // PageStorage::Config config; + // config.file_roll_size = 128 * MB; + RegionPersister persister(ctx, region_manager); + persister.restore(); + + for (size_t i = 0; i < region_num; ++i) + { + auto region = std::make_shared(createRegionMeta(i, table_id)); + TiKVKey key = RecordKVFormat::genKey(table_id, i, diff++); + region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + + persister.persist(*region); + + regions.emplace(region->id(), region); + } + + // If we truncate page data file, exception will throw instead of droping last region. + auto meta_path = path + "/kvstore/page_1_0/meta"; // First page + Poco::File meta_file(meta_path); + size_t size = meta_file.getSize(); + int rt = ::truncate(meta_path.c_str(), size - 1); // Remove last one byte + ASSERT_EQ(rt, 0); + } + + RegionMap new_regions; + { + RegionPersister persister(ctx, region_manager); + new_regions = persister.restore(); + size_t num_regions_missed = 0; + for (size_t i = 0; i < region_num; ++i) + { + auto new_iter = new_regions.find(i); + if (new_iter == new_regions.end()) + { + LOG_INFO(&Poco::Logger::get("RegionPersister_test"), "Region missed, id=" << i); + ++num_regions_missed; + } + else + { + auto old_region = regions[i]; + auto new_region = new_regions[i]; + ASSERT_EQ(*new_region, *old_region); + } + } + ASSERT_EQ(num_regions_missed, 1UL); + } +} +CATCH + + +void RegionPersister_test::testFunc(const String & path, const PageStorage::Config & config, int region_num, bool is_gc, bool clean_up) +{ + if (clean_up) + dropFiles(); + + auto & ctx = TiFlashTestEnv::getContext(DB::Settings(), + Strings{ + path, + }); + + RegionManager region_manager; + RegionPersister persister(ctx, region_manager); + persister.restore(nullptr, config); + + TableID table_id = 100; + RegionMap regions; + for (int i = 0; i < region_num; ++i) + { + auto region = std::make_shared(createRegionMeta(i, table_id)); + TiKVKey key = RecordKVFormat::genKey(table_id, i, tso++); + region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + + persister.persist(*region); + + regions.emplace(region->id(), region); + } + + if (is_gc) + persister.gc(); + + RegionMap new_regions; + new_regions = persister.restore(nullptr, config); + + for (int i = 0; i < region_num; ++i) + { + auto old_region = regions[i]; + auto new_region = new_regions[i]; + ASSERT_EQ(*new_region, *old_region); + } + + if (clean_up) + dropFiles(); +} + +void RegionPersister_test::runTest(const String & path, bool sync_on_write) +{ + Stopwatch watch; + + dropFiles(); + SCOPE_EXIT({ dropFiles(); }); + + { + PageStorage::Config conf; + conf.sync_on_write = sync_on_write; + conf.file_roll_size = 1; + conf.merge_hint_low_used_file_total_size = 1; + conf.num_write_slots = 4; + + testFunc(path, conf, 10, false, false); + testFunc(path, conf, 10, true, false); + + testFunc(path, conf, 10, false, true); + testFunc(path, conf, 10, true, true); + } + { + PageStorage::Config conf; + conf.sync_on_write = sync_on_write; + conf.file_roll_size = 500; + conf.merge_hint_low_used_file_total_size = 1; + conf.num_write_slots = 4; + + testFunc(path, conf, 100, false, false); + testFunc(path, conf, 100, false, false); + testFunc(path, conf, 100, false, false); + testFunc(path, conf, 100, false, true); + + testFunc(path, conf, 100, true, false); + testFunc(path, conf, 100, true, false); + testFunc(path, conf, 100, true, false); + testFunc(path, conf, 100, true, false); + } + { + PageStorage::Config conf; + conf.sync_on_write = sync_on_write; + conf.file_roll_size = 500; + conf.merge_hint_low_used_file_total_size = 1; + conf.num_write_slots = 4; + + testFunc(path, conf, 100, false, false); + testFunc(path, conf, 100, false, false); + testFunc(path, conf, 100, false, false); + testFunc(path, conf, 100, false, true); + + testFunc(path, conf, 100, true, false); + testFunc(path, conf, 100, true, false); + testFunc(path, conf, 100, true, false); + testFunc(path, conf, 100, true, false); + } + { + PageStorage::Config conf; + conf.sync_on_write = sync_on_write; + conf.num_write_slots = 4; + + testFunc(path, conf, 10000, false, false); + testFunc(path, conf, 10000, false, false); + testFunc(path, conf, 10000, false, false); + testFunc(path, conf, 10000, false, false); + testFunc(path, conf, 10000, false, false); + testFunc(path, conf, 10000, false, false); + testFunc(path, conf, 10000, false, false); + } + { + PageStorage::Config conf; + conf.sync_on_write = sync_on_write; + conf.num_write_slots = 4; + + testFunc(path, conf, 10000, true, false); + testFunc(path, conf, 10000, true, false); + testFunc(path, conf, 10000, true, false); + testFunc(path, conf, 10000, true, false); + testFunc(path, conf, 10000, true, false); + testFunc(path, conf, 10000, true, false); + testFunc(path, conf, 10000, true, false); + } + + auto seconds = watch.elapsedSeconds(); + LOG_INFO(&Poco::Logger::get("RegionPersister_test"), // + "[sync_on_write=" << sync_on_write << "], [time=" << DB::toString(seconds, 4) << "s]"); +} + +// This test takes about 10 minutes. Disable by default +TEST_F(RegionPersister_test, DISABLED_persister_sync_on_write) +{ + runTest(dir_path + "region_persist_storage_sow_false", false); + runTest(dir_path + "region_persist_storage_sow_true", true); +} + +} // namespace DB::tests diff --git a/dbms/src/Storages/Transaction/tests/region_helper.h b/dbms/src/Storages/Transaction/tests/region_helper.h index 098fbdcf954..eaeb3ca0ed0 100644 --- a/dbms/src/Storages/Transaction/tests/region_helper.h +++ b/dbms/src/Storages/Transaction/tests/region_helper.h @@ -1,35 +1,37 @@ #pragma once -#include #include +#include + +#include using namespace DB; -#define ASSERT_CHECK(cond, res) \ - do \ - { \ - if (!(cond)) \ - { \ - std::cerr << __FILE__ << ":" << __LINE__ << ":" \ +#define ASSERT_CHECK(cond, res) \ + do \ + { \ + if (!(cond)) \ + { \ + std::cerr << __FILE__ << ":" << __LINE__ << ":" \ << " Assertion " << #cond << " failed.\n"; \ - if ((res)) \ - { \ - (res) = false; \ - } \ - } \ + if ((res)) \ + { \ + (res) = false; \ + } \ + } \ } while (0) -#define ASSERT_CHECK_EQUAL(a, b, res) \ - do \ - { \ - if (!(a == b)) \ - { \ - std::cerr << __FILE__ << ":" << __LINE__ << ":" \ +#define ASSERT_CHECK_EQUAL(a, b, res) \ + do \ + { \ + if (!(a == b)) \ + { \ + std::cerr << __FILE__ << ":" << __LINE__ << ":" \ << " Assertion " << #a << " == " << #b << " failed.\n"; \ - if ((res)) \ - { \ - (res) = false; \ - } \ - } \ + if ((res)) \ + { \ + (res) = false; \ + } \ + } \ } while (0) @@ -53,7 +55,9 @@ inline metapb::Region createRegionInfo(UInt64 id, const std::string start_key, c return region_info; } -inline RegionMeta createRegionMeta(UInt64 id) +inline RegionMeta createRegionMeta(UInt64 id, DB::TableID table_id, std::optional apply_state = std::nullopt) { - return RegionMeta(createPeer(31, true), createRegionInfo(id, "", ""), initialApplyState()); + return RegionMeta(/*peer=*/createPeer(31, true), + /*region=*/createRegionInfo(id, RecordKVFormat::genKey(table_id, 0), RecordKVFormat::genKey(table_id, 300)), + /*apply_state_=*/apply_state.value_or(initialApplyState())); } diff --git a/dbms/src/Storages/Transaction/tests/region_persister.cpp b/dbms/src/Storages/Transaction/tests/region_persister.cpp deleted file mode 100644 index 72450101488..00000000000 --- a/dbms/src/Storages/Transaction/tests/region_persister.cpp +++ /dev/null @@ -1,330 +0,0 @@ -#include -#include -#include - -#include - -#include "region_helper.h" - -using namespace DB; - -const std::string dir_path = "./region_persister_tmp_/"; - - -int main(int, char **) -{ - SCOPE_EXIT({ - // remove tmp dir - Poco::File(dir_path).remove(true); - }); - Poco::File dir(dir_path); - if (dir.exists()) - dir.remove(true); - dir.createDirectory(); - - bool suc = true; - { - auto peer = createPeer(100, true); - auto path = dir_path + "peer.test"; - WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); - auto size = writeBinary2(peer, write_buf); - write_buf.next(); - - ASSERT_CHECK_EQUAL(size, (size_t)Poco::File(path).getSize(), suc); - ReadBufferFromFile read_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); - auto new_peer = readPeer(read_buf); - ASSERT_CHECK_EQUAL(new_peer, peer, suc); - } - - { - auto region_info = createRegionInfo(233, "", ""); - auto path = dir_path + "region_info.test"; - WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); - auto size = writeBinary2(region_info, write_buf); - write_buf.next(); - - ASSERT_CHECK_EQUAL(size, (size_t)Poco::File(path).getSize(), suc); - ReadBufferFromFile read_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); - auto new_region_info = readRegion(read_buf); - ASSERT_CHECK_EQUAL(new_region_info, region_info, suc); - } - - { - RegionMeta meta = createRegionMeta(888); - auto path = dir_path + "meta.test"; - WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); - auto size = std::get<0>(meta.serialize(write_buf)); - write_buf.next(); - - ASSERT_CHECK_EQUAL(size, (size_t)Poco::File(path).getSize(), suc); - ReadBufferFromFile read_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); - auto new_meta = RegionMeta::deserialize(read_buf); - ASSERT_CHECK_EQUAL(new_meta, meta, suc); - } - - { - auto region = std::make_shared(createRegionMeta(100)); - TiKVKey key = RecordKVFormat::genKey(100, 323, 9983); - region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); - region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); - - auto path = dir_path + "region.test"; - WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); - size_t region_ser_size = std::get<0>(region->serialize(write_buf)); - write_buf.next(); - - ASSERT_CHECK_EQUAL(region_ser_size, (size_t)Poco::File(path).getSize(), suc); - ReadBufferFromFile read_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); - auto new_region = Region::deserialize(read_buf); - - - ASSERT_CHECK_EQUAL(*new_region, *region, suc); - } - - { - RegionPtr region = nullptr; - { - metapb::Peer peer; - raft_serverpb::RegionLocalState region_state; - raft_serverpb::RaftApplyState apply_state; - - peer.set_id(6666); - peer.set_store_id(6667); - - { - metapb::Region region_; - region_.set_id(6668); - *region_.mutable_start_key() = "6669"; - *region_.mutable_end_key() = "6670"; - region_.mutable_region_epoch()->set_conf_ver(6671); - region_.mutable_region_epoch()->set_version(6672); - *region_.add_peers() = peer; - - *region_state.mutable_region() = region_; - region_state.set_state(raft_serverpb::PeerState::Merging); - region_state.mutable_merge_state()->set_min_index(6674); - region_state.mutable_merge_state()->set_commit(6675); - - { - metapb::Region tmp; - tmp.set_id(6676); - *tmp.mutable_start_key() = "6677"; - *tmp.mutable_end_key() = "6678"; - - *region_state.mutable_merge_state()->mutable_target() = std::move(tmp); - } - } - - apply_state.set_applied_index(6671); - apply_state.mutable_truncated_state()->set_index(6672); - apply_state.mutable_truncated_state()->set_term(6673); - - RegionMeta meta(std::move(peer), std::move(apply_state), 6679, std::move(region_state)); - region = std::make_shared(std::move(meta)); - } - - TiKVKey key = RecordKVFormat::genKey(100, 323, 9983); - region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); - region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); - - auto path = dir_path + "region_state.test"; - WriteBufferFromFile write_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT); - size_t region_ser_size = std::get<0>(region->serialize(write_buf)); - write_buf.next(); - - ASSERT_CHECK_EQUAL(region_ser_size, (size_t)Poco::File(path).getSize(), suc); - ReadBufferFromFile read_buf(path, DBMS_DEFAULT_BUFFER_SIZE, O_RDONLY); - auto new_region = Region::deserialize(read_buf); - - ASSERT_CHECK_EQUAL(*new_region, *region, suc); - } - - auto remove_dir = [](const std::string & path) { - Poco::File file(path); - if (file.exists()) - file.remove(true); - }; - - { - RegionManager region_manager; - - std::string path = dir_path + "broken_file"; - remove_dir(path); - SCOPE_EXIT({ remove_dir(path); }); - - size_t region_num = 100; - RegionMap regions; - RegionMap new_regions; - { - UInt64 diff = 0; - PageStorage::Config config; - config.file_roll_size = 128 * MB; - RegionPersister persister(path, region_manager); - for (size_t i = 0; i < region_num; ++i) - { - auto region = std::make_shared(createRegionMeta(i)); - TiKVKey key = RecordKVFormat::genKey(100, i, diff++); - region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); - region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); - - persister.persist(*region); - - regions.emplace(region->id(), region); - } - - // If we truncate page data file, exception will throw instead of droping last region. - auto meta_path = path + "/page_1_0/meta"; // First page - Poco::File meta_file(meta_path); - size_t size = meta_file.getSize(); - int rt = ::truncate(meta_path.c_str(), size - 1); // Remove last one byte - ASSERT_CHECK(!rt, suc); - } - - { - RegionPersister persister(path, region_manager); - persister.restore(new_regions); - for (size_t i = 0; i < region_num; ++i) - { - if (i == region_num - 1) - { - // The last region is broken and should not exist. - ASSERT_CHECK_EQUAL(new_regions.find(i), new_regions.end(), suc); - } - else - { - auto old_region = regions[i]; - auto new_region = new_regions[i]; - ASSERT_CHECK_EQUAL(*new_region, *old_region, suc); - } - } - } - } - - - UInt64 diff = 0; - auto test_func1 = [&](const std::string & path, const PageStorage::Config & config, int region_num, bool is_gc, bool clean_up) { - if (clean_up) - remove_dir(path); - - RegionManager region_manager; - RegionPersister persister(path, region_manager, config); - RegionMap regions; - for (int i = 0; i < region_num; ++i) - { - auto region = std::make_shared(createRegionMeta(i)); - TiKVKey key = RecordKVFormat::genKey(100, i, diff++); - region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); - region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); - region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); - - persister.persist(*region); - - regions.emplace(region->id(), region); - } - - if (is_gc) - persister.gc(); - - RegionMap new_regions; - persister.restore(new_regions); - - for (int i = 0; i < region_num; ++i) - { - auto old_region = regions[i]; - auto new_region = new_regions[i]; - ASSERT_CHECK_EQUAL(*new_region, *old_region, suc); - } - - if (clean_up) - remove_dir(path); - }; - - auto run_test = [&](const std::string & path, bool sync_on_write) { - Timepoint t1 = Clock::now(); - - remove_dir(path); - SCOPE_EXIT({ remove_dir(path); }); - - { - PageStorage::Config conf; - conf.sync_on_write = sync_on_write; - conf.file_roll_size = 1; - conf.merge_hint_low_used_file_total_size = 1; - - test_func1(path, conf, 10, false, false); - test_func1(path, conf, 10, true, false); - - test_func1(path, conf, 10, false, true); - test_func1(path, conf, 10, true, true); - } - { - PageStorage::Config conf; - conf.sync_on_write = sync_on_write; - conf.file_roll_size = 500; - conf.merge_hint_low_used_file_total_size = 1; - - test_func1(path, conf, 100, false, false); - test_func1(path, conf, 100, false, false); - test_func1(path, conf, 100, false, false); - test_func1(path, conf, 100, false, true); - - test_func1(path, conf, 100, true, false); - test_func1(path, conf, 100, true, false); - test_func1(path, conf, 100, true, false); - test_func1(path, conf, 100, true, false); - } - { - PageStorage::Config conf; - conf.sync_on_write = sync_on_write; - conf.file_roll_size = 500; - conf.merge_hint_low_used_file_total_size = 1; - - test_func1(path, conf, 100, false, false); - test_func1(path, conf, 100, false, false); - test_func1(path, conf, 100, false, false); - test_func1(path, conf, 100, false, true); - - test_func1(path, conf, 100, true, false); - test_func1(path, conf, 100, true, false); - test_func1(path, conf, 100, true, false); - test_func1(path, conf, 100, true, false); - } - { - PageStorage::Config conf; - conf.sync_on_write = sync_on_write; - - test_func1(path, conf, 10000, false, false); - test_func1(path, conf, 10000, false, false); - test_func1(path, conf, 10000, false, false); - test_func1(path, conf, 10000, false, false); - test_func1(path, conf, 10000, false, false); - test_func1(path, conf, 10000, false, false); - test_func1(path, conf, 10000, false, false); - } - { - PageStorage::Config conf; - conf.sync_on_write = sync_on_write; - - test_func1(path, conf, 10000, true, false); - test_func1(path, conf, 10000, true, false); - test_func1(path, conf, 10000, true, false); - test_func1(path, conf, 10000, true, false); - test_func1(path, conf, 10000, true, false); - test_func1(path, conf, 10000, true, false); - test_func1(path, conf, 10000, true, false); - } - - Timepoint t2 = Clock::now(); - Seconds seconds = std::chrono::duration_cast(t2 - t1); - std::cout << "sync_on_write[" << sync_on_write << "] time: " << seconds.count() << " seconds" << std::endl; - }; - - run_test(dir_path + "region_persist_storage_sow_false", false); - run_test(dir_path + "region_persist_storage_sow_true", true); - - - return suc ? 0 : 1; -} diff --git a/dbms/src/test_utils/TiflashTestBasic.h b/dbms/src/test_utils/TiflashTestBasic.h index 0a172fb0363..2b9bd59d862 100644 --- a/dbms/src/test_utils/TiflashTestBasic.h +++ b/dbms/src/test_utils/TiflashTestBasic.h @@ -113,7 +113,7 @@ class TiFlashTestEnv throw Exception("Can not find testdata with name[" + name + "]"); } - static Context & getContext(const DB::Settings & settings = DB::Settings(), std::vector testdata_path = {}) + static Context & getContext(const DB::Settings & settings = DB::Settings(), Strings testdata_path = {}) { static Context context = DB::Context::createGlobal(); // Load `testdata_path` as path if it is set. From 20fe0d600d4b2fd3bd54af5cb204fd6ac2ee38b0 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 5 Nov 2020 15:50:56 +0800 Subject: [PATCH 3/8] Add persister compatibility mode test --- dbms/src/Common/FailPoint.cpp | 4 +- dbms/src/Storages/Page/PageStorage.cpp | 4 +- dbms/src/Storages/Page/PageStorage.h | 1 + .../Storages/Transaction/RegionPersister.cpp | 11 ++ .../tests/gtest_region_persister.cpp | 146 ++++++++++++++++-- dbms/src/test_utils/TiflashTestBasic.h | 4 +- 6 files changed, 155 insertions(+), 15 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index d3b979817db..0e4bdfde224 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -23,7 +23,9 @@ std::unordered_map> FailPointHelper::f M(region_exception_after_read_from_storage_some_error) \ M(region_exception_after_read_from_storage_all_error) \ M(exception_before_dmfile_remove_encryption) \ - M(exception_before_dmfile_remove_from_disk) + M(exception_before_dmfile_remove_from_disk) \ + M(force_enable_region_persister_compatibility_mode) \ + M(force_disable_region_persister_compatibility_mode) #define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) M(pause_after_learner_read) diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index 943e31d4b9f..b896a9a745a 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -72,8 +72,8 @@ PageFile::Version PageStorage::getMinDataVersion(const FileProviderPtr & file_pr if (page_files.empty()) return PageFile::CURRENT_VERSION; - // Simply check the first PageFile is good enough - auto reader = const_cast(*page_files.begin()).createMetaMergingReader(); + // Simply check the last PageFile is good enough + auto reader = const_cast(*page_files.rbegin()).createMetaMergingReader(); PageFile::Version min_binary_version = PageFile::CURRENT_VERSION, temp_version; reader->moveNext(&temp_version); diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index d3b114f1021..cbc86c93053 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index 658aa88ddd5..703b84b6bb0 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -15,6 +16,12 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } // namespace ErrorCodes +namespace FailPoints +{ +extern const char force_enable_region_persister_compatibility_mode[]; +extern const char force_disable_region_persister_compatibility_mode[]; +} + void RegionPersister::drop(RegionID region_id, const RegionTaskLock &) { if (page_storage) @@ -135,6 +142,10 @@ RegionMap RegionPersister::restore(IndexReaderCreateFunc * func, PageStorage::Co auto detect_binary_version = PageStorage::getMinDataVersion(global_context.getFileProvider(), delegator); bool run_in_compatibility_mode = path_pool.isRaftStorageCapatibilityModeEnabled() && (detect_binary_version == PageFile::VERSION_BASE); + + fiu_do_on(FailPoints::force_enable_region_persister_compatibility_mode, { run_in_compatibility_mode = true; }); + fiu_do_on(FailPoints::force_disable_region_persister_compatibility_mode, { run_in_compatibility_mode = false; }); + if (!run_in_compatibility_mode) { config.num_write_slots = 4; // extend write slots to 4 at least diff --git a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp index 6275880a83d..3a2ee159d5c 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp @@ -2,14 +2,15 @@ #define private public #include +#include #undef private +#include #include #include #include #include #include -#include #include #include @@ -17,7 +18,15 @@ #include "region_helper.h" -namespace DB::tests +namespace DB +{ +namespace FailPoints +{ +extern const char force_enable_region_persister_compatibility_mode[]; +extern const char force_disable_region_persister_compatibility_mode[]; +} // namespace FailPoints + +namespace tests { class RegionPersister_test : public ::testing::Test @@ -25,7 +34,11 @@ class RegionPersister_test : public ::testing::Test public: RegionPersister_test() : dir_path(TiFlashTestEnv::getTemporaryPath() + "/region_persister_tmp") {} - static void SetUpTestCase() { TiFlashTestEnv::setupLogger(); } + static void SetUpTestCase() + { + TiFlashTestEnv::setupLogger(); + fiu_init(0); // init failpoint + } void SetUp() override { dropFiles(); } @@ -53,7 +66,7 @@ static ::testing::AssertionResult PeerCompare( // const metapb::Peer & lhs, const metapb::Peer & rhs) { - if (lhs.id() == rhs.id() && lhs.is_learner() == rhs.is_learner()) + if (lhs.id() == rhs.id() && lhs.role() == rhs.role()) return ::testing::AssertionSuccess(); else return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.ShortDebugString(), rhs.ShortDebugString(), false); @@ -198,12 +211,13 @@ try size_t region_num = 100; RegionMap regions; TableID table_id = 100; + + PageStorage::Config config; + config.file_roll_size = 128 * MB; { UInt64 diff = 0; - // PageStorage::Config config; - // config.file_roll_size = 128 * MB; RegionPersister persister(ctx, region_manager); - persister.restore(); + persister.restore(nullptr, config); for (size_t i = 0; i < region_num; ++i) { @@ -229,14 +243,14 @@ try RegionMap new_regions; { RegionPersister persister(ctx, region_manager); - new_regions = persister.restore(); + new_regions = persister.restore(nullptr, config); size_t num_regions_missed = 0; for (size_t i = 0; i < region_num; ++i) { auto new_iter = new_regions.find(i); if (new_iter == new_regions.end()) { - LOG_INFO(&Poco::Logger::get("RegionPersister_test"), "Region missed, id=" << i); + LOG_ERROR(&Poco::Logger::get("RegionPersister_test"), "Region missed, id=" << i); ++num_regions_missed; } else @@ -251,6 +265,117 @@ try } CATCH +TEST_F(RegionPersister_test, persister_compatibility_mode) +try +{ + std::string path = dir_path + "/compatibility_mode"; + + // Force to run in compatibility mode for the default region persister + FailPointHelper::enableFailPoint(FailPoints::force_enable_region_persister_compatibility_mode); + SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_enable_region_persister_compatibility_mode); }); + auto & ctx = TiFlashTestEnv::getContext(DB::Settings(), + Strings{ + path, + }); + + size_t region_num = 500; + RegionMap regions; + TableID table_id = 100; + + PageStorage::Config config; + config.file_roll_size = 2 * MB; + RegionManager region_manager; + DB::Timestamp tso = 0; + { + RegionPersister persister(ctx, region_manager); + // Force to run in compatibility mode + FailPointHelper::enableFailPoint(FailPoints::force_enable_region_persister_compatibility_mode); + persister.restore(nullptr, config); + ASSERT_EQ(persister.page_storage, nullptr); + ASSERT_NE(persister.stable_page_storage, nullptr); + + for (size_t i = 0; i < region_num; ++i) + { + auto region = std::make_shared(createRegionMeta(i, table_id)); + TiKVKey key = RecordKVFormat::genKey(table_id, i, tso++); + region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + + persister.persist(*region); + + regions.emplace(region->id(), region); + } + } + + { + RegionPersister persister(ctx, region_manager); + // restore normally, should run in compatibility mode. + RegionMap new_regions = persister.restore(nullptr, config); + ASSERT_EQ(persister.page_storage, nullptr); + ASSERT_NE(persister.stable_page_storage, nullptr); + // Try to read + for (size_t i = 0; i < region_num; ++i) + { + auto new_iter = new_regions.find(i); + ASSERT_NE(new_iter, new_regions.end()); + auto old_region = regions[i]; + auto new_region = new_regions[i]; + ASSERT_EQ(*new_region, *old_region); + } + } + + size_t region_num_under_nromal_mode = 200; + { + RegionPersister persister(ctx, region_manager); + // Force to run in normal mode + FailPointHelper::enableFailPoint(FailPoints::force_disable_region_persister_compatibility_mode); + RegionMap new_regions = persister.restore(nullptr, config); + ASSERT_NE(persister.page_storage, nullptr); + ASSERT_EQ(persister.stable_page_storage, nullptr); + // Try to read + for (size_t i = 0; i < region_num; ++i) + { + auto new_iter = new_regions.find(i); + ASSERT_NE(new_iter, new_regions.end()); + auto old_region = regions[i]; + auto new_region = new_regions[i]; + ASSERT_EQ(*new_region, *old_region); + } + // Try to write more regions under normal mode + for (size_t i = region_num; i < region_num + region_num_under_nromal_mode; ++i) + { + auto region = std::make_shared(createRegionMeta(i, table_id)); + TiKVKey key = RecordKVFormat::genKey(table_id, i, tso++); + region->insert("default", TiKVKey::copyFrom(key), TiKVValue("value1")); + region->insert("write", TiKVKey::copyFrom(key), RecordKVFormat::encodeWriteCfValue('P', 0)); + region->insert("lock", TiKVKey::copyFrom(key), RecordKVFormat::encodeLockCfValue('P', "", 0, 0)); + + persister.persist(*region); + + regions.emplace(region->id(), region); + } + } + + { + RegionPersister persister(ctx, region_manager); + // Restore normally, should run in normal mode. + RegionMap new_regions = persister.restore(nullptr, config); + ASSERT_NE(persister.page_storage, nullptr); + ASSERT_EQ(persister.stable_page_storage, nullptr); + // Try to read + for (size_t i = 0; i < region_num + region_num_under_nromal_mode; ++i) + { + auto new_iter = new_regions.find(i); + ASSERT_NE(new_iter, new_regions.end()) << " region:" << i; + auto old_region = regions[i]; + auto new_region = new_regions[i]; + ASSERT_EQ(*new_region, *old_region) << " region:" << i; + } + } +} +CATCH + void RegionPersister_test::testFunc(const String & path, const PageStorage::Config & config, int region_num, bool is_gc, bool clean_up) { @@ -391,4 +516,5 @@ TEST_F(RegionPersister_test, DISABLED_persister_sync_on_write) runTest(dir_path + "region_persist_storage_sow_true", true); } -} // namespace DB::tests +} // namespace tests +} // namespace DB diff --git a/dbms/src/test_utils/TiflashTestBasic.h b/dbms/src/test_utils/TiflashTestBasic.h index 2b9bd59d862..9e3247b4c89 100644 --- a/dbms/src/test_utils/TiflashTestBasic.h +++ b/dbms/src/test_utils/TiflashTestBasic.h @@ -126,7 +126,7 @@ class TiFlashTestEnv { context.getTMTContext(); auto paths = getPathPool(testdata_path); - context.setPathPool(paths.first, paths.second, Strings{}, false, context.getPathCapacity(), context.getFileProvider()); + context.setPathPool(paths.first, paths.second, Strings{}, true, context.getPathCapacity(), context.getFileProvider()); } catch (Exception & e) { @@ -148,7 +148,7 @@ class TiFlashTestEnv testdata_path.emplace_back(getTemporaryPath()); context.initializePathCapacityMetric(testdata_path, 0); auto paths = getPathPool(testdata_path); - context.setPathPool(paths.first, paths.second, Strings{}, false, context.getPathCapacity(), context.getFileProvider()); + context.setPathPool(paths.first, paths.second, Strings{}, true, context.getPathCapacity(), context.getFileProvider()); context.createTMTContext({}, {"default"}, TiDB::StorageEngine::TMT, false); context.getTMTContext().restore(); From 1992b199cb83e9f356f9d7115cdd9075c0cef7e7 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 5 Nov 2020 19:21:54 +0800 Subject: [PATCH 4/8] Fix compile error --- dbms/src/Storages/Transaction/RegionPersister.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/Transaction/RegionPersister.h b/dbms/src/Storages/Transaction/RegionPersister.h index 50eda9e6186..1fc7f307ba3 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.h +++ b/dbms/src/Storages/Transaction/RegionPersister.h @@ -31,7 +31,7 @@ class RegionPersister final : private boost::noncopyable void drop(RegionID region_id, const RegionTaskLock &); void persist(const Region & region); void persist(const Region & region, const RegionTaskLock & lock); - RegionMap restore(IndexReaderCreateFunc * func = nullptr, PageStorage::Config config = {}); + RegionMap restore(IndexReaderCreateFunc * func = nullptr, DB::PageStorage::Config config = DB::PageStorage::Config{}); bool gc(); using RegionCacheWriteElement = std::tuple; From 6e9a3e06957f01ec72ec377e97ee48051648c519 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 9 Nov 2020 16:56:22 +0800 Subject: [PATCH 5/8] Should check the max binary version of PageFile --- dbms/src/Storages/Page/PageStorage.cpp | 14 +++++++------- dbms/src/Storages/Page/PageStorage.h | 2 +- dbms/src/Storages/Transaction/RegionPersister.cpp | 2 +- .../Transaction/tests/gtest_region_persister.cpp | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index b896a9a745a..3347da5acb9 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -61,9 +61,9 @@ bool PageStorage::StatisticsInfo::equals(const StatisticsInfo & rhs) return puts == rhs.puts && refs == rhs.refs && deletes == rhs.deletes && upserts == rhs.upserts; } -PageFile::Version PageStorage::getMinDataVersion(const FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator) +PageFile::Version PageStorage::getMaxDataVersion(const FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator) { - Poco::Logger * log = &Poco::Logger::get("PageStorage::getMinDataVersion"); + Poco::Logger * log = &Poco::Logger::get("PageStorage::getMaxDataVersion"); ListPageFilesOption option; option.ignore_checkpoint = true; option.ignore_legacy = true; @@ -75,17 +75,17 @@ PageFile::Version PageStorage::getMinDataVersion(const FileProviderPtr & file_pr // Simply check the last PageFile is good enough auto reader = const_cast(*page_files.rbegin()).createMetaMergingReader(); - PageFile::Version min_binary_version = PageFile::CURRENT_VERSION, temp_version; + PageFile::Version max_binary_version = PageFile::CURRENT_VERSION, temp_version; reader->moveNext(&temp_version); - min_binary_version = std::min(min_binary_version, temp_version); + max_binary_version = std::max(max_binary_version, temp_version); while (reader->hasNext()) { // Continue to read the binary version of next WriteBatch. reader->moveNext(&temp_version); - min_binary_version = std::min(min_binary_version, temp_version); + max_binary_version = std::max(max_binary_version, temp_version); } - LOG_TRACE(log, "getMinDataVersion done from " + reader->toString()); - return min_binary_version; + LOG_DEBUG(log, "getMaxDataVersion done from " + reader->toString() << " [max version=" << max_binary_version << "]"); + return max_binary_version; } PageFileSet PageStorage::listAllPageFiles(const FileProviderPtr & file_provider, diff --git a/dbms/src/Storages/Page/PageStorage.h b/dbms/src/Storages/Page/PageStorage.h index cbc86c93053..f94750aedca 100644 --- a/dbms/src/Storages/Page/PageStorage.h +++ b/dbms/src/Storages/Page/PageStorage.h @@ -156,7 +156,7 @@ class PageStorage Poco::Logger * page_file_log, const ListPageFilesOption & option = ListPageFilesOption()); - static PageFile::Version getMinDataVersion(const FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator); + static PageFile::Version getMaxDataVersion(const FileProviderPtr & file_provider, PSDiskDelegatorPtr & delegator); private: WriterPtr checkAndRenewWriter(PageFile & page_file, diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index 703b84b6bb0..a0189d74186 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -139,7 +139,7 @@ RegionMap RegionPersister::restore(IndexReaderCreateFunc * func, PageStorage::Co auto & path_pool = global_context.getPathPool(); auto delegator = path_pool.getPSDiskDelegatorRaft(); // If there is no PageFile with basic version binary format, use the latest version of PageStorage. - auto detect_binary_version = PageStorage::getMinDataVersion(global_context.getFileProvider(), delegator); + auto detect_binary_version = PageStorage::getMaxDataVersion(global_context.getFileProvider(), delegator); bool run_in_compatibility_mode = path_pool.isRaftStorageCapatibilityModeEnabled() && (detect_binary_version == PageFile::VERSION_BASE); diff --git a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp index 3a2ee159d5c..98b7eee4908 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp @@ -283,7 +283,7 @@ try TableID table_id = 100; PageStorage::Config config; - config.file_roll_size = 2 * MB; + config.file_roll_size = 16 * 1024; RegionManager region_manager; DB::Timestamp tso = 0; { From 08cdecdfdc6b9b4ffb65545ef1714c4b4e8a2e86 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 9 Nov 2020 17:22:44 +0800 Subject: [PATCH 6/8] Fix some namings --- dbms/src/Common/FailPoint.cpp | 4 ++-- dbms/src/Interpreters/Context.cpp | 4 ++-- dbms/src/Interpreters/Context.h | 2 +- dbms/src/Server/Server.cpp | 9 +++++---- dbms/src/Storages/PathPool.cpp | 4 ++-- dbms/src/Storages/PathPool.h | 6 +++--- .../Storages/Transaction/RegionPersister.cpp | 16 ++++++++-------- .../tests/gtest_region_persister.cpp | 18 +++++++++--------- 8 files changed, 32 insertions(+), 31 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 0e4bdfde224..da740399473 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -24,8 +24,8 @@ std::unordered_map> FailPointHelper::f M(region_exception_after_read_from_storage_all_error) \ M(exception_before_dmfile_remove_encryption) \ M(exception_before_dmfile_remove_from_disk) \ - M(force_enable_region_persister_compatibility_mode) \ - M(force_disable_region_persister_compatibility_mode) + M(force_enable_region_persister_compatible_mode) \ + M(force_disable_region_persister_compatible_mode) #define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) M(pause_after_learner_read) diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 78dcf2ac844..1431102d8a0 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -559,13 +559,13 @@ void Context::setPathPool( // const Strings & main_data_paths, const Strings & latest_data_paths, const Strings & kvstore_paths, - bool enable_raft_storage_compatibility_mode, + bool enable_raft_compatibility_mode, PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_) { auto lock = getLock(); shared->path_pool = PathPool( - main_data_paths, latest_data_paths, kvstore_paths, global_capacity_, file_provider_, enable_raft_storage_compatibility_mode); + main_data_paths, latest_data_paths, kvstore_paths, global_capacity_, file_provider_, enable_raft_compatibility_mode); } void Context::setConfig(const ConfigurationPtr & config) diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 71d8e3fbd63..8ffac7f5c53 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -177,7 +177,7 @@ class Context void setPathPool(const Strings & main_data_paths, const Strings & latest_data_paths, const Strings & kvstore_paths, - bool enable_raft_storage_compatibility_mode, + bool enable_raft_compatibility_mode, PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index b129ad40399..cc3f8276b02 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -170,7 +170,7 @@ struct TiFlashRaftConfig Strings kvstore_path; // Actually it is "flash.service_addr" std::string flash_server_addr; - bool enable_storage_compatibility_mode = true; + bool enable_compatibility_mode = true; static const TiDB::StorageEngine DEFAULT_ENGINE = TiDB::StorageEngine::DT; bool disable_bg_flush = false; @@ -270,9 +270,10 @@ TiFlashRaftConfig::TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Ut disable_bg_flush = true; } - if (config.has("raft.enable_storage_compatibility_mode")) + // just for test + if (config.has("raft.enable_compatibility_mode")) { - enable_storage_compatibility_mode = config.getBool("raft.enable_storage_compatibility_mode"); + enable_compatibility_mode = config.getBool("raft.enable_compatibility_mode"); } } } @@ -541,7 +542,7 @@ int Server::main(const std::vector & /*args*/) const std::string path = all_normal_path[0]; TiFlashRaftConfig raft_config(latest_data_paths, config(), log); global_context->setPathPool(main_data_paths, latest_data_paths, raft_config.kvstore_path, // - raft_config.enable_storage_compatibility_mode, // + raft_config.enable_compatibility_mode, // global_context->getPathCapacity(), global_context->getFileProvider()); // Use pd address to define which default_database we use by defauly. diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index 424a41b9044..180c2dc913d 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -25,11 +25,11 @@ extern const int LOGICAL_ERROR; // Constructor to be used during initialization PathPool::PathPool(const Strings & main_data_paths_, const Strings & latest_data_paths_, const Strings & kvstore_paths_, // - PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_, bool enable_raft_storage_compatibility_mode_) + PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_, bool enable_raft_compatibility_mode_) : main_data_paths(main_data_paths_), latest_data_paths(latest_data_paths_), kvstore_paths(kvstore_paths_), - enable_raft_storage_compatibility_mode(enable_raft_storage_compatibility_mode_), + enable_raft_compatibility_mode(enable_raft_compatibility_mode_), global_capacity(global_capacity_), file_provider(file_provider_), log(&Poco::Logger::get("PathPool")) diff --git a/dbms/src/Storages/PathPool.h b/dbms/src/Storages/PathPool.h index dbf24bec430..636ab54a7ab 100644 --- a/dbms/src/Storages/PathPool.h +++ b/dbms/src/Storages/PathPool.h @@ -45,12 +45,12 @@ class PathPool const Strings & main_data_paths, const Strings & latest_data_paths, // const Strings & kvstore_paths, // PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_, // - bool enable_raft_storage_compatibility_mode_ = false); + bool enable_raft_compatibility_mode_ = false); // Constructor to create PathPool for one Storage StoragePathPool withTable(const String & database_, const String & table_, bool path_need_database_name_) const; - bool isRaftStorageCapatibilityModeEnabled() const { return enable_raft_storage_compatibility_mode; } + bool isRaftCompatibilityModeEnabled() const { return enable_raft_compatibility_mode; } // Generate a delegator for managing the paths of `RegionPersister`. // Those paths are generated from `kvstore_paths`. @@ -78,7 +78,7 @@ class PathPool Strings latest_data_paths; Strings kvstore_paths; - bool enable_raft_storage_compatibility_mode; + bool enable_raft_compatibility_mode; PathCapacityMetricsPtr global_capacity; diff --git a/dbms/src/Storages/Transaction/RegionPersister.cpp b/dbms/src/Storages/Transaction/RegionPersister.cpp index a0189d74186..58abef1a0a8 100644 --- a/dbms/src/Storages/Transaction/RegionPersister.cpp +++ b/dbms/src/Storages/Transaction/RegionPersister.cpp @@ -18,8 +18,8 @@ extern const int LOGICAL_ERROR; namespace FailPoints { -extern const char force_enable_region_persister_compatibility_mode[]; -extern const char force_disable_region_persister_compatibility_mode[]; +extern const char force_enable_region_persister_compatible_mode[]; +extern const char force_disable_region_persister_compatible_mode[]; } void RegionPersister::drop(RegionID region_id, const RegionTaskLock &) @@ -140,13 +140,13 @@ RegionMap RegionPersister::restore(IndexReaderCreateFunc * func, PageStorage::Co auto delegator = path_pool.getPSDiskDelegatorRaft(); // If there is no PageFile with basic version binary format, use the latest version of PageStorage. auto detect_binary_version = PageStorage::getMaxDataVersion(global_context.getFileProvider(), delegator); - bool run_in_compatibility_mode - = path_pool.isRaftStorageCapatibilityModeEnabled() && (detect_binary_version == PageFile::VERSION_BASE); + bool run_in_compatible_mode + = path_pool.isRaftCompatibilityModeEnabled() && (detect_binary_version == PageFile::VERSION_BASE); - fiu_do_on(FailPoints::force_enable_region_persister_compatibility_mode, { run_in_compatibility_mode = true; }); - fiu_do_on(FailPoints::force_disable_region_persister_compatibility_mode, { run_in_compatibility_mode = false; }); + fiu_do_on(FailPoints::force_enable_region_persister_compatible_mode, { run_in_compatible_mode = true; }); + fiu_do_on(FailPoints::force_disable_region_persister_compatible_mode, { run_in_compatible_mode = false; }); - if (!run_in_compatibility_mode) + if (!run_in_compatible_mode) { config.num_write_slots = 4; // extend write slots to 4 at least LOG_INFO(log, "RegionPersister running in normal mode"); @@ -160,7 +160,7 @@ RegionMap RegionPersister::restore(IndexReaderCreateFunc * func, PageStorage::Co } else { - LOG_INFO(log, "RegionPersister running in compatibility mode"); + LOG_INFO(log, "RegionPersister running in compatible mode"); auto c = getStablePSConfig(config); stable_page_storage = std::make_unique( // "RegionPersister", delegator->defaultPath(), c, global_context.getFileProvider()); diff --git a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp index 98b7eee4908..1809060a187 100644 --- a/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp +++ b/dbms/src/Storages/Transaction/tests/gtest_region_persister.cpp @@ -22,8 +22,8 @@ namespace DB { namespace FailPoints { -extern const char force_enable_region_persister_compatibility_mode[]; -extern const char force_disable_region_persister_compatibility_mode[]; +extern const char force_enable_region_persister_compatible_mode[]; +extern const char force_disable_region_persister_compatible_mode[]; } // namespace FailPoints namespace tests @@ -270,9 +270,9 @@ try { std::string path = dir_path + "/compatibility_mode"; - // Force to run in compatibility mode for the default region persister - FailPointHelper::enableFailPoint(FailPoints::force_enable_region_persister_compatibility_mode); - SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_enable_region_persister_compatibility_mode); }); + // Force to run in compatible mode for the default region persister + FailPointHelper::enableFailPoint(FailPoints::force_enable_region_persister_compatible_mode); + SCOPE_EXIT({ FailPointHelper::disableFailPoint(FailPoints::force_enable_region_persister_compatible_mode); }); auto & ctx = TiFlashTestEnv::getContext(DB::Settings(), Strings{ path, @@ -288,8 +288,8 @@ try DB::Timestamp tso = 0; { RegionPersister persister(ctx, region_manager); - // Force to run in compatibility mode - FailPointHelper::enableFailPoint(FailPoints::force_enable_region_persister_compatibility_mode); + // Force to run in compatible mode + FailPointHelper::enableFailPoint(FailPoints::force_enable_region_persister_compatible_mode); persister.restore(nullptr, config); ASSERT_EQ(persister.page_storage, nullptr); ASSERT_NE(persister.stable_page_storage, nullptr); @@ -310,7 +310,7 @@ try { RegionPersister persister(ctx, region_manager); - // restore normally, should run in compatibility mode. + // restore normally, should run in compatible mode. RegionMap new_regions = persister.restore(nullptr, config); ASSERT_EQ(persister.page_storage, nullptr); ASSERT_NE(persister.stable_page_storage, nullptr); @@ -329,7 +329,7 @@ try { RegionPersister persister(ctx, region_manager); // Force to run in normal mode - FailPointHelper::enableFailPoint(FailPoints::force_disable_region_persister_compatibility_mode); + FailPointHelper::enableFailPoint(FailPoints::force_disable_region_persister_compatible_mode); RegionMap new_regions = persister.restore(nullptr, config); ASSERT_NE(persister.page_storage, nullptr); ASSERT_EQ(persister.stable_page_storage, nullptr); From 7018c3ac30762fc30a4fbf99251079de88346543 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 9 Nov 2020 17:25:53 +0800 Subject: [PATCH 7/8] Address comment --- dbms/src/Storages/PathPool.cpp | 38 +++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/dbms/src/Storages/PathPool.cpp b/dbms/src/Storages/PathPool.cpp index 180c2dc913d..a539660d8c2 100644 --- a/dbms/src/Storages/PathPool.cpp +++ b/dbms/src/Storages/PathPool.cpp @@ -23,6 +23,15 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } +inline String removeTrailingSlash(String s) +{ + if (s.back() == '/') + s.erase(s.begin() + s.size() - 1); + return s; +} + +inline String getNormalizedPath(const String & s) { return removeTrailingSlash(Poco::Path{s}.toString()); } + // Constructor to be used during initialization PathPool::PathPool(const Strings & main_data_paths_, const Strings & latest_data_paths_, const Strings & kvstore_paths_, // PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider_, bool enable_raft_compatibility_mode_) @@ -40,9 +49,7 @@ PathPool::PathPool(const Strings & main_data_paths_, const Strings & latest_data for (const auto & s : latest_data_paths) { // Get a normalized path without trailing '/' - auto p = Poco::Path{s + "/kvstore"}.toString(); - if (p.back() == '/') - p.erase(p.begin() + p.size() - 1); + auto p = getNormalizedPath(s + "/kvstore"); kvstore_paths.emplace_back(std::move(p)); } } @@ -57,9 +64,9 @@ Strings PathPool::listPaths() const { std::set path_set; for (const auto & p : main_data_paths) - path_set.insert(Poco::Path{p + "/data"}.toString()); + path_set.insert(getNormalizedPath(p + "/data")); for (const auto & p : latest_data_paths) - path_set.insert(Poco::Path{p + "/data"}.toString()); + path_set.insert(getNormalizedPath(p + "/data")); Strings paths; for (const auto & p : path_set) paths.emplace_back(p); @@ -237,9 +244,9 @@ void StoragePathPool::drop(bool recursive, bool must_success) String StoragePathPool::getStorePath(const String & extra_path_root, const String & database_name, const String & table_name) { if (likely(!path_need_database_name)) - return Poco::Path{extra_path_root + "/" + escapeForFileName(table_name)}.toString(); + return getNormalizedPath(extra_path_root + "/" + escapeForFileName(table_name)); else - return Poco::Path{extra_path_root + "/" + escapeForFileName(database_name) + "/" + escapeForFileName(table_name)}.toString(); + return getNormalizedPath(extra_path_root + "/" + escapeForFileName(database_name) + "/" + escapeForFileName(table_name)); } void StoragePathPool::renamePath(const String & old_path, const String & new_path) @@ -414,9 +421,7 @@ size_t PSDiskDelegatorMulti::addPageFileUsedSize( const PageFileIdAndLevel & id_lvl, size_t size_to_add, const String & pf_parent_path, bool need_insert_location) { // Get a normalized path without `path_prefix` and trailing '/' - String upper_path = Poco::Path(pf_parent_path).parent().toString(); - if (upper_path.back() == '/') - upper_path.erase(upper_path.begin() + upper_path.size() - 1); + String upper_path = removeTrailingSlash(Poco::Path(pf_parent_path).parent().toString()); UInt32 index = UINT32_MAX; for (size_t i = 0; i < pool.latest_path_infos.size(); i++) { @@ -479,7 +484,10 @@ Strings PSDiskDelegatorSingle::listPaths() const return paths; } -String PSDiskDelegatorSingle::choosePath(const PageFileIdAndLevel & /*id_lvl*/) { return pool.latest_path_infos[0].path + "/" + path_prefix; } +String PSDiskDelegatorSingle::choosePath(const PageFileIdAndLevel & /*id_lvl*/) +{ + return pool.latest_path_infos[0].path + "/" + path_prefix; +} size_t PSDiskDelegatorSingle::addPageFileUsedSize( const PageFileIdAndLevel & /*id_lvl*/, size_t size_to_add, const String & pf_parent_path, bool /*need_insert_location*/) @@ -509,9 +517,7 @@ PSDiskDelegatorRaft::PSDiskDelegatorRaft(PathPool & pool_) : pool(pool_) { RaftPathInfo info; // Get a normalized path without trailing '/' - info.path = s; - if (info.path.back() == '/') - info.path.erase(info.path.begin() + info.path.size() - 1); + info.path = getNormalizedPath(s); info.total_size = 0; raft_path_infos.emplace_back(info); } @@ -565,9 +571,7 @@ size_t PSDiskDelegatorRaft::addPageFileUsedSize( const PageFileIdAndLevel & id_lvl, size_t size_to_add, const String & pf_parent_path, bool need_insert_location) { // Get a normalized path without trailing '/' - String upper_path = Poco::Path(pf_parent_path).toString(); - if (upper_path.back() == '/') - upper_path.erase(upper_path.begin() + upper_path.size() - 1); + String upper_path = getNormalizedPath(pf_parent_path); UInt32 index = UINT32_MAX; for (size_t i = 0; i < raft_path_infos.size(); i++) { From 297a906924fdfb4e397e6bccca4fef14f32245a3 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Mon, 9 Nov 2020 19:33:47 +0800 Subject: [PATCH 8/8] Fix bug in getMaxDataVersion --- dbms/src/Storages/Page/PageStorage.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/Page/PageStorage.cpp b/dbms/src/Storages/Page/PageStorage.cpp index 3347da5acb9..c387677a45d 100644 --- a/dbms/src/Storages/Page/PageStorage.cpp +++ b/dbms/src/Storages/Page/PageStorage.cpp @@ -75,7 +75,7 @@ PageFile::Version PageStorage::getMaxDataVersion(const FileProviderPtr & file_pr // Simply check the last PageFile is good enough auto reader = const_cast(*page_files.rbegin()).createMetaMergingReader(); - PageFile::Version max_binary_version = PageFile::CURRENT_VERSION, temp_version; + PageFile::Version max_binary_version = PageFile::VERSION_BASE, temp_version = PageFile::CURRENT_VERSION; reader->moveNext(&temp_version); max_binary_version = std::max(max_binary_version, temp_version); while (reader->hasNext())