Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: add rate limiter for backup request #855

Merged
merged 13 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replica_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class replica_envs
static const std::string BUSINESS_INFO;
static const std::string REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS;
static const std::string READ_QPS_THROTTLING;
static const std::string BACKUP_REQUEST_QPS_THROTTLING;
static const std::string SPLIT_VALIDATE_PARTITION_HASH;
static const std::string USER_SPECIFIED_COMPACTION;
};
Expand Down
1 change: 1 addition & 0 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling");
const std::string
replica_envs::SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash");
const std::string replica_envs::USER_SPECIFIED_COMPACTION("user_specified_compaction");
const std::string replica_envs::BACKUP_REQUEST_QPS_THROTTLING("replica.backup_request_throttling");

const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info");
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
Expand Down
4 changes: 3 additions & 1 deletion src/meta/app_env_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ void app_env_validator::register_all_validators()
std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::SPLIT_VALIDATE_PARTITION_HASH,
std::bind(&check_split_validation, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::USER_SPECIFIED_COMPACTION, nullptr}};
{replica_envs::USER_SPECIFIED_COMPACTION, nullptr},
{replica_envs::BACKUP_REQUEST_QPS_THROTTLING,
std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)}};
}

} // namespace replication
Expand Down
21 changes: 17 additions & 4 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ replica::replica(
_counter_recent_read_throttling_reject_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str =
fmt::format("recent.backup.request.throttling.delay.count@{}", _app_info.app_name);
_counter_recent_backup_request_throttling_delay_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str =
fmt::format("recent.backup.request.throttling.reject.count@{}", _app_info.app_name);
_counter_recent_backup_request_throttling_reject_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str = fmt::format("dup.disabled_non_idempotent_write_count@{}", _app_info.app_name);
_counter_dup_disabled_non_idempotent_write_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());
Expand Down Expand Up @@ -185,13 +195,13 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (!ignore_throttling && throttle_read_request(request)) {
return;
}

if (!request->is_backup_request()) {
// only backup request is allowed to read from a stale replica

if (!ignore_throttling && throttle_read_request(request)) {
return;
}

if (status() != partition_status::PS_PRIMARY) {
response_client_read(request, ERR_INVALID_STATE);
return;
Expand All @@ -207,6 +217,9 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
return;
}
} else {
if (!ignore_throttling && throttle_backup_request(request)) {
return;
}
_counter_backup_request_qps->increment();
}

Expand Down
4 changes: 4 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
/// return true if request is throttled.
bool throttle_write_request(message_ex *request);
bool throttle_read_request(message_ex *request);
bool throttle_backup_request(message_ex *request);
/// update throttling controllers
/// \see replica::update_app_envs
void update_throttle_envs(const std::map<std::string, std::string> &envs);
Expand Down Expand Up @@ -536,6 +537,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
throttling_controller _write_qps_throttling_controller; // throttling by requests-per-second
throttling_controller _write_size_throttling_controller; // throttling by bytes-per-second
throttling_controller _read_qps_throttling_controller;
throttling_controller _backup_request_qps_throttling_controller;

// duplication
std::unique_ptr<replica_duplicator_manager> _duplication_mgr;
Expand Down Expand Up @@ -563,6 +565,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
perf_counter_wrapper _counter_recent_write_throttling_reject_count;
perf_counter_wrapper _counter_recent_read_throttling_delay_count;
perf_counter_wrapper _counter_recent_read_throttling_reject_count;
perf_counter_wrapper _counter_recent_backup_request_throttling_delay_count;
perf_counter_wrapper _counter_recent_backup_request_throttling_reject_count;
std::vector<perf_counter *> _counters_table_level_latency;
perf_counter_wrapper _counter_dup_disabled_non_idempotent_write_count;
perf_counter_wrapper _counter_backup_request_qps;
Expand Down
24 changes: 24 additions & 0 deletions src/replica/replica_throttle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,27 @@ bool replica::throttle_read_request(message_ex *request)
return false;
}

bool replica::throttle_backup_request(message_ex *request)
{
int64_t delay_ms = 0;
auto type = _backup_request_qps_throttling_controller.control(
request->header->client.timeout_ms, 1, delay_ms);
if (type != throttling_controller::PASS) {
if (type == throttling_controller::DELAY) {
tasking::enqueue(LPC_read_THROTTLING_DELAY,
&_tracker,
[ this, req = message_ptr(request) ]() { on_client_read(req, true); },
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
_counter_recent_backup_request_throttling_delay_count->increment();
} else { /** type == throttling_controller::REJECT **/
_counter_recent_backup_request_throttling_reject_count->increment();
}
return true;
}
return false;
}

void replica::update_throttle_envs(const std::map<std::string, std::string> &envs)
{
update_throttle_env_internal(
Expand All @@ -80,6 +101,9 @@ void replica::update_throttle_envs(const std::map<std::string, std::string> &env
envs, replica_envs::WRITE_SIZE_THROTTLING, _write_size_throttling_controller);
update_throttle_env_internal(
envs, replica_envs::READ_QPS_THROTTLING, _read_qps_throttling_controller);
update_throttle_env_internal(envs,
replica_envs::BACKUP_REQUEST_QPS_THROTTLING,
_backup_request_qps_throttling_controller);
}

void replica::update_throttle_env_internal(const std::map<std::string, std::string> &envs,
Expand Down