Skip to content

Commit

Permalink
shell: copy_data command supports filter (#271)
Browse files Browse the repository at this point in the history
  • Loading branch information
qinzuoyan authored Feb 15, 2019
1 parent 5db88c0 commit 7314ec1
Show file tree
Hide file tree
Showing 4 changed files with 309 additions and 112 deletions.
2 changes: 1 addition & 1 deletion rocksdb
Submodule rocksdb updated 1 files
+5 −5 docs/Gemfile.lock
285 changes: 185 additions & 100 deletions src/shell/command_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/dist/replication/mutation_log_tool.h>
#include <dsn/perf_counter/perf_counter_utils.h>
#include <dsn/utility/string_view.h>

#include <rrdb/rrdb.code.definition.h>
#include <pegasus/version.h>
Expand Down Expand Up @@ -101,6 +102,11 @@ struct scan_data_context
int split_id;
int max_batch_count;
int timeout_ms;
bool no_overwrite; // if set true, then use check_and_set() instead of set()
// when inserting data to destination table for copy_data,
// to not overwrite old data if it aleady exist.
pegasus::pegasus_client::filter_type value_filter_type;
std::string value_filter_pattern;
pegasus::pegasus_client::pegasus_scanner_wrapper scanner;
pegasus::pegasus_client *client;
pegasus::geo::geo_client *geoclient;
Expand Down Expand Up @@ -133,6 +139,8 @@ struct scan_data_context
split_id(split_id_),
max_batch_count(max_batch_count_),
timeout_ms(timeout_ms_),
no_overwrite(false),
value_filter_type(pegasus::pegasus_client::FT_NO_FILTER),
scanner(scanner_),
client(client_),
geoclient(geoclient_),
Expand All @@ -150,6 +158,12 @@ struct scan_data_context
// when split_request_count = 1
dassert(max_batch_count > 1, "");
}
void set_value_filter(pegasus::pegasus_client::filter_type type, const std::string &pattern)
{
value_filter_type = type;
value_filter_pattern = pattern;
}
void set_no_overwrite() { no_overwrite = true; }
};
inline void update_atomic_max(std::atomic_long &max, long value)
{
Expand All @@ -160,6 +174,36 @@ inline void update_atomic_max(std::atomic_long &max, long value)
}
}
}
// return true if the data is valid for the filter
inline bool validate_filter(pegasus::pegasus_client::filter_type filter_type,
const std::string &filter_pattern,
const std::string &value)
{
switch (filter_type) {
case pegasus::pegasus_client::FT_NO_FILTER:
return true;
case pegasus::pegasus_client::FT_MATCH_ANYWHERE:
case pegasus::pegasus_client::FT_MATCH_PREFIX:
case pegasus::pegasus_client::FT_MATCH_POSTFIX: {
if (filter_pattern.length() == 0)
return true;
if (value.length() < filter_pattern.length())
return false;
if (filter_type == pegasus::pegasus_client::FT_MATCH_ANYWHERE) {
return dsn::string_view(value).find(filter_pattern) != dsn::string_view::npos;
} else if (filter_type == pegasus::pegasus_client::FT_MATCH_PREFIX) {
return ::memcmp(value.data(), filter_pattern.data(), filter_pattern.length()) == 0;
} else { // filter_type == pegasus::pegasus_client::FT_MATCH_POSTFIX
return ::memcmp(value.data() + value.length() - filter_pattern.length(),
filter_pattern.data(),
filter_pattern.length()) == 0;
}
}
default:
dassert(false, "unsupported filter type: %d", filter_type);
}
return false;
}
inline void scan_data_next(scan_data_context *context)
{
while (!context->split_completed.load() && !context->error_occurred->load() &&
Expand All @@ -171,112 +215,153 @@ inline void scan_data_next(scan_data_context *context)
std::string &&value,
pegasus::pegasus_client::internal_info &&info) {
if (ret == pegasus::PERR_OK) {
switch (context->op) {
case SCAN_COPY:
context->split_request_count++;
context->client->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
if (context->value_filter_type == pegasus::pegasus_client::FT_NO_FILTER ||
validate_filter(
context->value_filter_type, context->value_filter_pattern, value)) {
switch (context->op) {
case SCAN_COPY:
context->split_request_count++;
if (context->no_overwrite) {
auto callback = [context](
int err,
pegasus::pegasus_client::check_and_set_results &&results,
pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async check and set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
if (results.set_succeed) {
context->split_rows++;
}
scan_data_next(context);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_CLEAR:
context->split_request_count++;
context->client->async_del(
hash_key,
sort_key,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async del failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
};
pegasus::pegasus_client::check_and_set_options options;
context->client->async_check_and_set(
hash_key,
sort_key,
pegasus::pegasus_client::cas_check_type::CT_VALUE_NOT_EXIST,
"",
sort_key,
value,
options,
std::move(callback),
context->timeout_ms);
} else {
auto callback =
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
};
context->client->async_set(hash_key,
sort_key,
value,
std::move(callback),
context->timeout_ms);
}
break;
case SCAN_CLEAR:
context->split_request_count++;
context->client->async_del(
hash_key,
sort_key,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async del failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
} else {
context->split_rows++;
scan_data_next(context);
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_COUNT:
context->split_rows++;
if (context->stat_size) {
long hash_key_size = hash_key.size();
context->hash_key_size_histogram.Add(hash_key_size);

long sort_key_size = sort_key.size();
context->sort_key_size_histogram.Add(sort_key_size);

long value_size = value.size();
context->value_size_histogram.Add(value_size);

long row_size = hash_key_size + sort_key_size + value_size;
context->row_size_histogram.Add(row_size);

if (context->top_count > 0) {
context->top_rows.push(
std::move(hash_key), std::move(sort_key), row_size);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
case SCAN_COUNT:
context->split_rows++;
if (context->stat_size) {
long hash_key_size = hash_key.size();
context->hash_key_size_histogram.Add(hash_key_size);

long sort_key_size = sort_key.size();
context->sort_key_size_histogram.Add(sort_key_size);

long value_size = value.size();
context->value_size_histogram.Add(value_size);

long row_size = hash_key_size + sort_key_size + value_size;
context->row_size_histogram.Add(row_size);

if (context->top_count > 0) {
context->top_rows.push(
std::move(hash_key), std::move(sort_key), row_size);
}
}
if (context->count_hash_key) {
if (hash_key != context->last_hash_key) {
context->split_hash_key_count++;
context->last_hash_key = std::move(hash_key);
if (context->count_hash_key) {
if (hash_key != context->last_hash_key) {
context->split_hash_key_count++;
context->last_hash_key = std::move(hash_key);
}
}
}
scan_data_next(context);
break;
case SCAN_GEN_GEO:
context->split_request_count++;
context->geoclient->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
scan_data_next(context);
break;
case SCAN_GEN_GEO:
context->split_request_count++;
context->geoclient->async_set(
hash_key,
sort_key,
value,
[context](int err, pegasus::pegasus_client::internal_info &&info) {
if (err != pegasus::PERR_OK) {
if (!context->split_completed.exchange(true)) {
fprintf(stderr,
"ERROR: split[%d] async set failed: %s\n",
context->split_id,
context->client->get_error_string(err));
context->error_occurred->store(true);
}
} else {
context->split_rows++;
scan_data_next(context);
}
} else {
context->split_rows++;
scan_data_next(context);
}
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
default:
dassert(false, "op = %d", context->op);
break;
// should put "split_request_count--" at end of the scope,
// to prevent that split_request_count becomes 0 in the middle.
context->split_request_count--;
},
context->timeout_ms);
break;
default:
dassert(false, "op = %d", context->op);
break;
}
}
} else if (ret == pegasus::PERR_SCAN_COMPLETE) {
context->split_completed.store(true);
Expand Down
Loading

0 comments on commit 7314ec1

Please sign in to comment.