Skip to content

Commit

Permalink
Thank you empiredan for the suggestion, which I took into my notes an…
Browse files Browse the repository at this point in the history
…d adjusted my code.
  • Loading branch information
ruojieranyishen committed Jun 19, 2023
1 parent 98816c4 commit 79b77aa
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 39 deletions.
14 changes: 6 additions & 8 deletions src/meta/app_env_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,23 @@ namespace dsn {
namespace replication {
bool validate_app_envs(const std::map<std::string, std::string> &envs)
{
if (envs.size() == 0)
return true;
// only check rocksdb app envs currently
std::string hint_message;
bool all_envs_vaild = true;

for (const auto &it : envs) {
if (replica_envs::ROCKSDB_STATIC_OPTIONS.find(it.first) ==
replica_envs::ROCKSDB_STATIC_OPTIONS.end() &&
replica_envs::ROCKSDB_DYNAMIC_OPTIONS.find(it.first) ==
replica_envs::ROCKSDB_DYNAMIC_OPTIONS.end())
replica_envs::ROCKSDB_DYNAMIC_OPTIONS.end()) {
continue;
}
std::string hint_message;
if (!validate_app_env(it.first, it.second, hint_message)) {
LOG_WARNING(
"app env {}={} is invaild, hint_message:{}", it.first, it.second, hint_message);
all_envs_vaild = false;
break;
return false;
}
}
return all_envs_vaild;
return true;
}

bool validate_app_env(const std::string &env_name,
Expand Down
31 changes: 24 additions & 7 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2629,33 +2629,47 @@ pegasus_server_impl::get_restore_dir_from_env(const std::map<std::string, std::s
void pegasus_server_impl::update_rocksdb_dynamic_options(
const std::map<std::string, std::string> &envs)
{
if (envs.size() == 0)
if (envs.size() == 0) {
return;
}
auto extract_option = [](const std::string &option) -> std::string {
std::stringstream ss(option);
std::string prefix, rocksdb_opt;
std::getline(ss, prefix, '.');
std::getline(ss, rocksdb_opt);
LOG_INFO("Extract rocksdb dynamic opt ({}) from ({})", rocksdb_opt, option);
return rocksdb_opt;
};

std::unordered_map<std::string, std::string> new_options;
for (const auto &option : ROCKSDB_DYNAMIC_OPTIONS) {
auto find = envs.find(option);
if (option.compare(ROCKSDB_WRITE_BUFFER_SIZE) == 0 && find != envs.end()) {
new_options["write_buffer_size"] = find->second;
if (find == envs.end()) {
continue;
}
new_options[extract_option(option)] = find->second;
}

// doing set option
if (new_options.size() > 0 && set_options(new_options)) {
if (new_options.empty() && set_options(new_options)) {
LOG_INFO("Set rocksdb dynamic options success");
}
}

void pegasus_server_impl::set_rocksdb_options_before_creating(
const std::map<std::string, std::string> &envs)
{
if (envs.size() == 0)
if (envs.size() == 0) {
return;
}

for (const auto &option : pegasus::ROCKSDB_STATIC_OPTIONS) {
auto find = envs.find(option);
if (find == envs.end()) {
continue;
}
bool is_set = false;
if (option.compare(ROCKSDB_NUM_LEVELS) == 0 && find != envs.end()) {
if (option.compare(ROCKSDB_NUM_LEVELS) == 0) {
int32_t val = 0;
if (!dsn::buf2int32(find->second, val))
continue;
Expand All @@ -2669,8 +2683,11 @@ void pegasus_server_impl::set_rocksdb_options_before_creating(

for (const auto &option : pegasus::ROCKSDB_DYNAMIC_OPTIONS) {
auto find = envs.find(option);
if (find == envs.end()) {
continue;
}
bool is_set = false;
if (option.compare(ROCKSDB_WRITE_BUFFER_SIZE) == 0 && find != envs.end()) {
if (option.compare(ROCKSDB_WRITE_BUFFER_SIZE) == 0) {
uint64_t val = 0;
if (!dsn::buf2uint64(find->second, val))
continue;
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ class pegasus_server_impl : public pegasus_read_service
// Dynamically calculate the value of current data_cf option according to the conf module file
// and usage scenario
rocksdb::ColumnFamilyOptions _table_data_cf_opts;
rocksdb::BlockBasedTableOptions tbl_opts;
rocksdb::BlockBasedTableOptions _tbl_opts;
rocksdb::ColumnFamilyOptions _meta_cf_opts;
rocksdb::ReadOptions _data_cf_rd_opts;
std::string _usage_scenario;
Expand Down
46 changes: 23 additions & 23 deletions src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ DSN_DEFINE_bool(pegasus.server,
DSN_DEFINE_bool(pegasus.server,
rocksdb_disable_table_block_cache,
false,
"rocksdb tbl_opts.no_block_cache");
"rocksdb _tbl_opts.no_block_cache");
DSN_DEFINE_bool(pegasus.server,
rocksdb_enable_write_buffer_manager,
false,
Expand Down Expand Up @@ -466,11 +466,11 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
CHECK(parse_compression_types("none", _meta_cf_opts.compression_per_level),
"parse rocksdb_compression_type failed.");

tbl_opts.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;
_tbl_opts.read_amp_bytes_per_bit = FLAGS_read_amp_bytes_per_bit;

if (FLAGS_rocksdb_disable_table_block_cache) {
tbl_opts.no_block_cache = true;
tbl_opts.block_restart_interval = 4;
_tbl_opts.no_block_cache = true;
_tbl_opts.block_restart_interval = 4;
} else {
// If block cache is enabled, all replicas on this server will share the same block cache
// object. It's convenient to control the total memory used by this server, and the LRU
Expand All @@ -483,7 +483,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
});

// every replica has the same block cache
tbl_opts.block_cache = _s_block_cache;
_tbl_opts.block_cache = _s_block_cache;
}

// FLAGS_rocksdb_limiter_max_write_megabytes_per_sec <= 0 means close the rate limit.
Expand Down Expand Up @@ -519,7 +519,7 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
FLAGS_rocksdb_total_size_across_write_buffer);
_s_write_buffer_manager = std::make_shared<rocksdb::WriteBufferManager>(
static_cast<size_t>(FLAGS_rocksdb_total_size_across_write_buffer),
tbl_opts.block_cache);
_tbl_opts.block_cache);
});
_db_opts.write_buffer_manager = _s_write_buffer_manager;
}
Expand All @@ -540,33 +540,33 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
CHECK(index_type_item != INDEX_TYPE_STRING_MAP.end(),
"[pegasus.server]rocksdb_index_type should be one among binary_search, "
"hash_search, two_level_index_search or binary_search_with_first_key.");
tbl_opts.index_type = index_type_item->second;
_tbl_opts.index_type = index_type_item->second;
LOG_INFO_PREFIX("rocksdb_index_type = {}", FLAGS_rocksdb_index_type);

tbl_opts.partition_filters = FLAGS_rocksdb_partition_filters;
_tbl_opts.partition_filters = FLAGS_rocksdb_partition_filters;
// TODO(yingchun): clean up these useless log ?
LOG_INFO_PREFIX("rocksdb_partition_filters = {}", tbl_opts.partition_filters);
LOG_INFO_PREFIX("rocksdb_partition_filters = {}", _tbl_opts.partition_filters);

tbl_opts.metadata_block_size = FLAGS_rocksdb_metadata_block_size;
LOG_INFO_PREFIX("rocksdb_metadata_block_size = {}", tbl_opts.metadata_block_size);
_tbl_opts.metadata_block_size = FLAGS_rocksdb_metadata_block_size;
LOG_INFO_PREFIX("rocksdb_metadata_block_size = {}", _tbl_opts.metadata_block_size);

tbl_opts.cache_index_and_filter_blocks = FLAGS_rocksdb_cache_index_and_filter_blocks;
_tbl_opts.cache_index_and_filter_blocks = FLAGS_rocksdb_cache_index_and_filter_blocks;
LOG_INFO_PREFIX("rocksdb_cache_index_and_filter_blocks = {}",
tbl_opts.cache_index_and_filter_blocks);
_tbl_opts.cache_index_and_filter_blocks);

tbl_opts.pin_top_level_index_and_filter = FLAGS_rocksdb_pin_top_level_index_and_filter;
_tbl_opts.pin_top_level_index_and_filter = FLAGS_rocksdb_pin_top_level_index_and_filter;
LOG_INFO_PREFIX("rocksdb_pin_top_level_index_and_filter = {}",
tbl_opts.pin_top_level_index_and_filter);
_tbl_opts.pin_top_level_index_and_filter);

tbl_opts.cache_index_and_filter_blocks_with_high_priority =
_tbl_opts.cache_index_and_filter_blocks_with_high_priority =
FLAGS_rocksdb_cache_index_and_filter_blocks_with_high_priority;
LOG_INFO_PREFIX("rocksdb_cache_index_and_filter_blocks_with_high_priority = {}",
tbl_opts.cache_index_and_filter_blocks_with_high_priority);
_tbl_opts.cache_index_and_filter_blocks_with_high_priority);

tbl_opts.pin_l0_filter_and_index_blocks_in_cache =
_tbl_opts.pin_l0_filter_and_index_blocks_in_cache =
FLAGS_rocksdb_pin_l0_filter_and_index_blocks_in_cache;
LOG_INFO_PREFIX("rocksdb_pin_l0_filter_and_index_blocks_in_cache = {}",
tbl_opts.pin_l0_filter_and_index_blocks_in_cache);
_tbl_opts.pin_l0_filter_and_index_blocks_in_cache);

// Bloom filter configurations.
if (!FLAGS_rocksdb_disable_bloom_filter) {
Expand All @@ -583,8 +583,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
// 50 | 0.225453 | ~0.00003
// Recommend using no more than three decimal digits after the decimal point, as in 6.667.
// More details: https://github.com/facebook/rocksdb/wiki/RocksDB-Bloom-Filter
tbl_opts.format_version = FLAGS_rocksdb_format_version;
tbl_opts.filter_policy.reset(
_tbl_opts.format_version = FLAGS_rocksdb_format_version;
_tbl_opts.filter_policy.reset(
rocksdb::NewBloomFilterPolicy(FLAGS_rocksdb_bloom_filter_bits_per_key, false));

if (dsn::utils::equals(FLAGS_rocksdb_filter_type, "prefix")) {
Expand All @@ -595,8 +595,8 @@ pegasus_server_impl::pegasus_server_impl(dsn::replication::replica *r)
}
}

_data_cf_opts.table_factory.reset(NewBlockBasedTableFactory(tbl_opts));
_meta_cf_opts.table_factory.reset(NewBlockBasedTableFactory(tbl_opts));
_data_cf_opts.table_factory.reset(NewBlockBasedTableFactory(_tbl_opts));
_meta_cf_opts.table_factory.reset(NewBlockBasedTableFactory(_tbl_opts));

_key_ttl_compaction_filter_factory = std::make_shared<KeyWithTTLCompactionFilterFactory>();
_data_cf_opts.compaction_filter_factory = _key_ttl_compaction_filter_factory;
Expand Down

0 comments on commit 79b77aa

Please sign in to comment.