diff --git a/src/server/pegasus_server_impl.cpp b/src/server/pegasus_server_impl.cpp index 019854b637..f67b8f55c6 100644 --- a/src/server/pegasus_server_impl.cpp +++ b/src/server/pegasus_server_impl.cpp @@ -1560,9 +1560,10 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv) ddebug_replica("start to open rocksDB's rdb({})", rdb_path); - // Here we create a `tmp_data_cf_opts` because we don't want to modify `_data_cf_opts`, which + // Here we create a `_table_data_cf_opts` because we don't want to modify `_data_cf_opts`, which // will be used elsewhere. - rocksdb::ColumnFamilyOptions tmp_data_cf_opts = _data_cf_opts; + _table_data_cf_opts = _data_cf_opts; + _table_data_cf_opts_recalculated = false; bool has_incompatible_db_options = false; if (db_exist) { // When DB exists, meta CF and data CF must be present. @@ -1609,7 +1610,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv) // We don't use `loaded_data_cf_opts` directly because pointer-typed options will // only be initialized with default values when calling 'LoadLatestOptions', see // 'rocksdb/utilities/options_util.h'. - reset_usage_scenario_options(loaded_data_cf_opts, &tmp_data_cf_opts); + reset_usage_scenario_options(loaded_data_cf_opts, &_table_data_cf_opts); _db_opts.allow_ingest_behind = parse_allow_ingest_behind(envs); } } else { @@ -1620,7 +1621,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv) } std::vector column_families( - {{DATA_COLUMN_FAMILY_NAME, tmp_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}}); + {{DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}}); auto s = rocksdb::CheckOptionsCompatibility(rdb_path, rocksdb::Env::Default(), _db_opts, @@ -2634,6 +2635,10 @@ void pegasus_server_impl::update_usage_scenario(const std::maplevel0_file_num_compaction_trigger = base_opts.level0_file_num_compaction_trigger; target_opts->level0_slowdown_writes_trigger = base_opts.level0_slowdown_writes_trigger; target_opts->level0_stop_writes_trigger = base_opts.level0_stop_writes_trigger; @@ -3007,6 +3013,98 @@ void pegasus_server_impl::reset_usage_scenario_options( target_opts->max_write_buffer_number = base_opts.max_write_buffer_number; } +void pegasus_server_impl::recalculate_data_cf_options( + const rocksdb::ColumnFamilyOptions &cur_data_cf_opts) +{ +#define UPDATE_NUMBER_OPTION_IF_NEEDED(option, value) \ + do { \ + auto _v = (value); \ + if (_v != cur_data_cf_opts.option) { \ + new_options[#option] = std::to_string(_v); \ + } \ + } while (0) + +#define UPDATE_BOOL_OPTION_IF_NEEDED(option, value) \ + do { \ + auto _v = (value); \ + if (_v != cur_data_cf_opts.option) { \ + if (_v) { \ + new_options[#option] = "true"; \ + } else { \ + new_options[#option] = "false"; \ + } \ + } \ + } while (0) + +#define UPDATE_OPTION_IF_NOT_NEARBY(option, value) \ + do { \ + auto _v = (value); \ + if (!check_value_if_nearby(_v, cur_data_cf_opts.option)) { \ + new_options[#option] = std::to_string(get_random_nearby(_v)); \ + } \ + } while (0) + +#define UPDATE_OPTION_IF_NEEDED(option) UPDATE_NUMBER_OPTION_IF_NEEDED(option, _data_cf_opts.option) + + if (_table_data_cf_opts_recalculated) { + return; + } + std::unordered_map new_options; + if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario || + ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE == _usage_scenario) { + if (ROCKSDB_ENV_USAGE_SCENARIO_NORMAL == _usage_scenario) { + UPDATE_OPTION_IF_NOT_NEARBY(write_buffer_size, _data_cf_opts.write_buffer_size); + UPDATE_OPTION_IF_NEEDED(level0_file_num_compaction_trigger); + } else { + uint64_t buffer_size = dsn::rand::next_u64(_data_cf_opts.write_buffer_size, + _data_cf_opts.write_buffer_size * 2); + if (cur_data_cf_opts.write_buffer_size < _data_cf_opts.write_buffer_size || + cur_data_cf_opts.write_buffer_size > _data_cf_opts.write_buffer_size * 2) { + new_options["write_buffer_size"] = std::to_string(buffer_size); + uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base); + new_options["level0_file_num_compaction_trigger"] = + std::to_string(std::max(4UL, max_size / buffer_size)); + } else if (!check_value_if_nearby(_data_cf_opts.max_bytes_for_level_base, + cur_data_cf_opts.max_bytes_for_level_base)) { + uint64_t max_size = get_random_nearby(_data_cf_opts.max_bytes_for_level_base); + new_options["level0_file_num_compaction_trigger"] = + std::to_string(std::max(4UL, max_size / buffer_size)); + } + } + UPDATE_OPTION_IF_NEEDED(level0_slowdown_writes_trigger); + UPDATE_OPTION_IF_NEEDED(level0_stop_writes_trigger); + UPDATE_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit); + UPDATE_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit); + UPDATE_BOOL_OPTION_IF_NEEDED(disable_auto_compactions, false); + UPDATE_OPTION_IF_NEEDED(max_compaction_bytes); + UPDATE_OPTION_IF_NEEDED(max_write_buffer_number); + } else { + // ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD + UPDATE_NUMBER_OPTION_IF_NEEDED(level0_file_num_compaction_trigger, 1000000000); + UPDATE_NUMBER_OPTION_IF_NEEDED(level0_slowdown_writes_trigger, 1000000000); + UPDATE_NUMBER_OPTION_IF_NEEDED(level0_stop_writes_trigger, 1000000000); + UPDATE_NUMBER_OPTION_IF_NEEDED(soft_pending_compaction_bytes_limit, 0); + UPDATE_NUMBER_OPTION_IF_NEEDED(hard_pending_compaction_bytes_limit, 0); + UPDATE_BOOL_OPTION_IF_NEEDED(disable_auto_compactions, true); + UPDATE_NUMBER_OPTION_IF_NEEDED(max_compaction_bytes, static_cast(1) << 60); + UPDATE_OPTION_IF_NOT_NEARBY(write_buffer_size, _data_cf_opts.write_buffer_size * 4); + UPDATE_NUMBER_OPTION_IF_NEEDED(max_write_buffer_number, + std::max(_data_cf_opts.max_write_buffer_number, 6)); + } + if (new_options.size() > 0) { + if (set_options(new_options)) { + ddebug_replica( + "{}: recalculate the value of the options related to usage scenario \"{}\"", + replica_name(), + _usage_scenario); + } + } + _table_data_cf_opts_recalculated = true; +#undef UPDATE_OPTION_IF_NEEDED +#undef UPDATE_BOOL_OPTION_IF_NEEDED +#undef UPDATE_NUMBER_OPTION_IF_NEEDED +} + bool pegasus_server_impl::set_options( const std::unordered_map &new_options) { diff --git a/src/server/pegasus_server_impl.h b/src/server/pegasus_server_impl.h index d7f955b4a8..8f42405490 100644 --- a/src/server/pegasus_server_impl.h +++ b/src/server/pegasus_server_impl.h @@ -312,6 +312,9 @@ class pegasus_server_impl : public pegasus_read_service // return true if successfully changed bool set_usage_scenario(const std::string &usage_scenario); + // recalculate option value if necessary + void recalculate_data_cf_options(const rocksdb::ColumnFamilyOptions &cur_data_cf_opts); + void reset_usage_scenario_options(const rocksdb::ColumnFamilyOptions &base_opts, rocksdb::ColumnFamilyOptions *target_opts); @@ -325,6 +328,15 @@ class pegasus_server_impl : public pegasus_read_service return dsn::rand::next_u64(base_value - gap, base_value + gap); } + // return true if value in range of [0.75, 1.25] * base_value + bool check_value_if_nearby(uint64_t base_value, uint64_t check_value) + { + uint64_t gap = base_value / 4; + uint64_t actual_gap = + (base_value < check_value) ? check_value - base_value : base_value - check_value; + return actual_gap <= gap; + } + // return true if expired bool check_if_record_expired(uint32_t epoch_now, rocksdb::Slice raw_value) { @@ -413,11 +425,18 @@ class pegasus_server_impl : public pegasus_read_service std::shared_ptr _key_ttl_compaction_filter_factory; std::shared_ptr _statistics; rocksdb::DBOptions _db_opts; + // The value of option in data_cf according to conf template file config.ini rocksdb::ColumnFamilyOptions _data_cf_opts; + // Dynamically calculate the value of current data_cf option according to the conf module file + // and usage scenario + rocksdb::ColumnFamilyOptions _table_data_cf_opts; rocksdb::ColumnFamilyOptions _meta_cf_opts; rocksdb::ReadOptions _data_cf_rd_opts; std::string _usage_scenario; std::string _user_specified_compaction; + // Whether it is necessary to update the current data_cf, it is required when opening the db at + // the first time, but not later + bool _table_data_cf_opts_recalculated; rocksdb::DB *_db; rocksdb::ColumnFamilyHandle *_data_cf;