Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rocksdb options not changed even if update in Pegasus config file #1108

Merged
merged 23 commits into from
Aug 25, 2022
108 changes: 103 additions & 5 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1560,9 +1560,10 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)

ddebug_replica("start to open rocksDB's rdb({})", rdb_path);

// Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
// Here we create a `_table_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
// will be used elsewhere.
rocksdb::ColumnFamilyOptions tmp_data_cf_opts = _data_cf_opts;
_table_data_cf_opts = _data_cf_opts;
_table_data_cf_opts_recalculated = false;
bool has_incompatible_db_options = false;
if (db_exist) {
// When DB exists, meta CF and data CF must be present.
Expand Down Expand Up @@ -1609,7 +1610,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
// We don't use `loaded_data_cf_opts` directly because pointer-typed options will
// only be initialized with default values when calling 'LoadLatestOptions', see
// 'rocksdb/utilities/options_util.h'.
reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
reset_usage_scenario_options(loaded_data_cf_opts, &_table_data_cf_opts);
_db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
}
} else {
Expand All @@ -1620,7 +1621,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
}

std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
{{DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
auto s = rocksdb::CheckOptionsCompatibility(rdb_path,
rocksdb::Env::Default(),
_db_opts,
Expand Down Expand Up @@ -2634,6 +2635,10 @@ void pegasus_server_impl::update_usage_scenario(const std::map<std::string, std:
old_usage_scenario,
new_usage_scenario);
}
} else {
// When an existing DB is opened and the RocksDB-related settings in the server config.ini have
// been changed, the options tied to the usage scenario must be recalculated from the new values.
recalculate_data_cf_options(_table_data_cf_opts);
}
}

Expand Down Expand Up @@ -2993,7 +2998,8 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
void pegasus_server_impl::reset_usage_scenario_options(
const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts)
{
// reset usage scenario related options, refer to options set in 'set_usage_scenario' function.
// reset usage scenario related options, refer to options set in 'set_usage_scenario'
// function.
target_opts->level0_file_num_compaction_trigger = base_opts.level0_file_num_compaction_trigger;
target_opts->level0_slowdown_writes_trigger = base_opts.level0_slowdown_writes_trigger;
target_opts->level0_stop_writes_trigger = base_opts.level0_stop_writes_trigger;
Expand All @@ -3007,6 +3013,98 @@ void pegasus_server_impl::reset_usage_scenario_options(
target_opts->max_write_buffer_number = base_opts.max_write_buffer_number;
}

// Bring the usage-scenario related options of the data column family back in line with the
// values configured in `_data_cf_opts`, for the scenario stored in `_usage_scenario`.
//
// Each option is compared against `cur_data_cf_opts`; only the ones that differ from what the
// configuration would produce are collected and then applied with a single set_options() call.
// The function does its work at most once after the flag `_table_data_cf_opts_recalculated` has
// been cleared: later calls return immediately.
//
// Params:
//   cur_data_cf_opts - the data column family options to compare against (read only).
void pegasus_server_impl::recalculate_data_cf_options(
    const rocksdb::ColumnFamilyOptions &cur_data_cf_opts)
{
// Record `option = value` in `new_options` when `value` differs from the current one.
#define UPDATE_NUMBER_OPTION_IF_NEEDED(option, value)                                              \
    do {                                                                                           \
        auto _v = (value);                                                                         \
        if (_v != cur_data_cf_opts.option) {                                                       \
            new_options[#option] = std::to_string(_v);                                             \
        }                                                                                          \
    } while (0)

// Same as above for boolean options, which are serialized as "true"/"false".
#define UPDATE_BOOL_OPTION_IF_NEEDED(option, value)                                                \
    do {                                                                                           \
        auto _v = (value);                                                                         \
        if (_v != cur_data_cf_opts.option) {                                                       \
            if (_v) {                                                                              \
                new_options[#option] = "true";                                                     \
            } else {                                                                               \
                new_options[#option] = "false";                                                    \
            }                                                                                      \
        }                                                                                          \
    } while (0)

// Record a new randomized value near `value` when the current one is not close enough to it
// (closeness is decided by check_value_if_nearby()).
#define UPDATE_OPTION_IF_NOT_NEARBY(option, value)                                                 \
    do {                                                                                           \
        auto _v = (value);                                                                         \
        if (!check_value_if_nearby(_v, cur_data_cf_opts.option)) {                                 \
            new_options[#option] = std::to_string(get_random_nearby(_v));                          \
        }                                                                                          \
    } while (0)

// Shorthand: the expected value is the configured one in `_data_cf_opts`.
#define UPDATE_OPTION_IF_NEEDED(option) UPDATE_NUMBER_OPTION_IF_NEEDED(option, _data_cf_opts.option)

    if (_table_data_cf_opts_recalculated) {
        return;
    }

    std::unordered_map<std::string, std::string> new_options;
    if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario ||
        ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE == _usage_scenario) {
        if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario) {
            UPDATE_OPTION_IF_NOT_NEARBY(write_buffer_size, _data_cf_opts.write_buffer_size);
            UPDATE_OPTION_IF_NEEDED(level0_file_num_compaction_trigger);
        } else {
            // ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE: write_buffer_size is expected to lie in
            // [1, 2] * configured value, and the L0 compaction trigger is derived from
            // max_bytes_for_level_base / write_buffer_size (but never below 4).
            const bool buffer_size_out_of_range =
                cur_data_cf_opts.write_buffer_size < _data_cf_opts.write_buffer_size ||
                cur_data_cf_opts.write_buffer_size > _data_cf_opts.write_buffer_size * 2;
            if (buffer_size_out_of_range) {
                const uint64_t buffer_size = dsn::rand::next_u64(
                    _data_cf_opts.write_buffer_size, _data_cf_opts.write_buffer_size * 2);
                new_options["write_buffer_size"] = std::to_string(buffer_size);
                const uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base);
                new_options["level0_file_num_compaction_trigger"] =
                    std::to_string(std::max<uint64_t>(4UL, max_size / buffer_size));
            } else if (!check_value_if_nearby(_data_cf_opts.max_bytes_for_level_base,
                                              cur_data_cf_opts.max_bytes_for_level_base)) {
                // write_buffer_size is left untouched in this branch, so the trigger has to be
                // derived from the buffer size actually in effect rather than from a random
                // value that is never applied.
                const uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base);
                new_options["level0_file_num_compaction_trigger"] = std::to_string(
                    std::max<uint64_t>(4UL, max_size / cur_data_cf_opts.write_buffer_size));
            }
        }
        UPDATE_OPTION_IF_NEEDED(level0_slowdown_writes_trigger);
        UPDATE_OPTION_IF_NEEDED(level0_stop_writes_trigger);
        UPDATE_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit);
        UPDATE_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit);
        UPDATE_BOOL_OPTION_IF_NEEDED(disable_auto_compactions, false);
        UPDATE_OPTION_IF_NEEDED(max_compaction_bytes);
        UPDATE_OPTION_IF_NEEDED(max_write_buffer_number);
    } else {
        // ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD
        UPDATE_NUMBER_OPTION_IF_NEEDED(level0_file_num_compaction_trigger, 1000000000);
        UPDATE_NUMBER_OPTION_IF_NEEDED(level0_slowdown_writes_trigger, 1000000000);
        UPDATE_NUMBER_OPTION_IF_NEEDED(level0_stop_writes_trigger, 1000000000);
        UPDATE_NUMBER_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit, 0);
        UPDATE_NUMBER_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit, 0);
        UPDATE_BOOL_OPTION_IF_NEEDED(disable_auto_compactions, true);
        UPDATE_NUMBER_OPTION_IF_NEEDED(max_compaction_bytes, static_cast<uint64_t>(1) << 60);
        UPDATE_OPTION_IF_NOT_NEARBY(write_buffer_size, _data_cf_opts.write_buffer_size * 4);
        UPDATE_NUMBER_OPTION_IF_NEEDED(max_write_buffer_number,
                                       std::max(_data_cf_opts.max_write_buffer_number, 6));
    }

    if (!new_options.empty() && set_options(new_options)) {
        ddebug_replica("{}: recalculate the value of the options related to usage scenario \"{}\"",
                       replica_name(),
                       _usage_scenario);
    }
    // Marked as done regardless of whether anything had to be changed, so the comparison is not
    // repeated on later calls.
    _table_data_cf_opts_recalculated = true;

