Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hotspot): calculator auto detect hotkey in the hot partition #604

Merged
merged 14 commits into from
Sep 22, 2020
48 changes: 47 additions & 1 deletion src/server/hotspot_partition_calculator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/tool-api/task_tracker.h>
#include "pegasus_read_service.h"
#include <dsn/utility/fail_point.h>

namespace pegasus {
namespace server {
Expand All @@ -42,6 +43,22 @@ DSN_DEFINE_int64("pegasus.collector",
"eliminate outdated historical "
"data");

// Master switch: when false, data_analyse() returns before running hotkey detection.
DSN_DEFINE_bool("pegasus.collector",
                enable_hotkey_detect,
                false,
                "auto detect hot key in the hot partition");

// Minimum hot-point value at which a partition is counted as hot in
// detect_hotkey_in_hotpartition().
DSN_DEFINE_int32("pegasus.collector",
                 hot_partition_threshold,
                 3,
                 "threshold of hotspot partition value, if app.stat.hotspots >= "
                 "FLAGS_hot_partition_threshold, this partition is a hot partition");

// Value the per-partition hot counter must reach before a hotkey detection RPC is sent.
DSN_DEFINE_int32("pegasus.collector",
                 occurrence_threshold,
                 100,
                 "hot partition occurrence times' threshold to send rpc to detect hotkey");

void hotspot_partition_calculator::data_aggregate(const std::vector<row_data> &partition_stats)
{
while (_partitions_stat_histories.size() >= FLAGS_max_hotspot_store_size) {
Expand Down Expand Up @@ -128,15 +145,44 @@ void hotspot_partition_calculator::data_analyse()
stat_histories_analyse(data_type, hot_points);
update_hot_point(data_type, hot_points);
}
if (!FLAGS_enable_hotkey_detect) {
return;
}
for (int data_type = 0; data_type <= 1; data_type++) {
detect_hotkey_in_hotpartition(data_type);
}
}

void hotspot_partition_calculator::detect_hotkey_in_hotpartition(int data_type)
{
for (int index = 0; index < _hot_points.size(); index++) {
if (_hot_points[index][data_type].get()->get_value() >= FLAGS_hot_partition_threshold) {
if (++_hotpartition_counter[index][data_type] >= FLAGS_occurrence_threshold) {
derror_f("Find a {} hot partition {}.{}",
(data_type == partition_qps_type::READ_HOTSPOT_DATA ? "read" : "write"),
_app_name,
index);
send_hotkey_detect_request(_app_name,
index,
(data_type == dsn::apps::hotkey_type::type::READ)
? dsn::apps::hotkey_type::type::READ
: dsn::apps::hotkey_type::type::WRITE,
dsn::apps::hotkey_detect_action::type::START);
}
} else {
_hotpartition_counter[index][data_type] =
std::max(_hotpartition_counter[index][data_type] - 1, 0);
}
}
}

// TODO:(TangYanzhao) call this function to start hotkey detection
/*static*/ void hotspot_partition_calculator::send_hotkey_detect_request(
const std::string &app_name,
const uint64_t partition_index,
const dsn::apps::hotkey_type::type hotkey_type,
const dsn::apps::hotkey_detect_action::type action)
{
FAIL_POINT_INJECT_F("send_hotkey_detect_request", [](dsn::string_view) {});
auto request = std::make_unique<dsn::apps::hotkey_detect_request>();
request->type = hotkey_type;
request->action = action;
Expand Down
14 changes: 12 additions & 2 deletions src/server/hotspot_partition_calculator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

#pragma once

#include "hotspot_partition_stat.h"
#include <gtest/gtest_prod.h>

#include <dsn/perf_counter/perf_counter.h>
#include <dsn/utility/flags.h>
#include "hotspot_partition_stat.h"

namespace pegasus {
namespace server {
Expand All @@ -36,7 +38,7 @@ class hotspot_partition_calculator
{
public:
hotspot_partition_calculator(const std::string &app_name, int partition_count)
: _app_name(app_name), _hot_points(partition_count)
: _app_name(app_name), _hot_points(partition_count), _hotpartition_counter(partition_count)
{
init_perf_counter(partition_count);
}
Expand All @@ -55,6 +57,7 @@ class hotspot_partition_calculator
void stat_histories_analyse(int data_type, std::vector<int> &hot_points);
// set hot_point to corresponding perf_counter
void update_hot_point(int data_type, std::vector<int> &hot_points);
void detect_hotkey_in_hotpartition(int data_type);

const std::string _app_name;
void init_perf_counter(int perf_counter_count);
Expand All @@ -63,7 +66,14 @@ class hotspot_partition_calculator
// saving historical data can improve accuracy
stat_histories _partitions_stat_histories;

// _hotpartition_counter[index_of_partitions][type_of_read(0)/write(1)_stat]
// A counter used to find partitions that often exceed the threshold.
// If the hot_point of a partition stays high, the calculator will automatically send an RPC
// to detect the hotkey on the replica.
std::vector<std::array<int, 2>> _hotpartition_counter;

friend class hotspot_partition_test;
FRIEND_TEST(hotspot_partition_test, send_hotkey_detect_request);
};

} // namespace server
Expand Down
3 changes: 3 additions & 0 deletions src/server/test/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -509,3 +509,6 @@ onebox2 = 2
[pegasus.clusters]
onebox = 0.0.0.0:34701
onebox2 = 0.0.0.0:35701

[pegasus.collector]
enable_hotkey_detect = true
62 changes: 61 additions & 1 deletion src/server/test/hotspot_partition_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,25 @@

#include "pegasus_server_test_base.h"
#include <gtest/gtest.h>
#include <dsn/utility/fail_point.h>

namespace pegasus {
namespace server {

DSN_DECLARE_int32(occurrence_threshold);

class hotspot_partition_test : public pegasus_server_test_base
{
public:
hotspot_partition_test() : calculator("TEST", 8){};
hotspot_partition_test() : calculator("TEST", 8)
{
dsn::fail::setup();
dsn::fail::cfg("send_hotkey_detect_request", "return()");
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
};
// Tears down the fail-point framework that the constructor set up with dsn::fail::setup().
~hotspot_partition_test() { dsn::fail::teardown(); }

hotspot_partition_calculator calculator;

std::vector<row_data> generate_row_data()
{
std::vector<row_data> test_rows;
Expand All @@ -38,6 +48,7 @@ class hotspot_partition_test : public pegasus_server_test_base
}
return test_rows;
}

std::vector<std::vector<double>> get_calculator_result(const hot_partition_counters &counters)
{
std::vector<std::vector<double>> result;
Expand All @@ -49,6 +60,7 @@ class hotspot_partition_test : public pegasus_server_test_base
}
return result;
}

void test_policy_in_scenarios(std::vector<row_data> scenario,
std::vector<std::vector<double>> &expect_result,
hotspot_partition_calculator &calculator)
Expand All @@ -58,6 +70,23 @@ class hotspot_partition_test : public pegasus_server_test_base
std::vector<std::vector<double>> result = get_calculator_result(calculator._hot_points);
ASSERT_EQ(result, expect_result);
}

// Runs `loop_times` rounds of data_aggregate() followed by data_analyse() on
// `calculator`, feeding the same `scenario` each round, then asserts that the
// calculator's _hotpartition_counter equals `expect_result`.
void aggregate_analyse_data(hotspot_partition_calculator &calculator,
                            std::vector<row_data> scenario,
                            std::vector<std::array<int, 2>> &expect_result,
                            int loop_times)
{
    for (int rounds_left = loop_times; rounds_left > 0; --rounds_left) {
        calculator.data_aggregate(scenario);
        calculator.data_analyse();
    }
    ASSERT_EQ(calculator._hotpartition_counter, expect_result);
}
Smityz marked this conversation as resolved.
Show resolved Hide resolved

// Empties the statistics history stored in `calculator`.
void clear_calculator_histories(hotspot_partition_calculator &calculator)
{
    auto &histories = calculator._partitions_stat_histories;
    histories.clear();
}
};

TEST_F(hotspot_partition_test, hotspot_partition_policy)
Expand Down Expand Up @@ -92,6 +121,37 @@ TEST_F(hotspot_partition_test, hotspot_partition_policy)
test_rows[HOT_SCENARIO_1_WRITE_HOT_PARTITION].put_qps = 5000.0;
expect_vector = {{0, 0, 0, 4, 0, 0, 0, 0}, {0, 0, 4, 0, 0, 0, 0, 0}};
test_policy_in_scenarios(test_rows, expect_vector, calculator);
clear_calculator_histories(calculator);
}

// Checks how _hotpartition_counter evolves: it is expected to reach
// FLAGS_occurrence_threshold after that many rounds of hot data, and to drop by one per
// round once the data is back to normal.
TEST_F(hotspot_partition_test, send_hotkey_detect_request)
{
    const int READ_HOT_PARTITION = 7;
    const int WRITE_HOT_PARTITION = 0;
    const int back_to_normal = 30;

    // Expected counters for the 8 partitions: every slot is 0 except the read slot ([0])
    // of the read-hot partition and the write slot ([1]) of the write-hot partition.
    const auto make_expected = [&](int count) {
        std::vector<std::array<int, 2>> counters(8, std::array<int, 2>{{0, 0}});
        counters[WRITE_HOT_PARTITION][1] = count;
        counters[READ_HOT_PARTITION][0] = count;
        return counters;
    };

    std::vector<row_data> hot_rows = generate_row_data();
    hot_rows[READ_HOT_PARTITION].get_qps = 5000.0;
    hot_rows[WRITE_HOT_PARTITION].put_qps = 5000.0;

    // Phase 1: keep both partitions hot for FLAGS_occurrence_threshold rounds.
    std::vector<std::array<int, 2>> expect_result = make_expected(FLAGS_occurrence_threshold);
    aggregate_analyse_data(calculator, hot_rows, expect_result, FLAGS_occurrence_threshold);

    // Phase 2: feed unmodified rows for `back_to_normal` rounds.
    expect_result = make_expected(FLAGS_occurrence_threshold - back_to_normal);
    aggregate_analyse_data(calculator, generate_row_data(), expect_result, back_to_normal);
}

} // namespace server
Expand Down