Skip to content

Commit

Permalink
Support multi-disks for RegionPersister (pingcap#1199)
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Nov 15, 2020
1 parent f953d01 commit b064851
Show file tree
Hide file tree
Showing 32 changed files with 1,102 additions and 483 deletions.
4 changes: 3 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(region_exception_after_read_from_storage_some_error) \
M(region_exception_after_read_from_storage_all_error) \
M(exception_before_dmfile_remove_encryption) \
M(exception_before_dmfile_remove_from_disk)
M(exception_before_dmfile_remove_from_disk) \
M(force_enable_region_persister_compatible_mode) \
M(force_disable_region_persister_compatible_mode)

#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) M(pause_after_learner_read)

Expand Down
17 changes: 10 additions & 7 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ struct ContextShared
String tmp_path; /// The path to the temporary files that occur when processing the request.
String flags_path; /// Path to the directory with some control flags for server maintenance.
String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function.
PathPool extra_paths; /// The extra data directories. Some Storage Engine like DeltaMerge will store the main data in them if specified.
PathPool path_pool; /// The data directories. RegionPersister and some Storage Engine like DeltaMerge will use this to manage data placement on disks.
ConfigurationPtr config; /// Global configuration settings.

Databases databases; /// List of databases and tables in them.
Expand Down Expand Up @@ -515,10 +515,10 @@ String Context::getUserFilesPath() const
return shared->user_files_path;
}

PathPool & Context::getExtraPaths() const
PathPool & Context::getPathPool() const
{
auto lock = getLock();
return shared->extra_paths;
return shared->path_pool;
}

void Context::setPath(const String & path)
Expand Down Expand Up @@ -555,13 +555,17 @@ void Context::setUserFilesPath(const String & path)
shared->user_files_path = path;
}

void Context::setExtraPaths(const Strings & main_data_paths,
void Context::setPathPool( //
const Strings & main_data_paths,
const Strings & latest_data_paths,
const Strings & kvstore_paths,
bool enable_raft_compatibility_mode,
PathCapacityMetricsPtr global_capacity_,
FileProviderPtr file_provider_)
{
auto lock = getLock();
shared->extra_paths = PathPool(main_data_paths, latest_data_paths, global_capacity_, file_provider_);
shared->path_pool = PathPool(
main_data_paths, latest_data_paths, kvstore_paths, global_capacity_, file_provider_, enable_raft_compatibility_mode);
}

void Context::setConfig(const ConfigurationPtr & config)
Expand Down Expand Up @@ -1449,15 +1453,14 @@ DDLWorker & Context::getDDLWorker() const

void Context::createTMTContext(const std::vector<std::string> & pd_addrs,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
::TiDB::StorageEngine engine,
bool disable_bg_flush,
pingcap::ClusterConfig cluster_config)
{
auto lock = getLock();
if (shared->tmt_context)
throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, ignore_databases, kvstore_path, engine, disable_bg_flush, cluster_config);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, ignore_databases, engine, disable_bg_flush, cluster_config);
}

void Context::initializePathCapacityMetric(const std::vector<std::string> & all_path, size_t capacity_quota)
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,17 @@ class Context
String getTemporaryPath() const;
String getFlagsPath() const;
String getUserFilesPath() const;
PathPool & getExtraPaths() const;
PathPool & getPathPool() const;

void setPath(const String & path);
void setTemporaryPath(const String & path);
void setFlagsPath(const String & path);
void setUserFilesPath(const String & path);

