Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve performance of count_data #1091

Merged
merged 13 commits into from
Aug 30, 2022
3 changes: 3 additions & 0 deletions idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -279,11 +279,13 @@ struct get_scanner_request
11:optional bool validate_partition_hash;
12:optional bool return_expire_ts;
13:optional bool full_scan; // true means client want to build 'full scan' context with the server side, false otherwise
14:optional bool only_return_count;
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
}

struct scan_request
{
1:i64 context_id;
2:optional bool only_return_count;
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
}

struct scan_response
Expand All @@ -294,6 +296,7 @@ struct scan_response
4:i32 app_id;
5:i32 partition_index;
6:string server;
7:optional i32 kv_count = -1;
}

struct duplicate_request
Expand Down
14 changes: 14 additions & 0 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,11 @@ class pegasus_client_impl : public pegasus_client
std::string &value,
internal_info *info = nullptr) override;

int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number) override;

void async_next(async_scan_next_callback_t &&) override;

bool safe_destructible() const override;
Expand Down Expand Up @@ -287,6 +292,7 @@ class pegasus_client_impl : public pegasus_client
std::vector<::dsn::apps::key_value> _kvs;
internal_info _info;
int32_t _p;
int32_t _kv_count;

int64_t _context;
mutable ::dsn::zlock _lock;
Expand Down Expand Up @@ -320,6 +326,14 @@ class pegasus_client_impl : public pegasus_client

void async_next(async_scan_next_callback_t &&callback) override;

int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number) override
{
return _p->next(hashkey, sortkey, value, count_number);
}

int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
Expand Down
63 changes: 50 additions & 13 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,34 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_rpc_started(false),
_validate_partition_hash(validate_partition_hash),
_full_scan(full_scan)

GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
{
}

int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err,
std::string &&hash,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
hashkey = std::move(hash);
sortkey = std::move(sort);
value = std::move(val);
count_number = kv_count;
op_completed.notify();
};
async_next(std::move(callback));
op_completed.wait();
return ret;
}

int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -70,7 +97,8 @@ int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &&sort,
std::string &&val,
internal_info &&ii,
uint32_t expire_ts_seconds) {
uint32_t expire_ts_seconds,
int32_t kv_count) {
ret = err;
hashkey = std::move(hash);
sortkey = std::move(sort);
Expand Down Expand Up @@ -139,7 +167,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::string(),
std::string(),
std::move(info),
0);
0,
-1);
}
}
return;
Expand All @@ -162,13 +191,15 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
}

// valid data got
std::string hash_key, sort_key;
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
std::string value(_kvs[_p].value.data(), _kvs[_p].value.length());
uint32_t expire_ts_seconds = _kvs[_p].__isset.expire_ts_seconds
? static_cast<uint32_t>(_kvs[_p].expire_ts_seconds)
: 0;

std::string hash_key, sort_key, value = "";
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
uint32_t expire_ts_seconds = 0;
if (_kv_count == -1) {
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length());
if (_kvs[_p].__isset.expire_ts_seconds) {
expire_ts_seconds = static_cast<uint32_t>(_kvs[_p].expire_ts_seconds);
}
}
auto &callback = _queue.front();
if (callback) {
internal_info info(_info);
Expand All @@ -178,7 +209,8 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds);
expire_ts_seconds,
_kv_count);
_lock.lock();
if (_queue.size() == 1) {
// keep the last callback until exit this function
Expand All @@ -196,6 +228,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_next_batch()
{
::dsn::apps::scan_request req;
req.context_id = _context;
req.__set_only_return_count(_options.only_return_count);

dassert(!_rpc_started, "");
_rpc_started = true;
Expand Down Expand Up @@ -230,6 +263,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
req.__set_validate_partition_hash(_validate_partition_hash);
req.__set_return_expire_ts(_options.return_expire_ts);
req.__set_full_scan(_full_scan);
req.__set_only_return_count(_options.only_return_count);

dassert(!_rpc_started, "");
_rpc_started = true;
Expand Down Expand Up @@ -261,6 +295,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
_kvs = std::move(response.kvs);
_p = -1;
_context = response.context_id;
_kv_count = response.kv_count;
_async_next_internal();
return;
} else if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) {
Expand Down Expand Up @@ -288,7 +323,7 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c

for (auto &callback : temp) {
if (callback) {
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0);
callback(ret, std::string(), std::string(), std::string(), internal_info(info), 0, -1);
}
}
}
Expand Down Expand Up @@ -323,13 +358,15 @@ void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next(
std::string &&sort_key,
std::string &&value,
internal_info &&info,
uint32_t expire_ts_seconds) {
uint32_t expire_ts_seconds,
int32_t kv_count) {
user_callback(error_code,
std::move(hash_key),
std::move(sort_key),
std::move(value),
std::move(info),
expire_ts_seconds);
expire_ts_seconds,
kv_count);
});
}

