Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi-disks for RegionPersister #1199

Merged
merged 9 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(region_exception_after_read_from_storage_some_error) \
M(region_exception_after_read_from_storage_all_error) \
M(exception_before_dmfile_remove_encryption) \
M(exception_before_dmfile_remove_from_disk)
M(exception_before_dmfile_remove_from_disk) \
M(force_enable_region_persister_compatibility_mode) \
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
M(force_disable_region_persister_compatibility_mode)

#define APPLY_FOR_FAILPOINTS_WITH_CHANNEL(M) M(pause_after_learner_read)

Expand Down
17 changes: 10 additions & 7 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ struct ContextShared
String tmp_path; /// The path to the temporary files that occur when processing the request.
String flags_path; /// Path to the directory with some control flags for server maintenance.
String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function.
PathPool extra_paths; /// The extra data directories. Some Storage Engine like DeltaMerge will store the main data in them if specified.
PathPool path_pool; /// The data directories. RegionPersister and some Storage Engine like DeltaMerge will use this to manage data placement on disks.
ConfigurationPtr config; /// Global configuration settings.

Databases databases; /// List of databases and tables in them.
Expand Down Expand Up @@ -515,10 +515,10 @@ String Context::getUserFilesPath() const
return shared->user_files_path;
}

PathPool & Context::getExtraPaths() const
PathPool & Context::getPathPool() const
{
auto lock = getLock();
return shared->extra_paths;
return shared->path_pool;
}

void Context::setPath(const String & path)
Expand Down Expand Up @@ -555,13 +555,17 @@ void Context::setUserFilesPath(const String & path)
shared->user_files_path = path;
}

void Context::setExtraPaths(const Strings & main_data_paths,
void Context::setPathPool( //
const Strings & main_data_paths,
const Strings & latest_data_paths,
const Strings & kvstore_paths,
bool enable_raft_storage_compatibility_mode,
PathCapacityMetricsPtr global_capacity_,
FileProviderPtr file_provider_)
{
auto lock = getLock();
shared->extra_paths = PathPool(main_data_paths, latest_data_paths, global_capacity_, file_provider_);
shared->path_pool = PathPool(
main_data_paths, latest_data_paths, kvstore_paths, global_capacity_, file_provider_, enable_raft_storage_compatibility_mode);
}

void Context::setConfig(const ConfigurationPtr & config)
Expand Down Expand Up @@ -1449,15 +1453,14 @@ DDLWorker & Context::getDDLWorker() const

void Context::createTMTContext(const std::vector<std::string> & pd_addrs,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
::TiDB::StorageEngine engine,
bool disable_bg_flush,
pingcap::ClusterConfig cluster_config)
{
auto lock = getLock();
if (shared->tmt_context)
throw Exception("TMTContext has already existed", ErrorCodes::LOGICAL_ERROR);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, ignore_databases, kvstore_path, engine, disable_bg_flush, cluster_config);
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, ignore_databases, engine, disable_bg_flush, cluster_config);
}

void Context::initializePathCapacityMetric(const std::vector<std::string> & all_path, size_t capacity_quota)
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,17 @@ class Context
String getTemporaryPath() const;
String getFlagsPath() const;
String getUserFilesPath() const;
PathPool & getExtraPaths() const;
PathPool & getPathPool() const;

void setPath(const String & path);
void setTemporaryPath(const String & path);
void setFlagsPath(const String & path);
void setUserFilesPath(const String & path);

