Skip to content

Commit

Permalink
cr4
Browse files Browse the repository at this point in the history
  • Loading branch information
GehaFearless committed Aug 15, 2022
1 parent 501ce57 commit dad6442
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ class pegasus_client_impl : public pegasus_client
volatile bool _rpc_started;
bool _validate_partition_hash;
bool _full_scan;
bool _only_calculate_count;

void _async_next_internal();
void _start_scan();
Expand Down
28 changes: 23 additions & 5 deletions src/client_lib/pegasus_scanner_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrd
_options(options),
_splits_hash(std::move(hash)),
_p(-1),
_kv_count(-1),
_context(SCAN_CONTEXT_ID_COMPLETED),
_rpc_started(false),
_validate_partition_hash(validate_partition_hash),
_full_scan(full_scan)
_full_scan(full_scan),
_only_calculate_count(false)
{
}

Expand Down Expand Up @@ -124,8 +126,12 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
dassert(!_queue.empty(), "queue should not be empty when _async_next_internal start");

std::list<async_scan_next_callback_t> temp;
bool already_add_count = true;
if (_only_calculate_count) {
already_add_count = false;
}
while (true) {
while (++_p >= _kvs.size()) {
while (++_p >= _kvs.size() && already_add_count) {
if (_context == SCAN_CONTEXT_ID_COMPLETED) {
// reach the end of one partition
if (_splits_hash.empty()) {
Expand Down Expand Up @@ -170,13 +176,15 @@ void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
// valid data got
std::string hash_key, sort_key, value;
uint32_t expire_ts_seconds = 0;
// _kv_count > -1 means req just want to get data counts, not include data value
if (_kv_count == -1) {

if (!_only_calculate_count) {
pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
value = std::string(_kvs[_p].value.data(), _kvs[_p].value.length());
if (_kvs[_p].__isset.expire_ts_seconds) {
expire_ts_seconds = static_cast<uint32_t>(_kvs[_p].expire_ts_seconds);
}
} else {
already_add_count = true;
}
auto &callback = _queue.front();
if (callback) {
Expand Down Expand Up @@ -272,7 +280,17 @@ void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_c
_kvs = std::move(response.kvs);
_p = -1;
_context = response.context_id;
_kv_count = response.kv_count;
// 1. kv_count exist on response means server is newer version (added counting size only
// implementation)
// 1> kv_count == -1 indicates response still have key and value data
// 2> kv_count > -1 indicates response only have kv size count, but not key && value
// 2. kv_count is not existed means server is older version
if (response.__isset.kv_count) {
if (response.kv_count != -1) {
_only_calculate_count = true;
_kv_count = response.kv_count;
}
}
_async_next_internal();
return;
} else if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) {
Expand Down
10 changes: 4 additions & 6 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
resp.kvs.reserve(batch_count);

bool return_expire_ts = request.__isset.return_expire_ts ? request.return_expire_ts : false;
bool only_return_count = request.__isset.only_return_count ? request.only_return_count : false;

std::unique_ptr<range_read_limiter> limiter =
dsn::make_unique<range_read_limiter>(_rng_rd_opts.rocksdb_max_iteration_count,
Expand Down Expand Up @@ -1200,7 +1201,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
switch (state) {
case range_iteration_state::kNormal:
count++;
if (!request.only_return_count) {
if (!only_return_count) {
append_key_value(
resp.kvs, it->key(), it->value(), request.no_value, return_expire_ts);
}
Expand All @@ -1223,9 +1224,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)

it->Next();
}
if (request.only_return_count) {
// add a null data avoid to refactor the client code
resp.kvs.emplace_back(::dsn::apps::key_value());
if (only_return_count) {
resp.__set_kv_count(count);
}

Expand Down Expand Up @@ -1282,7 +1281,7 @@ void pegasus_server_impl::on_get_scanner(get_scanner_rpc rpc)
request.no_value,
request.__isset.validate_partition_hash ? request.validate_partition_hash : true,
return_expire_ts,
request.only_return_count));
only_return_count));
int64_t handle = _context_cache.put(std::move(context));
resp.context_id = handle;
// if the context is used, it will be fetched and re-put into cache,
Expand Down Expand Up @@ -1401,7 +1400,6 @@ void pegasus_server_impl::on_scan(scan_rpc rpc)
}

if (only_return_count) {
resp.kvs.emplace_back(::dsn::apps::key_value());
resp.__set_kv_count(count);
}

Expand Down

0 comments on commit dad6442

Please sign in to comment.