void setExtraPaths(const Strings & main_data_paths,
void setPathPool(const Strings & main_data_paths,
const Strings & latest_data_paths,
const Strings & kvstore_paths,
bool enable_raft_compatibility_mode,
PathCapacityMetricsPtr global_capacity_,
FileProviderPtr file_provider);

Expand Down Expand Up @@ -392,7 +394,6 @@ class Context

void createTMTContext(const std::vector<std::string> & pd_addrs,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
::TiDB::StorageEngine engine,
bool disable_bg_tasks,
pingcap::ClusterConfig cluster_config = {});
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Interpreters/IDAsPathUpgrader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ void IDAsPathUpgrader::DatabaseDiskInfo::renameToTmpDirectories(const Context &
true);

// Rename database data dir for multi-paths
auto root_pool = ctx.getExtraPaths();
auto root_pool = ctx.getPathPool();
for (const auto & extra_path : root_pool.listPaths())
renamePath( //
doGetExtraDirectory(extra_path, /*escape*/ true, /*tmp*/ false), //
Expand Down Expand Up @@ -481,7 +481,7 @@ static void dropAbsentDatabase(
const String old_data_dir_not_escaped = db_info.getDataDirectory(root_path, false);
tryRemoveDirectory(old_data_dir_not_escaped, log, true);

const auto & data_extra_paths = context.getExtraPaths();
const auto & data_extra_paths = context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
tryRemoveDirectory(db_info.getExtraDirectory(extra_root_path), log, true);
Expand Down Expand Up @@ -562,7 +562,7 @@ void IDAsPathUpgrader::fixNotEscapedDirectories()
if (Poco::File dir(escaped_db_data_dir); !dir.exists())
dir.createDirectory();

const auto & data_extra_paths = global_context.getExtraPaths();
const auto & data_extra_paths = global_context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
auto escaped_extra_dir = db_info.getExtraDirectory(extra_root_path, /*escape=*/true);
Expand Down Expand Up @@ -611,7 +611,7 @@ void IDAsPathUpgrader::fixNotEscapedDirectories()
}

// Fix extra path.
const auto & data_extra_paths = global_context.getExtraPaths();
const auto & data_extra_paths = global_context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
// It was created by old PathPool, both database name and table name are not escaped.
Expand All @@ -631,7 +631,7 @@ void IDAsPathUpgrader::fixNotEscapedDirectories()
// clean not escaped database dir created by old PathPool
const String not_escaped_data_dir = db_info.getDataDirectory(root_path, /*escape*/ false);
tryRemoveDirectory(not_escaped_data_dir, log, true);
const auto & data_extra_paths = global_context.getExtraPaths();
const auto & data_extra_paths = global_context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
auto not_escaped_extra_data_dir = db_info.getExtraDirectory(extra_root_path, /*escape*/ false);
Expand Down Expand Up @@ -735,7 +735,7 @@ void IDAsPathUpgrader::renameDatabase(const String & db_name, const DatabaseDisk
// Remove old data dir
const String old_data_dir = db_info.getDataDirectory(root_path);
tryRemoveDirectory(old_data_dir, log);
const auto & data_extra_paths = global_context.getExtraPaths();
const auto & data_extra_paths = global_context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
tryRemoveDirectory(db_info.getExtraDirectory(extra_root_path), log);
Expand All @@ -762,7 +762,7 @@ void IDAsPathUpgrader::renameTable(

{
// Rename data path for multi disk
auto data_extra_paths = global_context.getExtraPaths();
auto data_extra_paths = global_context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
auto old_tbl_extra_data_path = table.getExtraDirectory(extra_root_path, db_info);
Expand Down
87 changes: 54 additions & 33 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,19 @@ static std::string getCanonicalPath(std::string path)
return path;
}

/// Split a comma-separated list of paths taken from the configuration.
///
/// `s` is trimmed as a whole, then tokenized on ','. Every token is passed through
/// getCanonicalPath() and the results are collected in their original order.
/// For each token an INFO line "<logging_prefix> data candidate path: <token>" is written
/// to `log`; note that the raw token is logged, not the canonicalized path.
static Strings parseMultiplePaths(String s, const String & logging_prefix, Poco::Logger * log)
{
    Poco::trimInPlace(s);

    const Poco::StringTokenizer tokens(s, ",");
    Strings paths;
    for (const auto & token : tokens)
    {
        // Canonicalize first, so nothing is logged if canonicalization fails.
        paths.emplace_back(getCanonicalPath(token));
        LOG_INFO(log, logging_prefix << " data candidate path: " << token);
    }
    return paths;
}

void Server::uninitialize()
{
logger().information("shutting down");
Expand Down Expand Up @@ -150,25 +163,26 @@ const std::string TiFlashProxyConfig::config_prefix = "flash.proxy";

struct TiFlashRaftConfig
{
const std::string learner_key = "engine";
const std::string learner_value = "tiflash";
std::vector<std::string> pd_addrs;
const std::string engine_key = "engine";
const std::string engine_value = "tiflash";
Strings pd_addrs;
std::unordered_set<std::string> ignore_databases{"system"};
std::string kvstore_path;
Strings kvstore_path;
// Actually it is "flash.service_addr"
std::string flash_server_addr;
bool enable_compatibility_mode = true;

static const TiDB::StorageEngine DEFAULT_ENGINE = TiDB::StorageEngine::DT;
bool disable_bg_flush = false;
TiDB::StorageEngine engine = DEFAULT_ENGINE;

public:
TiFlashRaftConfig(const std::string & path, Poco::Util::LayeredConfiguration & config, Poco::Logger * log);
TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Util::LayeredConfiguration & config, Poco::Logger * log);
};

/// Load raft related configs.
TiFlashRaftConfig::TiFlashRaftConfig(const std::string & path, Poco::Util::LayeredConfiguration & config, Poco::Logger * log)
: ignore_databases{"system"}, kvstore_path{path + "kvstore/"}
TiFlashRaftConfig::TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Util::LayeredConfiguration & config, Poco::Logger * log)
: ignore_databases{"system"}, kvstore_path{}
{
flash_server_addr = config.getString("flash.service_addr", "0.0.0.0:3930");

Expand Down Expand Up @@ -210,7 +224,17 @@ TiFlashRaftConfig::TiFlashRaftConfig(const std::string & path, Poco::Util::Layer

if (config.has("raft.kvstore_path"))
{
kvstore_path = config.getString("raft.kvstore_path");
kvstore_path = parseMultiplePaths(config.getString("raft.kvstore_path"), "Raft", log);
if (kvstore_path.empty())
{
LOG_INFO(log, "The configuration \"raft.kvstore_path\" is empty, generate the paths from \"latest_data_path\"");
for (const auto & s : latest_data_paths)
{
String path = Poco::Path{s + "/kvstore"}.toString();
LOG_INFO(log, "Raft data candidate path: " << path);
kvstore_path.emplace_back(std::move(path));
}
}
}

if (config.has("raft.storage_engine"))
Expand Down Expand Up @@ -245,14 +269,20 @@ TiFlashRaftConfig::TiFlashRaftConfig(const std::string & path, Poco::Util::Layer
ErrorCodes::INVALID_CONFIG_PARAMETER);
disable_bg_flush = true;
}

// just for test
if (config.has("raft.enable_compatibility_mode"))
{
enable_compatibility_mode = config.getBool("raft.enable_compatibility_mode");
}
}
}

pingcap::ClusterConfig getClusterConfig(const TiFlashSecurityConfig & security_config, const TiFlashRaftConfig & raft_config)
{
pingcap::ClusterConfig config;
config.learner_key = raft_config.learner_key;
config.learner_value = raft_config.learner_value;
config.learner_key = raft_config.engine_key;
config.learner_value = raft_config.engine_value;
config.ca_path = security_config.ca_path;
config.cert_path = security_config.cert_path;
config.key_path = security_config.key_path;
Expand Down Expand Up @@ -383,6 +413,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
}

/// ===== Paths related configuration initialized start ===== ///
/// Note that these global variables should be initialized in the following order:
// 1. capacity
// 2. path pool
// 3. TMTContext


// TODO: remove this configuration left by ClickHouse
std::vector<String> all_fast_path;
Expand Down Expand Up @@ -428,19 +463,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
Strings main_data_paths, latest_data_paths;
if (config().has("main_data_path"))
{
auto parse_multiple_paths = [&log](String s, const String & logging_prefix) -> Strings {
Poco::trimInPlace(s);
Strings res;
Poco::StringTokenizer string_tokens(s, ",");
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++)
{
res.emplace_back(getCanonicalPath(std::string(*it)));
LOG_INFO(log, logging_prefix << " data candidate path: " << std::string(*it));
}
return res;
};

main_data_paths = parse_multiple_paths(config().getString("main_data_path"), "Main");
main_data_paths = parseMultiplePaths(config().getString("main_data_path"), "Main", log);
if (main_data_paths.empty())
{
String error_msg
Expand All @@ -450,7 +473,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}

if (config().has("latest_data_path"))
latest_data_paths = parse_multiple_paths(config().getString("latest_data_path"), "Latest");
latest_data_paths = parseMultiplePaths(config().getString("latest_data_path"), "Latest", log);
if (latest_data_paths.empty())
{
LOG_INFO(log, "The configuration \"latest_data_paths\" is empty, use the same paths of \"main_data_path\"");
Expand Down Expand Up @@ -516,10 +539,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
}

global_context->initializePathCapacityMetric(all_normal_path, capacity);
global_context->setExtraPaths(main_data_paths, latest_data_paths, global_context->getPathCapacity(), global_context->getFileProvider());

const std::string path = all_normal_path[0];
TiFlashRaftConfig raft_config(path, config(), log);
TiFlashRaftConfig raft_config(latest_data_paths, config(), log);
global_context->setPathPool(main_data_paths, latest_data_paths, raft_config.kvstore_path, //
raft_config.enable_compatibility_mode, //
global_context->getPathCapacity(), global_context->getFileProvider());

// Use pd address to define which default_database we use by default.
// For mock test, we use "default". For deployments with pd/tidb/tikv, use "system", which always exists in TiFlash.
std::string default_database = config().getString("default_database", raft_config.pd_addrs.empty() ? "default" : "system");
Expand Down Expand Up @@ -761,12 +786,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_DEBUG(log, "Default storage engine: " << static_cast<Int64>(raft_config.engine));
/// create TMTContext
auto cluster_config = getClusterConfig(security_config, raft_config);
global_context->createTMTContext(raft_config.pd_addrs,
raft_config.ignore_databases,
raft_config.kvstore_path,
raft_config.engine,
raft_config.disable_bg_flush,
cluster_config);
global_context->createTMTContext(
raft_config.pd_addrs, raft_config.ignore_databases, raft_config.engine, raft_config.disable_bg_flush, cluster_config);
global_context->getTMTContext().reloadConfig(config());
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
const ColumnDefine & handle,
const Settings & settings_)
: global_context(db_context.getGlobalContext()),
path_pool(global_context.getExtraPaths().withTable(db_name_, table_name_, data_path_contains_database_name)),
path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)),
settings(settings_),
storage_pool(db_name_ + "." + table_name_, path_pool, global_context, db_context.getSettingsRef()),
db_name(db_name_),
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class DMFile_Test : public ::testing::Test

