diff --git a/rocksdb b/rocksdb index 6309d3703f..7d8817d0a5 160000 --- a/rocksdb +++ b/rocksdb @@ -1 +1 @@ -Subproject commit 6309d3703f25d162885accf56556c7b23c7a267d +Subproject commit 7d8817d0a57d055d33c6a03d45bc8125f40a3188 diff --git a/src/shell/command_helper.h b/src/shell/command_helper.h index c305e800ff..1034d738c6 100644 --- a/src/shell/command_helper.h +++ b/src/shell/command_helper.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -101,6 +102,11 @@ struct scan_data_context int split_id; int max_batch_count; int timeout_ms; + bool no_overwrite; // if set true, then use check_and_set() instead of set() + // when inserting data to destination table for copy_data, + // to not overwrite old data if it aleady exist. + pegasus::pegasus_client::filter_type value_filter_type; + std::string value_filter_pattern; pegasus::pegasus_client::pegasus_scanner_wrapper scanner; pegasus::pegasus_client *client; pegasus::geo::geo_client *geoclient; @@ -133,6 +139,8 @@ struct scan_data_context split_id(split_id_), max_batch_count(max_batch_count_), timeout_ms(timeout_ms_), + no_overwrite(false), + value_filter_type(pegasus::pegasus_client::FT_NO_FILTER), scanner(scanner_), client(client_), geoclient(geoclient_), @@ -150,6 +158,12 @@ struct scan_data_context // when split_request_count = 1 dassert(max_batch_count > 1, ""); } + void set_value_filter(pegasus::pegasus_client::filter_type type, const std::string &pattern) + { + value_filter_type = type; + value_filter_pattern = pattern; + } + void set_no_overwrite() { no_overwrite = true; } }; inline void update_atomic_max(std::atomic_long &max, long value) { @@ -160,6 +174,36 @@ inline void update_atomic_max(std::atomic_long &max, long value) } } } +// return true if the data is valid for the filter +inline bool validate_filter(pegasus::pegasus_client::filter_type filter_type, + const std::string &filter_pattern, + const std::string &value) +{ + switch (filter_type) { + case pegasus::pegasus_client::FT_NO_FILTER: + return true; + case pegasus::pegasus_client::FT_MATCH_ANYWHERE: + case pegasus::pegasus_client::FT_MATCH_PREFIX: + case pegasus::pegasus_client::FT_MATCH_POSTFIX: { + if (filter_pattern.length() == 0) + return true; + if (value.length() < filter_pattern.length()) + return false; + if (filter_type == pegasus::pegasus_client::FT_MATCH_ANYWHERE) { + return dsn::string_view(value).find(filter_pattern) != dsn::string_view::npos; + } else if (filter_type == pegasus::pegasus_client::FT_MATCH_PREFIX) { + return ::memcmp(value.data(), filter_pattern.data(), filter_pattern.length()) == 0; + } else { // filter_type == pegasus::pegasus_client::FT_MATCH_POSTFIX + return ::memcmp(value.data() + value.length() - filter_pattern.length(), + filter_pattern.data(), + filter_pattern.length()) == 0; + } + } + default: + dassert(false, "unsupported filter type: %d", filter_type); + } + return false; +} inline void scan_data_next(scan_data_context *context) { while (!context->split_completed.load() && !context->error_occurred->load() && @@ -171,112 +215,153 @@ inline void scan_data_next(scan_data_context *context) std::string &&value, pegasus::pegasus_client::internal_info &&info) { if (ret == pegasus::PERR_OK) { - switch (context->op) { - case SCAN_COPY: - context->split_request_count++; - context->client->async_set( - hash_key, - sort_key, - value, - [context](int err, pegasus::pegasus_client::internal_info &&info) { - if (err != pegasus::PERR_OK) { - if (!context->split_completed.exchange(true)) { - fprintf(stderr, - "ERROR: split[%d] async set failed: %s\n", - context->split_id, - context->client->get_error_string(err)); - context->error_occurred->store(true); + if (context->value_filter_type == pegasus::pegasus_client::FT_NO_FILTER || + validate_filter( + context->value_filter_type, context->value_filter_pattern, value)) { + switch (context->op) { + case SCAN_COPY: + context->split_request_count++; + if (context->no_overwrite) { + auto callback = [context]( + int err, + pegasus::pegasus_client::check_and_set_results &&results, + pegasus::pegasus_client::internal_info &&info) { + if (err != pegasus::PERR_OK) { + if (!context->split_completed.exchange(true)) { + fprintf(stderr, + "ERROR: split[%d] async check and set failed: %s\n", + context->split_id, + context->client->get_error_string(err)); + context->error_occurred->store(true); + } + } else { + if (results.set_succeed) { + context->split_rows++; + } + scan_data_next(context); } - } else { - context->split_rows++; - scan_data_next(context); - } - // should put "split_request_count--" at end of the scope, - // to prevent that split_request_count becomes 0 in the middle. - context->split_request_count--; - }, - context->timeout_ms); - break; - case SCAN_CLEAR: - context->split_request_count++; - context->client->async_del( - hash_key, - sort_key, - [context](int err, pegasus::pegasus_client::internal_info &&info) { - if (err != pegasus::PERR_OK) { - if (!context->split_completed.exchange(true)) { - fprintf(stderr, - "ERROR: split[%d] async del failed: %s\n", - context->split_id, - context->client->get_error_string(err)); - context->error_occurred->store(true); + // should put "split_request_count--" at end of the scope, + // to prevent that split_request_count becomes 0 in the middle. + context->split_request_count--; + }; + pegasus::pegasus_client::check_and_set_options options; + context->client->async_check_and_set( + hash_key, + sort_key, + pegasus::pegasus_client::cas_check_type::CT_VALUE_NOT_EXIST, + "", + sort_key, + value, + options, + std::move(callback), + context->timeout_ms); + } else { + auto callback = + [context](int err, pegasus::pegasus_client::internal_info &&info) { + if (err != pegasus::PERR_OK) { + if (!context->split_completed.exchange(true)) { + fprintf(stderr, + "ERROR: split[%d] async set failed: %s\n", + context->split_id, + context->client->get_error_string(err)); + context->error_occurred->store(true); + } + } else { + context->split_rows++; + scan_data_next(context); + } + // should put "split_request_count--" at end of the scope, + // to prevent that split_request_count becomes 0 in the middle. + context->split_request_count--; + }; + context->client->async_set(hash_key, + sort_key, + value, + std::move(callback), + context->timeout_ms); + } + break; + case SCAN_CLEAR: + context->split_request_count++; + context->client->async_del( + hash_key, + sort_key, + [context](int err, pegasus::pegasus_client::internal_info &&info) { + if (err != pegasus::PERR_OK) { + if (!context->split_completed.exchange(true)) { + fprintf(stderr, + "ERROR: split[%d] async del failed: %s\n", + context->split_id, + context->client->get_error_string(err)); + context->error_occurred->store(true); + } + } else { + context->split_rows++; + scan_data_next(context); } - } else { - context->split_rows++; - scan_data_next(context); + // should put "split_request_count--" at end of the scope, + // to prevent that split_request_count becomes 0 in the middle. + context->split_request_count--; + }, + context->timeout_ms); + break; + case SCAN_COUNT: + context->split_rows++; + if (context->stat_size) { + long hash_key_size = hash_key.size(); + context->hash_key_size_histogram.Add(hash_key_size); + + long sort_key_size = sort_key.size(); + context->sort_key_size_histogram.Add(sort_key_size); + + long value_size = value.size(); + context->value_size_histogram.Add(value_size); + + long row_size = hash_key_size + sort_key_size + value_size; + context->row_size_histogram.Add(row_size); + + if (context->top_count > 0) { + context->top_rows.push( + std::move(hash_key), std::move(sort_key), row_size); } - // should put "split_request_count--" at end of the scope, - // to prevent that split_request_count becomes 0 in the middle. - context->split_request_count--; - }, - context->timeout_ms); - break; - case SCAN_COUNT: - context->split_rows++; - if (context->stat_size) { - long hash_key_size = hash_key.size(); - context->hash_key_size_histogram.Add(hash_key_size); - - long sort_key_size = sort_key.size(); - context->sort_key_size_histogram.Add(sort_key_size); - - long value_size = value.size(); - context->value_size_histogram.Add(value_size); - - long row_size = hash_key_size + sort_key_size + value_size; - context->row_size_histogram.Add(row_size); - - if (context->top_count > 0) { - context->top_rows.push( - std::move(hash_key), std::move(sort_key), row_size); } - } - if (context->count_hash_key) { - if (hash_key != context->last_hash_key) { - context->split_hash_key_count++; - context->last_hash_key = std::move(hash_key); + if (context->count_hash_key) { + if (hash_key != context->last_hash_key) { + context->split_hash_key_count++; + context->last_hash_key = std::move(hash_key); + } } - } - scan_data_next(context); - break; - case SCAN_GEN_GEO: - context->split_request_count++; - context->geoclient->async_set( - hash_key, - sort_key, - value, - [context](int err, pegasus::pegasus_client::internal_info &&info) { - if (err != pegasus::PERR_OK) { - if (!context->split_completed.exchange(true)) { - fprintf(stderr, - "ERROR: split[%d] async set failed: %s\n", - context->split_id, - context->client->get_error_string(err)); - context->error_occurred->store(true); + scan_data_next(context); + break; + case SCAN_GEN_GEO: + context->split_request_count++; + context->geoclient->async_set( + hash_key, + sort_key, + value, + [context](int err, pegasus::pegasus_client::internal_info &&info) { + if (err != pegasus::PERR_OK) { + if (!context->split_completed.exchange(true)) { + fprintf(stderr, + "ERROR: split[%d] async set failed: %s\n", + context->split_id, + context->client->get_error_string(err)); + context->error_occurred->store(true); + } + } else { + context->split_rows++; + scan_data_next(context); } - } else { - context->split_rows++; - scan_data_next(context); - } - // should put "split_request_count--" at end of the scope, - // to prevent that split_request_count becomes 0 in the middle. - context->split_request_count--; - }, - context->timeout_ms); - break; - default: - dassert(false, "op = %d", context->op); - break; + // should put "split_request_count--" at end of the scope, + // to prevent that split_request_count becomes 0 in the middle. + context->split_request_count--; + }, + context->timeout_ms); + break; + default: + dassert(false, "op = %d", context->op); + break; + } } } else if (ret == pegasus::PERR_SCAN_COMPLETE) { context->split_completed.store(true); diff --git a/src/shell/commands.h b/src/shell/commands.h index 625b4953fc..92fa00e5d3 100644 --- a/src/shell/commands.h +++ b/src/shell/commands.h @@ -2157,9 +2157,17 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) { static struct option long_options[] = {{"target_cluster_name", required_argument, 0, 'c'}, {"target_app_name", required_argument, 0, 'a'}, - {"max_split_count", required_argument, 0, 's'}, + {"partition", required_argument, 0, 'p'}, {"max_batch_count", required_argument, 0, 'b'}, {"timeout_ms", required_argument, 0, 't'}, + {"hash_key_filter_type", required_argument, 0, 'h'}, + {"hash_key_filter_pattern", required_argument, 0, 'x'}, + {"sort_key_filter_type", required_argument, 0, 's'}, + {"sort_key_filter_pattern", required_argument, 0, 'y'}, + {"value_filter_type", required_argument, 0, 'v'}, + {"value_filter_pattern", required_argument, 0, 'z'}, + {"no_overwrite", no_argument, 0, 'n'}, + {"no_value", no_argument, 0, 'i'}, {"geo_data", no_argument, 0, 'g'}, {0, 0, 0, 0}}; @@ -2167,15 +2175,24 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) std::string target_app_name; std::string target_geo_app_name; int max_split_count = 100000000; + int32_t partition = -1; int max_batch_count = 500; int timeout_ms = sc->timeout_ms; bool is_geo_data = false; + bool no_overwrite = false; + std::string hash_key_filter_type_name("no_filter"); + std::string sort_key_filter_type_name("no_filter"); + std::string value_filter_type_name("no_filter"); + pegasus::pegasus_client::filter_type value_filter_type = pegasus::pegasus_client::FT_NO_FILTER; + std::string value_filter_pattern; + pegasus::pegasus_client::scan_options options; optind = 0; while (true) { int option_index = 0; int c; - c = getopt_long(args.argc, args.argv, "c:a:s:b:t:g", long_options, &option_index); + c = getopt_long( + args.argc, args.argv, "c:a:p:b:t:h:x:s:y:v:z:nig", long_options, &option_index); if (c == -1) break; switch (c) { @@ -2186,23 +2203,75 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) target_app_name = optarg; target_geo_app_name = target_app_name + "_geo"; break; - case 's': - if (!dsn::buf2int32(optarg, max_split_count)) { - fprintf(stderr, "parse %s as max_split_count failed\n", optarg); + case 'p': + if (!dsn::buf2int32(optarg, partition)) { + fprintf(stderr, "ERROR: parse %s as partition failed\n", optarg); + return false; + } + if (partition < 0) { + fprintf(stderr, "ERROR: partition should be greater than 0\n"); return false; } break; case 'b': if (!dsn::buf2int32(optarg, max_batch_count)) { - fprintf(stderr, "parse %s as max_batch_count failed\n", optarg); + fprintf(stderr, "ERROR: parse %s as max_batch_count failed\n", optarg); return false; } break; case 't': if (!dsn::buf2int32(optarg, timeout_ms)) { - fprintf(stderr, "parse %s as timeout_ms failed\n", optarg); + fprintf(stderr, "ERROR: parse %s as timeout_ms failed\n", optarg); + return false; + } + break; + case 'h': + options.hash_key_filter_type = (pegasus::pegasus_client::filter_type)type_from_string( + ::dsn::apps::_filter_type_VALUES_TO_NAMES, + std::string("ft_match_") + optarg, + ::dsn::apps::filter_type::FT_NO_FILTER); + if (options.hash_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) { + fprintf(stderr, "ERROR: invalid hash_key_filter_type param\n"); + return false; + } + hash_key_filter_type_name = optarg; + break; + case 'x': + options.hash_key_filter_pattern = unescape_str(optarg); + break; + case 's': + options.sort_key_filter_type = (pegasus::pegasus_client::filter_type)type_from_string( + dsn::apps::_filter_type_VALUES_TO_NAMES, + std::string("ft_match_") + optarg, + ::dsn::apps::filter_type::FT_NO_FILTER); + if (options.sort_key_filter_type == pegasus::pegasus_client::FT_NO_FILTER) { + fprintf(stderr, "ERROR: invalid sort_key_filter_type param\n"); + return false; + } + sort_key_filter_type_name = optarg; + break; + case 'y': + options.sort_key_filter_pattern = unescape_str(optarg); + break; + case 'v': + value_filter_type = (pegasus::pegasus_client::filter_type)type_from_string( + dsn::apps::_filter_type_VALUES_TO_NAMES, + std::string("ft_match_") + optarg, + ::dsn::apps::filter_type::FT_NO_FILTER); + if (value_filter_type == pegasus::pegasus_client::FT_NO_FILTER) { + fprintf(stderr, "ERROR: invalid value_filter_type param\n"); return false; } + value_filter_type_name = optarg; + break; + case 'z': + value_filter_pattern = unescape_str(optarg); + break; + case 'n': + no_overwrite = true; + break; + case 'i': + options.no_value = true; break; case 'g': is_geo_data = true; @@ -2244,6 +2313,28 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) if (is_geo_data) { fprintf(stderr, "INFO: target_geo_app_name = %s\n", target_geo_app_name.c_str()); } + fprintf(stderr, + "INFO: partition = %s\n", + partition >= 0 ? boost::lexical_cast(partition).c_str() : "all"); + fprintf(stderr, "INFO: hash_key_filter_type = %s\n", hash_key_filter_type_name.c_str()); + if (options.hash_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) { + fprintf(stderr, + "INFO: hash_key_filter_pattern = \"%s\"\n", + pegasus::utils::c_escape_string(options.hash_key_filter_pattern).c_str()); + } + fprintf(stderr, "INFO: sort_key_filter_type = %s\n", sort_key_filter_type_name.c_str()); + if (options.sort_key_filter_type != pegasus::pegasus_client::FT_NO_FILTER) { + fprintf(stderr, + "INFO: sort_key_filter_pattern = \"%s\"\n", + pegasus::utils::c_escape_string(options.sort_key_filter_pattern).c_str()); + } + fprintf(stderr, "INFO: value_filter_type = %s\n", value_filter_type_name.c_str()); + if (value_filter_type != pegasus::pegasus_client::FT_NO_FILTER) { + fprintf(stderr, + "INFO: value_filter_pattern = \"%s\"\n", + pegasus::utils::c_escape_string(value_filter_pattern).c_str()); + } + fprintf(stderr, "INFO: no_value = %s\n", options.no_value ? "true" : "false"); fprintf(stderr, "INFO: max_split_count = %d\n", max_split_count); fprintf(stderr, "INFO: max_batch_count = %d\n", max_batch_count); fprintf(stderr, "INFO: timeout_ms = %d\n", timeout_ms); @@ -2279,7 +2370,6 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) } std::vector scanners; - pegasus::pegasus_client::scan_options options; options.timeout_ms = timeout_ms; ret = sc->pg_client->get_unordered_scanners(max_split_count, options, scanners); if (ret != pegasus::PERR_OK) { @@ -2289,8 +2379,21 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) delete target_geo_client; return true; } + fprintf(stderr, + "INFO: open source app scanner succeed, partition_count = %d\n", + (int)scanners.size()); + if (partition != -1) { + if (partition >= scanners.size()) { + fprintf(stderr, "ERROR: invalid partition param: %d\n", partition); + delete target_geo_client; + return true; + } + std::vector tmp_scanners; + tmp_scanners.push_back(scanners[partition]); + tmp_scanners.swap(scanners); + } int split_count = scanners.size(); - fprintf(stderr, "INFO: open source app scanner succeed, split_count = %d\n", split_count); + fprintf(stderr, "INFO: prepare scanners succeed, split_count = %d\n", split_count); std::atomic_bool error_occurred(false); std::vector contexts; @@ -2303,6 +2406,9 @@ inline bool copy_data(command_executor *e, shell_context *sc, arguments args) target_client, target_geo_client, &error_occurred); + context->set_value_filter(value_filter_type, value_filter_pattern); + if (no_overwrite) + context->set_no_overwrite(); contexts.push_back(context); dsn::tasking::enqueue(LPC_SCAN_DATA, nullptr, std::bind(scan_data_next, context)); } diff --git a/src/shell/main.cpp b/src/shell/main.cpp index 190407f00c..9ddd4c7dfe 100644 --- a/src/shell/main.cpp +++ b/src/shell/main.cpp @@ -246,8 +246,14 @@ static command_executor commands[] = { "copy_data", "copy app data", "<-c|--target_cluster_name str> <-a|--target_app_name str> " - "[-s|--max_split_count num] [-b|--max_batch_count num] [-t|--timeout_ms num] " - "[-g|--geo_data]", + "[-h|--hash_key_filter_type anywhere|prefix|postfix] " + "[-x|--hash_key_filter_pattern str] " + "[-s|--sort_key_filter_type anywhere|prefix|postfix] " + "[-y|--sort_key_filter_pattern str] " + "[-v|--value_filter_type anywhere|prefix|postfix] " + "[-z|--value_filter_pattern str] " + "[-p|--partition num] [-b|--max_batch_count num] [-t|--timeout_ms num] " + "[-g|--geo_data] [-i|--no_value] [-n|--no_overwrite]", data_operations, }, {