Expand Down
3 changes: 2 additions & 1 deletion src/geo/lib/geo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,8 @@ void geo_client::do_scan(pegasus_client::pegasus_scanner_wrapper scanner_wrapper
std::string &&geo_sort_key,
std::string &&value,
pegasus_client::internal_info &&info,
uint32_t expire_ts_seconds) mutable {
uint32_t expire_ts_seconds,
int32_t kv_count) mutable {
if (ret == PERR_SCAN_COMPLETE) {
cb();
return;
Expand Down
15 changes: 12 additions & 3 deletions src/include/pegasus/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ class pegasus_client
std::string sort_key_filter_pattern;
bool no_value; // only fetch hash_key and sort_key, but not fetch value
bool return_expire_ts;
bool only_return_count;
scan_options()
: timeout_ms(5000),
batch_size(100),
Expand All @@ -260,7 +261,8 @@ class pegasus_client
hash_key_filter_type(FT_NO_FILTER),
sort_key_filter_type(FT_NO_FILTER),
no_value(false),
return_expire_ts(false)
return_expire_ts(false),
only_return_count(false)
{
}
scan_options(const scan_options &o)
Expand All @@ -273,7 +275,8 @@ class pegasus_client
sort_key_filter_type(o.sort_key_filter_type),
sort_key_filter_pattern(o.sort_key_filter_pattern),
no_value(o.no_value),
return_expire_ts(o.return_expire_ts)
return_expire_ts(o.return_expire_ts),
only_return_count(o.only_return_count)
{
}
};
Expand Down Expand Up @@ -312,7 +315,8 @@ class pegasus_client
std::string && /*sort_key*/,
std::string && /*value*/,
internal_info && /*info*/,
uint32_t /*expire_ts_seconds*/)>
uint32_t /*expire_ts_seconds*/,
int32_t /*kv_count*/)>
async_scan_next_callback_t;
typedef std::function<void(int /*error_code*/, pegasus_scanner * /*hash_scanner*/)>
async_get_scanner_callback_t;
Expand Down Expand Up @@ -343,6 +347,11 @@ class pegasus_client
std::string &value,
internal_info *info = nullptr) = 0;

virtual int next(std::string &hashkey,
std::string &sortkey,
std::string &value,
int32_t &count_number) = 0;

///
/// \brief async get the next key-value pair of this scanner
/// thread-safe
Expand Down
62 changes: 41 additions & 21 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,8 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
epoch_now,
request.no_value,
request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
return_expire_ts);
return_expire_ts,
request.only_return_count ? false : true);
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
switch (state) {
case range_iteration_state::kNormal:
count++;
Expand All @@ -1221,6 +1222,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)

it->Next();
}
if (request.only_return_count) {
resp.kvs.emplace_back(::dsn::apps::key_value());
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
resp.__set_kv_count(count);
}

// check iteration time whether exceed limit
if (!complete) {
Expand Down Expand Up @@ -1297,7 +1302,10 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
_pfc_recent_filter_count->add(filter_count);
}

_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
// abandon calculate capacity unit
if (!request.only_return_count) {
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
}
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}

Expand Down Expand Up @@ -1365,7 +1373,8 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
epoch_now,
no_value,
validate_hash,
return_expire_ts);
return_expire_ts,
request.only_return_count ? false : true);
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
switch (state) {
case range_iteration_state::kNormal:
count++;
Expand All @@ -1389,6 +1398,11 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
it->Next();
}

if (request.only_return_count) {
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
resp.kvs.emplace_back(::dsn::apps::key_value());
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
resp.__set_kv_count(count);
}

// check iteration time whether exceed limit
if (!complete) {
limiter->time_check_after_incomplete_scan();
Expand Down Expand Up @@ -1449,7 +1463,10 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
resp.error = rocksdb::Status::Code::kNotFound;
}

_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
// abandon calculate capacity unit
if (request.only_return_count) {
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
_cu_calculator->add_scan_cu(req, resp.error, resp.kvs);
}
_pfc_scan_latency->set(dsn_now_ns() - start_time);
}

Expand Down Expand Up @@ -2274,7 +2291,8 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
uint32_t epoch_now,
bool no_value,
bool request_validate_hash,
bool request_expire_ts)
bool request_expire_ts,
bool fill_value)
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
{
if (check_if_record_expired(epoch_now, value)) {
if (_verbose_log) {
Expand All @@ -2293,8 +2311,6 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
}
}

::dsn::apps::key_value kv;

// extract raw key
::dsn::blob raw_key(key.data(), 0, key.size());
if (hash_key_filter_type != ::dsn::apps::filter_type::FT_NO_FILTER ||
Expand All @@ -2316,24 +2332,28 @@ pegasus_server_impl::append_key_value_for_scan(std::vector<::dsn::apps::key_valu
return range_iteration_state::kFiltered;
}
}
std::shared_ptr<char> key_buf(::dsn::utils::make_shared_array<char>(raw_key.length()));
::memcpy(key_buf.get(), raw_key.data(), raw_key.length());
kv.key.assign(std::move(key_buf), 0, raw_key.length());
if (fill_value) {
GehaFearless marked this conversation as resolved.
Show resolved Hide resolved
::dsn::apps::key_value kv;

// extract expire ts if necessary
if (request_expire_ts) {
auto expire_ts_seconds =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value));
kv.__set_expire_ts_seconds(static_cast<int32_t>(expire_ts_seconds));
}
std::shared_ptr<char> key_buf(::dsn::utils::make_shared_array<char>(raw_key.length()));
::memcpy(key_buf.get(), raw_key.data(), raw_key.length());
kv.key.assign(std::move(key_buf), 0, raw_key.length());

// extract value
if (!no_value) {
std::string value_buf(value.data(), value.size());
pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value);
// extract expire ts if necessary
if (request_expire_ts) {
auto expire_ts_seconds =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(value));
kv.__set_expire_ts_seconds(static_cast<int32_t>(expire_ts_seconds));
}

// extract value
if (!no_value) {
std::string value_buf(value.data(), value.size());
pegasus_extract_user_data(_pegasus_data_version, std::move(value_buf), kv.value);
}
kvs.emplace_back(std::move(kv));
}

kvs.emplace_back(std::move(kv));
return range_iteration_state::kNormal;
}

Expand Down
3 changes: 2 additions & 1 deletion src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ class pegasus_server_impl : public pegasus_read_service
uint32_t epoch_now,
bool no_value,
bool request_validate_hash,
bool request_expire_ts);
bool request_expire_ts,
bool fill_value);

range_iteration_state
append_key_value_for_multi_get(std::vector<::dsn::apps::key_value> &kvs,
Expand Down
Loading