Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: rocksdb options not changed even if update in Pegasus config file #1108

Merged
merged 23 commits into from
Aug 25, 2022
103 changes: 98 additions & 5 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1560,9 +1560,10 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)

ddebug_replica("start to open rocksDB's rdb({})", rdb_path);

// Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
// Here we create a `_table_data_cf_opts` because we don't want to modify `_data_cf_opts`, which
// will be used elsewhere.
rocksdb::ColumnFamilyOptions tmp_data_cf_opts = _data_cf_opts;
_table_data_cf_opts = _data_cf_opts;
_is_need_update_data_cf_opts = true;
bool has_incompatible_db_options = false;
if (db_exist) {
// When DB exists, meta CF and data CF must be present.
Expand Down Expand Up @@ -1609,7 +1610,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
// We don't use `loaded_data_cf_opts` directly because pointer-typed options will
// only be initialized with default values when calling 'LoadLatestOptions', see
// 'rocksdb/utilities/options_util.h'.
reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts);
reset_usage_scenario_options(loaded_data_cf_opts, &_table_data_cf_opts);
_db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
}
} else {
Expand All @@ -1620,7 +1621,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
}

std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
{{DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
auto s = rocksdb::CheckOptionsCompatibility(rdb_path,
rocksdb::Env::Default(),
_db_opts,
Expand Down Expand Up @@ -2634,6 +2635,10 @@ void pegasus_server_impl::update_usage_scenario(const std::map<std::string, std:
old_usage_scenario,
new_usage_scenario);
}
} else {
// When an old db is opened and the conf is changed, the options related to usage scenario
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
// need to be recalculated with new values.
recalculate_data_cf_options(_table_data_cf_opts);
}
}

Expand Down Expand Up @@ -2993,7 +2998,8 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
void pegasus_server_impl::reset_usage_scenario_options(
const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts)
{
// reset usage scenario related options, refer to options set in 'set_usage_scenario' function.
// reset usage scenario related options, refer to options set in 'set_usage_scenario'
// function.
target_opts->level0_file_num_compaction_trigger = base_opts.level0_file_num_compaction_trigger;
target_opts->level0_slowdown_writes_trigger = base_opts.level0_slowdown_writes_trigger;
target_opts->level0_stop_writes_trigger = base_opts.level0_stop_writes_trigger;
Expand All @@ -3007,6 +3013,93 @@ void pegasus_server_impl::reset_usage_scenario_options(
target_opts->max_write_buffer_number = base_opts.max_write_buffer_number;
}

void pegasus_server_impl::recalculate_data_cf_options(
const rocksdb::ColumnFamilyOptions &cur_data_cf_opts)
{
#define UPDATE_NUMBER_OPTION_IF_NEEDED(option, value) \
if ((value) != cur_data_cf_opts.option) { \
new_options[#option] = std::to_string((value)); \
}
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
#define UPDATE_BOOL_OPTION_IF_NEEDED(option, value) \
if ((value) != cur_data_cf_opts.option) { \
if ((value)) \
new_options[#option] = "true"; \
else \
new_options[#option] = "false"; \
}

if (!_is_need_update_data_cf_opts)
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
return;
empiredan marked this conversation as resolved.
Show resolved Hide resolved
std::unordered_map<std::string, std::string> new_options;
if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario ||
ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE == _usage_scenario) {
if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario) {
if (!check_value_if_nearby(_data_cf_opts.write_buffer_size,
cur_data_cf_opts.write_buffer_size)) {
new_options["write_buffer_size"] =
std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size));
}
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
UPDATE_NUMBER_OPTION_IF_NEEDED(level0_file_num_compaction_trigger,
_data_cf_opts.level0_file_num_compaction_trigger);
} else {
uint64_t buffer_size = dsn::rand::next_u64(_data_cf_opts.write_buffer_size,
_data_cf_opts.write_buffer_size * 2);
if (!(cur_data_cf_opts.write_buffer_size >= _data_cf_opts.write_buffer_size &&
cur_data_cf_opts.write_buffer_size <= _data_cf_opts.write_buffer_size * 2)) {
empiredan marked this conversation as resolved.
Show resolved Hide resolved
new_options["write_buffer_size"] = std::to_string(buffer_size);
uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base);
new_options["level0_file_num_compaction_trigger"] =
std::to_string(std::max<uint64_t>(4UL, max_size / buffer_size));
} else if (!check_value_if_nearby(_data_cf_opts.max_bytes_for_level_base,
cur_data_cf_opts.max_bytes_for_level_base)) {
uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base);
new_options["level0_file_num_compaction_trigger"] =
std::to_string(std::max<uint64_t>(4UL, max_size / buffer_size));
}
}
UPDATE_NUMBER_OPTION_IF_NEEDED(level0_slowdown_writes_trigger,
_data_cf_opts.level0_slowdown_writes_trigger);
UPDATE_NUMBER_OPTION_IF_NEEDED(level0_stop_writes_trigger,
_data_cf_opts.level0_stop_writes_trigger);
UPDATE_NUMBER_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit,
_data_cf_opts.soft_pending_compaction_bytes_limit);
UPDATE_NUMBER_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit,
_data_cf_opts.hard_pending_compaction_bytes_limit);
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
UPDATE_BOOL_OPTION_IF_NEEDED(disable_auto_compactions, false);
UPDATE_NUMBER_OPTION_IF_NEEDED(max_compaction_bytes, _data_cf_opts.max_compaction_bytes);
UPDATE_NUMBER_OPTION_IF_NEEDED(max_write_buffer_number,
_data_cf_opts.max_write_buffer_number);
} else {
// ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD
UPDATE_NUMBER_OPTION_IF_NEEDED(level0_file_num_compaction_trigger, 1000000000);
UPDATE_NUMBER_OPTION_IF_NEEDED(level0_slowdown_writes_trigger, 1000000000);
UPDATE_NUMBER_OPTION_IF_NEEDED(level0_stop_writes_trigger, 1000000000);
UPDATE_NUMBER_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit, 0);
UPDATE_NUMBER_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit, 0);
UPDATE_BOOL_OPTION_IF_NEEDED(disable_auto_compactions, true);
UPDATE_NUMBER_OPTION_IF_NEEDED(max_compaction_bytes, static_cast<uint64_t>(1) << 60);
if (!check_value_if_nearby(_data_cf_opts.write_buffer_size * 4,
cur_data_cf_opts.write_buffer_size)) {
new_options["write_buffer_size"] =
std::to_string(get_random_nearby(_data_cf_opts.write_buffer_size * 4));
}
if (cur_data_cf_opts.max_write_buffer_number !=
std::max(_data_cf_opts.max_write_buffer_number, 6)) {
new_options["max_write_buffer_number"] =
std::to_string(std::max(_data_cf_opts.max_write_buffer_number, 6));
}
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
}
if (new_options.size() > 0) {
if (set_options(new_options)) {
ddebug_replica(
"{}: recalculate the value of the options related to usage scenario \"{}\"",
replica_name(),
_usage_scenario);
}
}
_is_need_update_data_cf_opts = false;
}