void setExtraPaths(const Strings & main_data_paths,
void setPathPool(const Strings & main_data_paths,
const Strings & latest_data_paths,
const Strings & kvstore_paths,
bool enable_raft_storage_compatibility_mode,
PathCapacityMetricsPtr global_capacity_,
FileProviderPtr file_provider);

Expand Down Expand Up @@ -392,7 +394,6 @@ class Context

void createTMTContext(const std::vector<std::string> & pd_addrs,
const std::unordered_set<std::string> & ignore_databases,
const std::string & kvstore_path,
::TiDB::StorageEngine engine,
bool disable_bg_tasks,
pingcap::ClusterConfig cluster_config = {});
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Interpreters/IDAsPathUpgrader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ void IDAsPathUpgrader::DatabaseDiskInfo::renameToTmpDirectories(const Context &
true);

// Rename database data dir for multi-paths
auto root_pool = ctx.getExtraPaths();
auto root_pool = ctx.getPathPool();
for (const auto & extra_path : root_pool.listPaths())
renamePath( //
doGetExtraDirectory(extra_path, /*escape*/ true, /*tmp*/ false), //
Expand Down Expand Up @@ -481,7 +481,7 @@ static void dropAbsentDatabase(
const String old_data_dir_not_escaped = db_info.getDataDirectory(root_path, false);
tryRemoveDirectory(old_data_dir_not_escaped, log, true);

const auto & data_extra_paths = context.getExtraPaths();
const auto & data_extra_paths = context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
tryRemoveDirectory(db_info.getExtraDirectory(extra_root_path), log, true);
Expand Down Expand Up @@ -562,7 +562,7 @@ void IDAsPathUpgrader::fixNotEscapedDirectories()
if (Poco::File dir(escaped_db_data_dir); !dir.exists())
dir.createDirectory();

const auto & data_extra_paths = global_context.getExtraPaths();
const auto & data_extra_paths = global_context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
auto escaped_extra_dir = db_info.getExtraDirectory(extra_root_path, /*escape=*/true);
Expand Down Expand Up @@ -611,7 +611,7 @@ void IDAsPathUpgrader::fixNotEscapedDirectories()
}

// Fix extra path.
const auto & data_extra_paths = global_context.getExtraPaths();
const auto & data_extra_paths = global_context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
// It was created by old PathPool, both database name and table name are not escaped.
Expand All @@ -631,7 +631,7 @@ void IDAsPathUpgrader::fixNotEscapedDirectories()
// clean not escaped database dir created by old PathPool
const String not_escaped_data_dir = db_info.getDataDirectory(root_path, /*escape*/ false);
tryRemoveDirectory(not_escaped_data_dir, log, true);
const auto & data_extra_paths = global_context.getExtraPaths();
const auto & data_extra_paths = global_context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
auto not_escaped_extra_data_dir = db_info.getExtraDirectory(extra_root_path, /*escape*/ false);
Expand Down Expand Up @@ -735,7 +735,7 @@ void IDAsPathUpgrader::renameDatabase(const String & db_name, const DatabaseDisk
// Remove old data dir
const String old_data_dir = db_info.getDataDirectory(root_path);
tryRemoveDirectory(old_data_dir, log);
const auto & data_extra_paths = global_context.getExtraPaths();
const auto & data_extra_paths = global_context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
tryRemoveDirectory(db_info.getExtraDirectory(extra_root_path), log);
Expand All @@ -762,7 +762,7 @@ void IDAsPathUpgrader::renameTable(

{
// Rename data path for multi disk
auto data_extra_paths = global_context.getExtraPaths();
auto data_extra_paths = global_context.getPathPool();
for (const auto & extra_root_path : data_extra_paths.listPaths())
{
auto old_tbl_extra_data_path = table.getExtraDirectory(extra_root_path, db_info);
Expand Down
71 changes: 47 additions & 24 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,19 @@ static std::string getCanonicalPath(std::string path)
return path;
}

static Strings parseMultiplePaths(String s, const String & logging_prefix, Poco::Logger * log)
{
Poco::trimInPlace(s);
Strings res;
Poco::StringTokenizer string_tokens(s, ",");
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++)
{
res.emplace_back(getCanonicalPath(std::string(*it)));
LOG_INFO(log, logging_prefix << " data candidate path: " << std::string(*it));
}
return res;
}

void Server::uninitialize()
{
logger().information("shutting down");
Expand Down Expand Up @@ -152,23 +165,24 @@ struct TiFlashRaftConfig
{
const std::string engine_key = "engine";
const std::string engine_value = "tiflash";
std::vector<std::string> pd_addrs;
Strings pd_addrs;
std::unordered_set<std::string> ignore_databases{"system"};
std::string kvstore_path;
Strings kvstore_path;
// Actually it is "flash.service_addr"
std::string flash_server_addr;
bool enable_storage_compatibility_mode = true;

static const TiDB::StorageEngine DEFAULT_ENGINE = TiDB::StorageEngine::DT;
bool disable_bg_flush = false;
TiDB::StorageEngine engine = DEFAULT_ENGINE;

public:
TiFlashRaftConfig(const std::string & path, Poco::Util::LayeredConfiguration & config, Poco::Logger * log);
TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Util::LayeredConfiguration & config, Poco::Logger * log);
};

/// Load raft related configs.
TiFlashRaftConfig::TiFlashRaftConfig(const std::string & path, Poco::Util::LayeredConfiguration & config, Poco::Logger * log)
: ignore_databases{"system"}, kvstore_path{path + "kvstore/"}
TiFlashRaftConfig::TiFlashRaftConfig(const Strings & latest_data_paths, Poco::Util::LayeredConfiguration & config, Poco::Logger * log)
: ignore_databases{"system"}, kvstore_path{}
{
flash_server_addr = config.getString("flash.service_addr", "0.0.0.0:3930");

Expand Down Expand Up @@ -210,7 +224,17 @@ TiFlashRaftConfig::TiFlashRaftConfig(const std::string & path, Poco::Util::Layer

if (config.has("raft.kvstore_path"))
{
kvstore_path = config.getString("raft.kvstore_path");
kvstore_path = parseMultiplePaths(config.getString("raft.kvstore_path"), "Raft", log);
if (kvstore_path.empty())
{
LOG_INFO(log, "The configuration \"raft.kvstore_path\" is empty, generate the paths from \"latest_data_path\"");
for (const auto & s : latest_data_paths)
{
String path = Poco::Path{s + "/kvstore"}.toString();
LOG_INFO(log, "Raft data candidate path: " << path);
kvstore_path.emplace_back(std::move(path));
}
}
}

if (config.has("raft.storage_engine"))
Expand Down Expand Up @@ -245,6 +269,11 @@ TiFlashRaftConfig::TiFlashRaftConfig(const std::string & path, Poco::Util::Layer
ErrorCodes::INVALID_CONFIG_PARAMETER);
disable_bg_flush = true;
}

if (config.has("raft.enable_storage_compatibility_mode"))
{
enable_storage_compatibility_mode = config.getBool("raft.enable_storage_compatibility_mode");
}
}
}

Expand Down Expand Up @@ -383,6 +412,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
}

/// ===== Paths related configuration initialized start ===== ///
/// Note that theses global variables should be initialized by the following order:
// 1. capacity
// 2. path pool
// 3. TMTContext


// TODO: remove this configuration left by ClickHouse
std::vector<String> all_fast_path;
Expand Down Expand Up @@ -428,19 +462,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
Strings main_data_paths, latest_data_paths;
if (config().has("main_data_path"))
{
auto parse_multiple_paths = [&log](String s, const String & logging_prefix) -> Strings {
Poco::trimInPlace(s);
Strings res;
Poco::StringTokenizer string_tokens(s, ",");
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++)
{
res.emplace_back(getCanonicalPath(std::string(*it)));
LOG_INFO(log, logging_prefix << " data candidate path: " << std::string(*it));
}
return res;
};

main_data_paths = parse_multiple_paths(config().getString("main_data_path"), "Main");
main_data_paths = parseMultiplePaths(config().getString("main_data_path"), "Main", log);
if (main_data_paths.empty())
{
String error_msg
Expand All @@ -450,7 +472,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}

if (config().has("latest_data_path"))
latest_data_paths = parse_multiple_paths(config().getString("latest_data_path"), "Latest");
latest_data_paths = parseMultiplePaths(config().getString("latest_data_path"), "Latest", log);
if (latest_data_paths.empty())
{
LOG_INFO(log, "The configuration \"latest_data_paths\" is empty, use the same paths of \"main_data_path\"");
Expand Down Expand Up @@ -516,10 +538,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
}

global_context->initializePathCapacityMetric(all_normal_path, capacity);
global_context->setExtraPaths(main_data_paths, latest_data_paths, global_context->getPathCapacity(), global_context->getFileProvider());

const std::string path = all_normal_path[0];
TiFlashRaftConfig raft_config(path, config(), log);
TiFlashRaftConfig raft_config(latest_data_paths, config(), log);
global_context->setPathPool(main_data_paths, latest_data_paths, raft_config.kvstore_path, //
raft_config.enable_storage_compatibility_mode, //
global_context->getPathCapacity(), global_context->getFileProvider());

// Use pd address to define which default_database we use by defauly.
// For mock test, we use "default". For deployed with pd/tidb/tikv use "system", which is always exist in TiFlash.
std::string default_database = config().getString("default_database", raft_config.pd_addrs.empty() ? "default" : "system");
Expand Down Expand Up @@ -763,7 +787,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto cluster_config = getClusterConfig(security_config, raft_config);
global_context->createTMTContext(raft_config.pd_addrs,
raft_config.ignore_databases,
raft_config.kvstore_path,
raft_config.engine,
raft_config.disable_bg_flush,
cluster_config);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ DeltaMergeStore::DeltaMergeStore(Context & db_context,
size_t rowkey_column_size_,
const Settings & settings_)
: global_context(db_context.getGlobalContext()),
path_pool(global_context.getExtraPaths().withTable(db_name_, table_name_, data_path_contains_database_name)),
path_pool(global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)),
settings(settings_),
storage_pool(db_name_ + "." + table_name_, path_pool, global_context, db_context.getSettingsRef()),
db_name(db_name_),
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class DMFile_Test : public ::testing::Test

auto & ctx = DMTestEnv::getContext();
auto settings = DB::Settings();
path_pool = std::make_unique<StoragePathPool>(ctx.getExtraPaths().withTable("test", "t1", false));
path_pool = std::make_unique<StoragePathPool>(ctx.getPathPool().withTable("test", "t1", false));
storage_pool = std::make_unique<StoragePool>("test.t1", *path_pool, ctx, settings);
dm_file = DMFile::create(0, parent_path);
db_context = std::make_unique<Context>(DMTestEnv::getContext(settings));
Expand All @@ -65,7 +65,7 @@ class DMFile_Test : public ::testing::Test
*table_columns_ = *cols;

auto & ctx = DMTestEnv::getContext();
*path_pool = ctx.getExtraPaths().withTable("test", "t1", false);
*path_pool = ctx.getPathPool().withTable("test", "t1", false);
dm_context = std::make_unique<DMContext>( //
*db_context,
*path_pool,
Expand Down Expand Up @@ -917,7 +917,7 @@ class DMFile_Clustered_Index_Test : public ::testing::Test

auto settings = DB::Settings();
auto & ctx = DMTestEnv::getContext();
path_pool = std::make_unique<StoragePathPool>(ctx.getExtraPaths().withTable("test", "t", false));
path_pool = std::make_unique<StoragePathPool>(ctx.getPathPool().withTable("test", "t", false));
storage_pool = std::make_unique<StoragePool>("test.t1", *path_pool, ctx, settings);
dm_file = DMFile::create(0, path);
db_context = std::make_unique<Context>(DMTestEnv::getContext(settings));
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Segment_test : public ::testing::Test
void SetUp() override
{
db_context = std::make_unique<Context>(DMTestEnv::getContext(DB::Settings()));
storage_path_pool = std::make_unique<StoragePathPool>(db_context->getExtraPaths().withTable("test", "t1", false));
storage_path_pool = std::make_unique<StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
storage_path_pool->drop(true);
table_columns_ = std::make_shared<ColumnDefines>();
dropDataOnDisk();
Expand All @@ -47,7 +47,7 @@ class Segment_test : public ::testing::Test
SegmentPtr reload(const ColumnDefinesPtr & pre_define_columns = {}, DB::Settings && db_settings = DB::Settings())
{
*db_context = DMTestEnv::getContext(db_settings);
storage_path_pool = std::make_unique<StoragePathPool>(db_context->getExtraPaths().withTable("test", "t1", false));
storage_path_pool = std::make_unique<StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
storage_pool = std::make_unique<StoragePool>("test.t1", *storage_path_pool, *db_context, db_context->getSettingsRef());
storage_pool->restore();
ColumnDefinesPtr cols = (!pre_define_columns) ? DMTestEnv::getDefaultColumns() : pre_define_columns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Segment_Common_Handle_test : public ::testing::Test
{
*db_context = DMTestEnv::getContext(db_settings);
auto & ctx = DMTestEnv::getContext();
path_pool = std::make_unique<StoragePathPool>(ctx.getExtraPaths().withTable("test", "t", false));
path_pool = std::make_unique<StoragePathPool>(ctx.getPathPool().withTable("test", "t", false));
storage_pool = std::make_unique<StoragePool>("test.t1", *path_pool, ctx, db_context->getSettingsRef());
storage_pool->restore();
ColumnDefinesPtr cols = (!pre_define_columns) ? DMTestEnv::getDefaultColumns(is_common_handle) : pre_define_columns;
Expand Down
Loading