Skip to content

Commit

Permalink
reply hycdong's comment, and adjust my code.
Browse files Browse the repository at this point in the history
  • Loading branch information
ruojieranyishen committed Jun 13, 2023
1 parent 97ea936 commit 98816c4
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 19 deletions.
1 change: 0 additions & 1 deletion src/common/replica_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class replica_envs
static const std::string UPDATE_MAX_REPLICA_COUNT;
static const std::string ROCKSDB_WRITE_BUFFER_SIZE;
static const std::string ROCKSDB_NUM_LEVELS;
static const std::string VALUE_VERSION;

static const std::set<std::string> ROCKSDB_DYNAMIC_OPTIONS;
static const std::set<std::string> ROCKSDB_STATIC_OPTIONS;
Expand Down
1 change: 0 additions & 1 deletion src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ const std::string replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND("rocksdb.allow_inges
const std::string replica_envs::UPDATE_MAX_REPLICA_COUNT("max_replica_count.update");
const std::string replica_envs::ROCKSDB_WRITE_BUFFER_SIZE("rocksdb.write_buffer_size");
const std::string replica_envs::ROCKSDB_NUM_LEVELS("rocksdb.num_levels");
const std::string replica_envs::VALUE_VERSION("value_version");

const std::set<std::string> replica_envs::ROCKSDB_DYNAMIC_OPTIONS = {
replica_envs::ROCKSDB_WRITE_BUFFER_SIZE,
Expand Down
7 changes: 3 additions & 4 deletions src/meta/app_env_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ bool validate_app_envs(const std::map<std::string, std::string> &envs)
{
if (envs.size() == 0)
return true;
// check rocksdb app envs information
// only check rocksdb app envs currently
std::string hint_message;
bool all_envs_vaild = true;
for (auto &it : envs) {
for (const auto &it : envs) {
if (replica_envs::ROCKSDB_STATIC_OPTIONS.find(it.first) ==
replica_envs::ROCKSDB_STATIC_OPTIONS.end() &&
replica_envs::ROCKSDB_DYNAMIC_OPTIONS.find(it.first) ==
Expand Down Expand Up @@ -194,7 +194,7 @@ bool check_rocksdb_num_levels(const std::string &env_value, std::string &hint_me
int32_t val = 0;

if (!dsn::buf2int32(env_value, val)) {
hint_message = fmt::format("rocksdb.num_levels cannot set this val:", env_value);
hint_message = fmt::format("rocksdb.num_levels cannot set this val: {}", env_value);
return false;
}
if (val < 1 || val > 10) {
Expand Down Expand Up @@ -271,7 +271,6 @@ void app_env_validator::register_all_validators()
{replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS, nullptr},
{replica_envs::REPLICA_ACCESS_CONTROLLER_RANGER_POLICIES, nullptr},
{replica_envs::VALUE_VERSION, nullptr},
};
}

Expand Down
38 changes: 27 additions & 11 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1664,7 +1664,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
// We don't use `loaded_data_cf_opts` directly because pointer-typed options will
// only be initialized with default values when calling 'LoadLatestOptions', see
// 'rocksdb/utilities/options_util.h'.
reset_usage_scenario_options(loaded_data_cf_opts, &_table_data_cf_opts);
reset_rocksdb_options(loaded_data_cf_opts, &_table_data_cf_opts);
_db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs);
}
} else {
Expand Down Expand Up @@ -2633,26 +2633,26 @@ void pegasus_server_impl::update_rocksdb_dynamic_options(
return;

std::unordered_map<std::string, std::string> new_options;
for (auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
auto find = envs.find(option);
if (option.compare(ROCKSDB_WRITE_BUFFER_SIZE) == 0 && find != envs.end()) {
new_options["write_buffer_size"] = find->second;
}
}

// doing set option
if (new_options.size() != 0 && !set_options(new_options)) {
LOG_WARNING("Set options fails");
if (new_options.size() > 0 && set_options(new_options)) {
LOG_INFO("Set rocksdb dynamic options success");
}
}

void pegasus_server_impl::update_rocksdb_options_before_create_replica(
void pegasus_server_impl::set_rocksdb_options_before_creating(
const std::map<std::string, std::string> &envs)
{
if (envs.size() == 0)
return;

for (auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
auto find = envs.find(option);
bool is_set = false;
if (option.compare(ROCKSDB_NUM_LEVELS) == 0 && find != envs.end()) {
Expand All @@ -2664,20 +2664,21 @@ void pegasus_server_impl::update_rocksdb_options_before_create_replica(
}

if (is_set)
LOG_INFO("Reset {} \"{}\" succeed", find->first, find->second);
LOG_INFO("Set {} \"{}\" succeed", find->first, find->second);
}

for (auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) {
for (const auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) {
auto find = envs.find(option);
bool is_set = false;
if (option.compare(ROCKSDB_WRITE_BUFFER_SIZE) == 0 && find != envs.end()) {
uint64_t val = 0;
if (!dsn::buf2uint64(find->second, val))
continue;
is_set = true;
_data_cf_opts.write_buffer_size = static_cast<size_t>(val);
}
if (is_set)
LOG_INFO("Reset {} \"{}\" succeed", find->first, find->second);
LOG_INFO("Set {} \"{}\" succeed", find->first, find->second);
}
}

Expand Down Expand Up @@ -2707,7 +2708,7 @@ void pegasus_server_impl::update_app_envs_before_open_db(
update_validate_partition_hash(envs);
update_user_specified_compaction(envs);
_manual_compact_svc.start_manual_compact_if_needed(envs);
update_rocksdb_options_before_create_replica(envs);
set_rocksdb_options_before_creating(envs);
}

void pegasus_server_impl::query_app_envs(/*out*/ std::map<std::string, std::string> &envs)
Expand Down Expand Up @@ -3096,6 +3097,22 @@ bool pegasus_server_impl::set_usage_scenario(const std::string &usage_scenario)
}
}

void pegasus_server_impl::reset_rocksdb_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts)
{
// Reset rocksdb option includes two aspects:
// 1. Set usage_scenario related rocksdb options
// 2. Rocksdb option set in app envs, consists of ROCKSDB_DYNAMIC_OPTIONS and
// ROCKSDB_STATIC_OPTIONS

// aspect 1:
reset_usage_scenario_options(base_opts, target_opts);

// aspect 2:
target_opts->num_levels = base_opts.num_levels;
target_opts->write_buffer_size = base_opts.write_buffer_size;
}

void pegasus_server_impl::reset_usage_scenario_options(
const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts)
{
Expand All @@ -3112,7 +3129,6 @@ void pegasus_server_impl::reset_usage_scenario_options(
target_opts->max_compaction_bytes = base_opts.max_compaction_bytes;
target_opts->write_buffer_size = base_opts.write_buffer_size;
target_opts->max_write_buffer_number = base_opts.max_write_buffer_number;
target_opts->num_levels = base_opts.num_levels;
}

void pegasus_server_impl::recalculate_data_cf_options(
Expand Down
6 changes: 4 additions & 2 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,7 @@ class pegasus_server_impl : public pegasus_read_service

void update_rocksdb_dynamic_options(const std::map<std::string, std::string> &envs);

void
update_rocksdb_options_before_create_replica(const std::map<std::string, std::string> &envs);
void set_rocksdb_options_before_creating(const std::map<std::string, std::string> &envs);

void update_throttling_controller(const std::map<std::string, std::string> &envs);

Expand Down Expand Up @@ -365,6 +364,9 @@ class pegasus_server_impl : public pegasus_read_service
void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts);

void reset_rocksdb_options(const rocksdb::ColumnFamilyOptions &base_opts,
rocksdb::ColumnFamilyOptions *target_opts);

// return true if successfully set
bool set_options(const std::unordered_map<std::string, std::string> &new_options);

Expand Down

0 comments on commit 98816c4

Please sign in to comment.