bool pegasus_server_impl::set_options(
const std::unordered_map<std::string, std::string> &new_options)
{
Expand Down
16 changes: 16 additions & 0 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ class pegasus_server_impl : public pegasus_read_service
// return true if successfully changed
bool set_usage_scenario(const std::string &usage_scenario);

// recalculate option value if necessary
void recalculate_data_cf_options(const rocksdb::ColumnFamilyOptions &cur_data_cf_opts);

void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts);

Expand All @@ -325,6 +328,15 @@ class pegasus_server_impl : public pegasus_read_service
return dsn::rand::next_u64(base_value - gap, base_value + gap);
}

// return true if value in range of [0.75, 1.25] * base_value
bool check_value_if_nearby(uint64_t base_value, uint64_t check_value)
{
uint64_t gap = base_value / 4;
uint64_t actual_gap =
(base_value < check_value) ? check_value - base_value : base_value - check_value;
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
return actual_gap <= gap;
}

// return true if expired
bool check_if_record_expired(uint32_t epoch_now, rocksdb::Slice raw_value)
{
Expand Down Expand Up @@ -414,10 +426,14 @@ class pegasus_server_impl : public pegasus_read_service
std::shared_ptr<rocksdb::Statistics> _statistics;
rocksdb::DBOptions _db_opts;
rocksdb::ColumnFamilyOptions _data_cf_opts;
rocksdb::ColumnFamilyOptions _table_data_cf_opts;
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
rocksdb::ColumnFamilyOptions _meta_cf_opts;
rocksdb::ReadOptions _data_cf_rd_opts;
std::string _usage_scenario;
std::string _user_specified_compaction;
// Whether it is necessary to update the current data_cf, it is required when opening the db,
// but not later
WHBANG marked this conversation as resolved.
Show resolved Hide resolved
bool _is_need_update_data_cf_opts;
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved

rocksdb::DB *_db;
rocksdb::ColumnFamilyHandle *_data_cf;
Expand Down