Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi-disks for PageStorage #1156

Merged
merged 10 commits into from
Nov 4, 2020
3 changes: 2 additions & 1 deletion dbms/src/Common/FileChecker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ void FileChecker::load(Map & map) const
JSON json(out.str());

JSON files = json["yandex"];
for (const auto & name_value : files)
// Note: the loop variable 'name_value' must always be a copy, because the iterator of type 'JSON' does not return a reference.
for (const auto name_value : files)
map[unescapeForFileName(name_value.getName())] = name_value.getValue()["size"].toUInt();
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Common/UnifiedLogPatternFormatter.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ void UnifiedLogPatternFormatter::writeEscapedString(DB::WriteBuffer & wb, const

bool UnifiedLogPatternFormatter::needJsonEncode(const std::string & src)
{
for (const uint8_t & byte : src)
for (const uint8_t byte : src)
{
if (byte <= 0x20 || byte == 0x22 || byte == 0x3D || byte == 0x5B || byte == 0x5D)
return true;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/DataTypes/DataTypeTuple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static inline const IColumn & extractElementColumn(const IColumn & column, size_
void DataTypeTuple::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
const auto & tuple = get<const Tuple &>(field).t;
for (const auto & idx_elem : ext::enumerate(elems))
for (const auto idx_elem : ext::enumerate(elems))
idx_elem.second->serializeBinary(tuple[idx_elem.first], ostr);
}

Expand All @@ -114,7 +114,7 @@ void DataTypeTuple::deserializeBinary(Field & field, ReadBuffer & istr) const

void DataTypeTuple::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
{
for (const auto & idx_elem : ext::enumerate(elems))
for (const auto idx_elem : ext::enumerate(elems))
idx_elem.second->serializeBinary(extractElementColumn(column, idx_elem.first), row_num, ostr);
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Dictionaries/CacheDictionary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ void CacheDictionary::update(

const auto now = std::chrono::system_clock::now();
/// Check which ids have not been found and require setting null_value
for (const auto id_found_pair : remaining_ids)
for (const auto & id_found_pair : remaining_ids)
{
if (id_found_pair.second)
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Dictionaries/ComplexKeyCacheDictionary.h
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ class ComplexKeyCacheDictionary final : public IDictionaryBase
const auto now = std::chrono::system_clock::now();

/// Check which ids have not been found and require setting null_value
for (const auto key_found_pair : remaining_keys)
for (const auto & key_found_pair : remaining_keys)
{
if (key_found_pair.second)
{
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Encryption/FileProvider.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#pragma once

#include <Core/Types.h>
#include <Encryption/BlockAccessCipherStream.h>
#include <Encryption/KeyManager.h>
#include <Encryption/RandomAccessFile.h>
#include <Encryption/WritableFile.h>

#include <string>

namespace DB
Expand Down Expand Up @@ -38,8 +40,8 @@ class FileProvider

bool isEncryptionEnabled() const;

void renameFile(const String & src_file_path_, const EncryptionPath & src_encryption_path_,
const String & dst_file_path_, const EncryptionPath & dst_encryption_path_) const;
void renameFile(const String & src_file_path_, const EncryptionPath & src_encryption_path_, const String & dst_file_path_,
const EncryptionPath & dst_encryption_path_) const;

~FileProvider() = default;

Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/executeQuery.h>
#include <Storages/Transaction/KVStore.h>
#include <Storages/Transaction/LockException.h>
#include <Storages/Transaction/RegionException.h>
#include <Storages/Transaction/TMTContext.h>
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Functions/FunctionsConversion.h
Original file line number Diff line number Diff line change
Expand Up @@ -2009,7 +2009,7 @@ class FunctionCast final : public IFunctionBase
element_wrappers.reserve(from_element_types.size());

/// Create conversion wrapper for each element in tuple
for (const auto & idx_type : ext::enumerate(from_type->getElements()))
for (const auto idx_type : ext::enumerate(from_type->getElements()))
element_wrappers.push_back(prepare(idx_type.second, to_element_types[idx_type.first]));

return [element_wrappers, from_element_types, to_element_types]
Expand All @@ -2035,7 +2035,7 @@ class FunctionCast final : public IFunctionBase
element_block.insert({ nullptr, std::make_shared<DataTypeTuple>(to_element_types), "" });

/// invoke conversion for each element
for (const auto & idx_element_wrapper : ext::enumerate(element_wrappers))
for (const auto idx_element_wrapper : ext::enumerate(element_wrappers))
idx_element_wrapper.second(element_block, { idx_element_wrapper.first },
tuple_size + idx_element_wrapper.first);

Expand Down
11 changes: 7 additions & 4 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,13 @@ void Context::setUserFilesPath(const String & path)
shared->user_files_path = path;
}

void Context::setExtraPaths(const std::vector<String> & extra_paths_, PathCapacityMetricsPtr global_capacity_, FileProviderPtr file_provider)
void Context::setExtraPaths(const Strings & main_data_paths,
const Strings & latest_data_paths,
PathCapacityMetricsPtr global_capacity_,
FileProviderPtr file_provider_)
{
auto lock = getLock();
shared->extra_paths = PathPool(extra_paths_, global_capacity_, file_provider);
shared->extra_paths = PathPool(main_data_paths, latest_data_paths, global_capacity_, file_provider_);
}

void Context::setConfig(const ConfigurationPtr & config)
Expand Down Expand Up @@ -1457,12 +1460,12 @@ void Context::createTMTContext(const std::vector<std::string> & pd_addrs,
shared->tmt_context = std::make_shared<TMTContext>(*this, pd_addrs, ignore_databases, kvstore_path, engine, disable_bg_flush, cluster_config);
}

void Context::initializePathCapacityMetric(const std::vector<std::string> & all_path, std::vector<size_t> && all_capacity)
void Context::initializePathCapacityMetric(const std::vector<std::string> & all_path, size_t capacity_quota)
{
auto lock = getLock();
if (shared->path_capacity_ptr)
throw Exception("PathCapacityMetrics instance has already existed", ErrorCodes::LOGICAL_ERROR);
shared->path_capacity_ptr = std::make_shared<PathCapacityMetrics>(all_path, all_capacity);
shared->path_capacity_ptr = std::make_shared<PathCapacityMetrics>(all_path, capacity_quota);
}

PathCapacityMetricsPtr Context::getPathCapacity() const
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,11 @@ class Context
void setTemporaryPath(const String & path);
void setFlagsPath(const String & path);
void setUserFilesPath(const String & path);
void setExtraPaths(const std::vector<String> & extra_paths, PathCapacityMetricsPtr global_capacity, FileProviderPtr file_provider);

void setExtraPaths(const Strings & main_data_paths,
const Strings & latest_data_paths,
PathCapacityMetricsPtr global_capacity_,
FileProviderPtr file_provider);

using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;

Expand Down Expand Up @@ -396,7 +400,7 @@ class Context
void initializeSchemaSyncService();
SchemaSyncServicePtr & getSchemaSyncService();

void initializePathCapacityMetric(const std::vector<std::string> & all_path, std::vector<size_t> && all_capacity);
void initializePathCapacityMetric(const std::vector<std::string> & all_path, size_t capacity_quota);
PathCapacityMetricsPtr getPathCapacity() const;

void initializePartPathSelector(std::vector<std::string> && all_path, std::vector<std::string> && all_fast_path);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/IDAsPathUpgrader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void tryRemoveDirectory(const String & directory, Poco::Logger * log, bool recur

// This function tidies up both paths and checks whether they are the same.
// For example "/tmp/data/a.sql" is equal to "/tmp//data//a.sql"
inline bool isSamePath(const String & lhs, const String & rhs) { return Poco::Path{lhs}.toString() == Poco::Path{rhs}.toString(); }
inline bool isSamePath(const String & lhs, const String & rhs) { return Poco::Path{lhs}.absolute().toString() == Poco::Path{rhs}.absolute().toString(); }

} // namespace

Expand Down
122 changes: 99 additions & 23 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->initializeFileProvider(key_manager, false);
}

bool has_zookeeper = config().has("zookeeper");
/// ===== Paths related configuration initialized start ===== ///

// TODO: remove this configuration left by ClickHouse
std::vector<String> all_fast_path;
if (config().has("fast_path"))
{
Expand All @@ -399,51 +400,123 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
}
}
std::vector<String> all_normal_path;
std::vector<size_t> all_capacity;

size_t capacity = 0; // "0" by default, means no quota, use the whole disk capacity.
if (config().has("capacity"))
{
// TODO: support human readable format for capacity, mark_cache_size, minmax_index_cache_size
// eg. 100GiB, 10MiB
String capacities = config().getString("capacity");
Poco::trimInPlace(capacities);
Poco::StringTokenizer string_tokens(capacities, ",");
size_t num_token = 0;
for (auto it = string_tokens.begin(); it != string_tokens.end(); ++it)
{
const std::string & s = *it;
size_t capacity = parse<size_t>(s.data(), s.size());
all_capacity.emplace_back(capacity);
if (num_token == 0)
{
const std::string & s = *it;
capacity = parse<size_t>(s.data(), s.size());
}
num_token++;
}
if (num_token != 1)
LOG_WARNING(log, "Only the first number in configuration \"capacity\" take effect");
LOG_INFO(log, "The capacity limit is: " + formatReadableSizeWithBinarySuffix(capacity));
}

Strings all_normal_path;
Strings main_data_paths, latest_data_paths;
if (config().has("main_data_path"))
{
auto parse_multiple_paths = [&log](String s, const String & logging_prefix) -> Strings {
Poco::trimInPlace(s);
Strings res;
Poco::StringTokenizer string_tokens(s, ",");
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++)
{
res.emplace_back(getCanonicalPath(std::string(*it)));
LOG_INFO(log, logging_prefix << " data candidate path: " << std::string(*it));
}
return res;
};

main_data_paths = parse_multiple_paths(config().getString("main_data_path"), "Main");
if (main_data_paths.empty())
{
String error_msg
= "The configuration \"main_data_path\" is empty! [main_data_path=" + config().getString("main_data_path") + "]";
LOG_ERROR(log, error_msg);
throw Exception(error_msg, ErrorCodes::INVALID_CONFIG_PARAMETER);
}

if (config().has("latest_data_path"))
latest_data_paths = parse_multiple_paths(config().getString("latest_data_path"), "Latest");
if (latest_data_paths.empty())
{
LOG_INFO(log, "The configuration \"latest_data_paths\" is empty, use the same paths of \"main_data_path\"");
latest_data_paths = main_data_paths;
for (const auto & s : latest_data_paths)
LOG_INFO(log, "Latest data candidate path: " << s);
}

{
std::set<String> path_set;
for (const auto & s : main_data_paths)
path_set.insert(s);
for (const auto & s : latest_data_paths)
path_set.insert(s);
all_normal_path.emplace_back(main_data_paths[0]);
path_set.erase(main_data_paths[0]);
for (const auto & s : path_set)
all_normal_path.emplace_back(s);
}

if (config().has("path"))
LOG_WARNING(log, "The configuration \"path\" is ignored when \"main_data_path\" is defined.");
}
else if (config().has("path"))
{
LOG_WARNING(log, "The configuration \"path\" is deprecated, use \"main_data_path\" instead.");

String paths = config().getString("path");
Poco::trimInPlace(paths);
if (paths.empty())
throw Exception("path configuration parameter is empty");
throw Exception(
"The configuration \"path\" is empty! [path=" + config().getString("paths") + "]", ErrorCodes::INVALID_CONFIG_PARAMETER);
Poco::StringTokenizer string_tokens(paths, ",");
size_t idx = 0;
for (auto it = string_tokens.begin(); it != string_tokens.end(); it++)
{
all_normal_path.emplace_back(getCanonicalPath(std::string(*it)));
if (all_capacity.size() < all_normal_path.size())
all_capacity.emplace_back(0);
LOG_DEBUG(log, "Data part candidate path: " << std::string(*it) << ", capacity: " << all_capacity[idx++]);
LOG_DEBUG(log, "Data part candidate path: " << std::string(*it));
}
}
global_context->initializePathCapacityMetric(all_normal_path, std::move(all_capacity));

bool path_realtime_mode = config().getBool("path_realtime_mode", false);
std::vector<String> extra_paths(all_normal_path.begin(), all_normal_path.end());
for (auto & p : extra_paths)
p += "/data";

if (path_realtime_mode && all_normal_path.size() > 1)
global_context->setExtraPaths(std::vector<String>(extra_paths.begin() + 1, extra_paths.end()),
global_context->getPathCapacity(),
global_context->getFileProvider());
// If you set `path_realtime_mode` to `true` and multiple directories are deployed in the path, the latest data is stored in the first directory and older data is stored in the remaining directories.
bool path_realtime_mode = config().getBool("path_realtime_mode", false);
for (size_t i = 0; i < all_normal_path.size(); ++i)
{
const String p = Poco::Path{all_normal_path[i]}.toString();
// Only use the first path for storing latest data
if (i == 0)
latest_data_paths.emplace_back(p);
if (path_realtime_mode)
{
if (i != 0)
main_data_paths.emplace_back(p);
}
else
{
main_data_paths.emplace_back(p);
}
}
}
else
global_context->setExtraPaths(extra_paths, global_context->getPathCapacity(), global_context->getFileProvider());
{
LOG_ERROR(log, "The configuration \"main_data_path\" is not defined.");
throw Exception("The configuration \"main_data_path\" is not defined.", ErrorCodes::INVALID_CONFIG_PARAMETER);
}

global_context->initializePathCapacityMetric(all_normal_path, capacity);
global_context->setExtraPaths(main_data_paths, latest_data_paths, global_context->getPathCapacity(), global_context->getFileProvider());

const std::string path = all_normal_path[0];
TiFlashRaftConfig raft_config(path, config(), log);
Expand All @@ -453,6 +526,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->setPath(path);
global_context->initializePartPathSelector(std::move(all_normal_path), std::move(all_fast_path));

/// ===== Paths related configuration initialized end ===== ///

security_config = TiFlashSecurityConfig(config(), log);

/// Create directories for 'path' and for default database, if not exist.
Expand Down Expand Up @@ -652,6 +727,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
bool has_zookeeper = config().has("zookeeper");
attachSystemTablesServer(*global_context->getDatabase("system"), has_zookeeper);

{
Expand Down
15 changes: 6 additions & 9 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
namespace DB
{

class PathPool;
class StoragePathPool;

class TiFlashMetrics;
using TiFlashMetricsPtr = std::shared_ptr<TiFlashMetrics>;
Expand All @@ -27,10 +27,9 @@ struct DMContext : private boost::noncopyable
const Context & db_context;
const TiFlashMetricsPtr metrics;

const String & store_path;
PathPool & extra_paths;
StoragePool & storage_pool;
const UInt64 hash_salt;
StoragePathPool & path_pool;
StoragePool & storage_pool;
const UInt64 hash_salt;

// The schema snapshot
// We need a consistent snapshot of columns, copy ColumnsDefines
Expand Down Expand Up @@ -66,8 +65,7 @@ struct DMContext : private boost::noncopyable
const bool enable_skippable_place;

DMContext(const Context & db_context_,
const String & store_path_,
PathPool & extra_paths_,
StoragePathPool & path_pool_,
StoragePool & storage_pool_,
const UInt64 hash_salt_,
const ColumnDefinesPtr & store_columns_,
Expand All @@ -78,8 +76,7 @@ struct DMContext : private boost::noncopyable
const DB::Settings & settings)
: db_context(db_context_),
metrics(db_context.getTiFlashMetrics()),
store_path(store_path_),
extra_paths(extra_paths_),
path_pool(path_pool_),
storage_pool(storage_pool_),
hash_salt(hash_salt_),
store_columns(store_columns_),
Expand Down
Loading