// Keep every helper macro local to this function.
#undef UPDATE_OPTION_IF_NEEDED
#undef UPDATE_OPTION_IF_NOT_NEARBY
#undef UPDATE_BOOL_OPTION_IF_NEEDED
#undef UPDATE_NUMBER_OPTION_IF_NEEDED
}

bool pegasus_server_impl::set_options(
const std::unordered_map<std::string, std::string> &new_options)
{
Expand Down
19 changes: 19 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ class pegasus_server_impl : public pegasus_read_service
// return true if successfully changed
bool set_usage_scenario(const std::string &usage_scenario);

// recalculate option value if necessary
void recalculate_data_cf_options(const rocksdb::ColumnFamilyOptions &cur_data_cf_opts);

void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts);

Expand All @@ -325,6 +328,15 @@ class pegasus_server_impl : public pegasus_read_service
return dsn::rand::next_u64(base_value - gap, base_value + gap);
}

// Return true if `check_value` lies within [0.75, 1.25] * `base_value`, i.e. its distance from
// `base_value` is at most `base_value / 4` (integer division, so the tolerance is rounded down).
bool check_value_if_nearby(uint64_t base_value, uint64_t check_value)
{
    const uint64_t gap = base_value / 4;
    // Subtract the smaller operand from the larger one so the unsigned difference cannot wrap.
    const uint64_t actual_gap =
        (base_value < check_value) ? check_value - base_value : base_value - check_value;
    return actual_gap <= gap;
}

// return true if expired
bool check_if_record_expired(uint32_t epoch_now, rocksdb::Slice raw_value)
{
Expand Down Expand Up @@ -413,11 +425,18 @@ class pegasus_server_impl : public pegasus_read_service
std::shared_ptr<KeyWithTTLCompactionFilterFactory> _key_ttl_compaction_filter_factory;
std::shared_ptr<rocksdb::Statistics> _statistics;
rocksdb::DBOptions _db_opts;
// The data CF option values taken from the config template file config.ini
rocksdb::ColumnFamilyOptions _data_cf_opts;
// The data CF option values currently in effect, calculated dynamically from the config file
// values and the usage scenario
rocksdb::ColumnFamilyOptions _table_data_cf_opts;
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
rocksdb::ColumnFamilyOptions _meta_cf_opts;
rocksdb::ReadOptions _data_cf_rd_opts;
std::string _usage_scenario;
std::string _user_specified_compaction;
// Whether the usage-scenario related options of the data CF have already been recalculated.
// Recalculation is required once when the DB is opened, but not afterwards.
bool _table_data_cf_opts_recalculated;
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved

rocksdb::DB *_db;
rocksdb::ColumnFamilyHandle *_data_cf;
Expand Down