auto & ctx = DMTestEnv::getContext();
auto settings = DB::Settings();
path_pool = std::make_unique<StoragePathPool>(ctx.getExtraPaths().withTable("test", "t1", false));
path_pool = std::make_unique<StoragePathPool>(ctx.getPathPool().withTable("test", "t1", false));
storage_pool = std::make_unique<StoragePool>("test.t1", *path_pool, ctx, settings);
dm_file = DMFile::create(0, parent_path);
db_context = std::make_unique<Context>(DMTestEnv::getContext(settings));
Expand All @@ -65,7 +65,7 @@ class DMFile_Test : public ::testing::Test
*table_columns_ = *cols;

auto & ctx = DMTestEnv::getContext();
*path_pool = ctx.getExtraPaths().withTable("test", "t1", false);
*path_pool = ctx.getPathPool().withTable("test", "t1", false);
dm_context = std::make_unique<DMContext>( //
*db_context,
*path_pool,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Segment_test : public ::testing::Test
void SetUp() override
{
db_context = std::make_unique<Context>(DMTestEnv::getContext(DB::Settings()));
storage_path_pool = std::make_unique<StoragePathPool>(db_context->getExtraPaths().withTable("test", "t1", false));
storage_path_pool = std::make_unique<StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
storage_path_pool->drop(true);
table_columns_ = std::make_shared<ColumnDefines>();
dropDataOnDisk();
Expand All @@ -47,7 +47,7 @@ class Segment_test : public ::testing::Test
SegmentPtr reload(const ColumnDefinesPtr & pre_define_columns = {}, DB::Settings && db_settings = DB::Settings())
{
*db_context = DMTestEnv::getContext(db_settings);
storage_path_pool = std::make_unique<StoragePathPool>(db_context->getExtraPaths().withTable("test", "t1", false));
storage_path_pool = std::make_unique<StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
storage_pool = std::make_unique<StoragePool>("test.t1", *storage_path_pool, *db_context, db_context->getSettingsRef());
storage_pool->restore();
ColumnDefinesPtr cols = (!pre_define_columns) ? DMTestEnv::getDefaultColumns() : pre_define_columns;
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/Page/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ bool PageFile::MetaMergingReader::hasNext() const
return (status == Status::Uninitialized) || (status == Status::Opened && meta_file_offset < meta_size);
}

void PageFile::MetaMergingReader::moveNext()
void PageFile::MetaMergingReader::moveNext(PageFile::Version * v)
{
curr_edit.clear();
curr_write_batch_sequence = 0;
Expand Down Expand Up @@ -297,6 +297,10 @@ void PageFile::MetaMergingReader::moveNext()
throw Exception("PageFile binary version not match, unknown version: " + DB::toString(binary_version), ErrorCodes::LOGICAL_ERROR);
}

// return the binary_version if `v` is not null
if (unlikely(v != nullptr))
*v = binary_version;

// check the checksum of WriteBatch
const auto wb_bytes_without_checksum = wb_bytes - sizeof(PageMetaFormat::Checksum);
const auto wb_checksum = PageUtil::get<PageMetaFormat::Checksum, false>(wb_start_pos + wb_bytes_without_checksum);
Expand Down
Loading

0 comments on commit b064851

Please sign in to comment.