Skip to content

Commit

Permalink
server: add check_and_mutate() interface and implementation (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangWei authored and qinzuoyan committed Aug 17, 2018
1 parent 931d17b commit 6476639
Show file tree
Hide file tree
Showing 26 changed files with 3,960 additions and 323 deletions.
2 changes: 1 addition & 1 deletion rdsn
Submodule rdsn updated 44 files
+1 −4 CMakeLists.txt
+7 −0 bin/dsn.cmake
+4 −0 include/dsn/dist/replication/meta_service_app.h
+4 −0 include/dsn/dist/replication/replication_service_app.h
+88 −0 include/dsn/tool-api/http_server.h
+3 −0 include/dsn/tool-api/rpc_address.h
+15 −1 include/dsn/tool-api/rpc_message.h
+3 −0 include/dsn/utility/strings.h
+6 −1 run.sh
+8 −0 scripts/linux/build.sh
+1 −1 src/apps/skv/CMakeLists.txt
+1 −0 src/core/CMakeLists.txt
+16 −0 src/core/core/rpc_address.cpp
+0 −16 src/core/core/rpc_message.cpp
+28 −0 src/core/core/strings.cpp
+1 −1 src/core/tests/CMakeLists.txt
+4 −4 src/core/tests/fail_point_test.cpp
+43 −0 src/core/tests/http_server_test.cpp
+1 −0 src/core/tools/CMakeLists.txt
+0 −444 src/core/tools/common/http_message_parser.cpp
+0 −3 src/core/tools/common/providers.common.cpp
+9 −0 src/core/tools/http/CMakeLists.txt
+181 −0 src/core/tools/http/http_message_parser.cpp
+22 −34 src/core/tools/http/http_message_parser.h
+0 −0 src/core/tools/http/http_parser.c
+0 −0 src/core/tools/http/http_parser.h
+131 −0 src/core/tools/http/http_server.cpp
+31 −0 src/core/tools/http/root_http_service.h
+256 −149 src/dist/block_service/local/local_service.cpp
+7 −0 src/dist/block_service/local/local_service.h
+1 −1 src/dist/nfs/test/CMakeLists.txt
+1 −1 src/dist/replication/common/replication_common.cpp
+1 −0 src/dist/replication/lib/replica.cpp
+4 −0 src/dist/replication/lib/replica_context.cpp
+7 −2 src/dist/replication/lib/replica_context.h
+10 −7 src/dist/replication/lib/replica_learn.cpp
+1 −0 src/dist/replication/lib/replica_stub.h
+4 −1 src/dist/replication/lib/replication_service_app.cpp
+3 −1 src/dist/replication/meta_server/meta_service_app.cpp
+1 −1 src/dist/replication/test/meta_test/balancer_simulator/CMakeLists.txt
+1 −1 src/dist/replication/test/meta_test/unit_test/CMakeLists.txt
+1 −1 src/dist/replication/test/replica_test/unit_test/CMakeLists.txt
+1 −5 src/dist/replication/test/simple_kv/CMakeLists.txt
+1 −1 src/tests/dsn/CMakeLists.txt
3 changes: 3 additions & 0 deletions src/base/pegasus_rpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ using incr_rpc = dsn::rpc_holder<dsn::apps::incr_request, dsn::apps::incr_respon
using check_and_set_rpc =
dsn::rpc_holder<dsn::apps::check_and_set_request, dsn::apps::check_and_set_response>;

using check_and_mutate_rpc =
dsn::rpc_holder<dsn::apps::check_and_mutate_request, dsn::apps::check_and_mutate_response>;

} // namespace pegasus
1,225 changes: 942 additions & 283 deletions src/base/rrdb_types.cpp

Large diffs are not rendered by default.

129 changes: 127 additions & 2 deletions src/client_lib/pegasus_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,14 @@ void pegasus_client_impl::async_check_and_set(const std::string &hash_key,
return;
}

if (dsn::apps::_cas_check_type_VALUES_TO_NAMES.find(check_type) ==
dsn::apps::_cas_check_type_VALUES_TO_NAMES.end()) {
derror("invalid check type: %d", (int)check_type);
if (callback != nullptr)
callback(PERR_INVALID_ARGUMENT, check_and_set_results(), internal_info());
return;
}

::dsn::apps::check_and_set_request req;
req.hash_key.assign(hash_key.c_str(), 0, hash_key.size());
req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size());
Expand Down Expand Up @@ -938,6 +946,123 @@ void pegasus_client_impl::async_check_and_set(const std::string &hash_key,
partition_hash);
}

int pegasus_client_impl::check_and_mutate(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const mutations &mutations,
const check_and_mutate_options &options,
check_and_mutate_results &results,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int _err, check_and_mutate_results &&_results, internal_info &&_info) {
ret = _err;
results = std::move(_results);
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_check_and_mutate(hash_key,
check_sort_key,
check_type,
check_operand,
mutations,
options,
std::move(callback),
timeout_milliseconds);
op_completed.wait();
return ret;
}

void pegasus_client_impl::async_check_and_mutate(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const mutations &mutations,
const check_and_mutate_options &options,
async_check_and_mutate_callback_t &&callback,
int timeout_milliseconds)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, check_and_mutate_results(), internal_info());
return;
}

if (dsn::apps::_cas_check_type_VALUES_TO_NAMES.find(check_type) ==
dsn::apps::_cas_check_type_VALUES_TO_NAMES.end()) {
derror("invalid check type: %d", (int)check_type);
if (callback != nullptr)
callback(PERR_INVALID_ARGUMENT, check_and_mutate_results(), internal_info());
return;
}
if (mutations.is_empty()) {
derror("invalid mutations: mutations should not be empty.");
if (callback != nullptr)
callback(PERR_INVALID_ARGUMENT, check_and_mutate_results(), internal_info());
return;
}

::dsn::apps::check_and_mutate_request req;
req.hash_key.assign(hash_key.c_str(), 0, hash_key.size());
req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size());
req.check_type = (dsn::apps::cas_check_type::type)check_type;
req.check_operand.assign(check_operand.c_str(), 0, check_operand.size());
mutations.get_mutations(req.mutate_list);
req.return_check_value = options.return_check_value;

::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn_message_t req, dsn_message_t resp)
{
if (user_callback == nullptr) {
return;
}
check_and_mutate_results results;
internal_info info;
::dsn::apps::check_and_mutate_response response;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
if (response.error == 0) {
results.mutate_succeed = true;
} else if (response.error == 13) { // kTryAgain
results.mutate_succeed = false;
response.error = 0;
} else {
results.mutate_succeed = false;
}
if (response.check_value_returned) {
results.check_value_returned = true;
if (response.check_value_exist) {
results.check_value_exist = true;
results.check_value.assign(response.check_value.data(),
response.check_value.length());
}
}
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.decree = response.decree;
info.server = response.server;
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(results), std::move(info));
};
_client->check_and_mutate(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
0,
partition_hash);
}

int pegasus_client_impl::ttl(const std::string &hash_key,
const std::string &sort_key,
int &ttl_seconds,
Expand Down Expand Up @@ -1171,5 +1296,5 @@ const char *pegasus_client_impl::get_error_string(int error_code) const
{
return (rocskdb_error == 0) ? 0 : ROCSKDB_ERROR_START - rocskdb_error;
}
}
} // namespace
} // namespace client
} // namespace pegasus
23 changes: 21 additions & 2 deletions src/client_lib/pegasus_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,25 @@ class pegasus_client_impl : public pegasus_client
async_check_and_set_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) override;

virtual int check_and_mutate(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const mutations &mutations,
const check_and_mutate_options &options,
check_and_mutate_results &results,
int timeout_milliseconds = 5000,
internal_info *info = nullptr) override;

virtual void async_check_and_mutate(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const mutations &mutations,
const check_and_mutate_options &options,
async_check_and_mutate_callback_t &&callback = nullptr,
int timeout_milliseconds = 5000) override;

virtual int ttl(const std::string &hashkey,
const std::string &sortkey,
int &ttl_seconds,
Expand Down Expand Up @@ -300,5 +319,5 @@ class pegasus_client_impl : public pegasus_client
///
static std::unordered_map<int, int> _server_error_to_client;
};
}
} // namespace
} // namespace client
} // namespace pegasus
39 changes: 39 additions & 0 deletions src/idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ enum cas_check_type
CT_VALUE_INT_GREATER // int compare: value > operand
}

enum mutate_operation
{
MO_PUT,
MO_DELETE
}

struct update_request
{
1:dsn.blob key;
Expand Down Expand Up @@ -181,6 +187,38 @@ struct check_and_set_response
8:string server;
}

struct mutate
{
1:mutate_operation operation;
2:dsn.blob sort_key;
3:dsn.blob value; // set null if operation is MO_DELETE
4:i32 set_expire_ts_seconds; // set 0 if operation is MO_DELETE
}

struct check_and_mutate_request
{
1:dsn.blob hash_key;
2:dsn.blob check_sort_key;
3:cas_check_type check_type;
4:dsn.blob check_operand;
5:list<mutate> mutate_list;
6:bool return_check_value;
}

struct check_and_mutate_response
{
1:i32 error; // return kTryAgain if check not passed.
// return kInvalidArgument if check type is int compare and
// check_operand/check_value is not integer or out of range.
2:bool check_value_returned;
3:bool check_value_exist; // used only if check_value_returned is true
4:dsn.blob check_value; // used only if check_value_returned and check_value_exist is true
5:i32 app_id;
6:i32 partition_index;
7:i64 decree;
8:string server;
}

struct get_scanner_request
{
1:dsn.blob start_key;
Expand Down Expand Up @@ -218,6 +256,7 @@ service rrdb
multi_remove_response multi_remove(1:multi_remove_request request);
incr_response incr(1:incr_request request);
check_and_set_response check_and_set(1:check_and_set_request request);
check_and_mutate_response check_and_mutate(1:check_and_mutate_request request);
read_response get(1:dsn.blob key);
multi_get_response multi_get(1:multi_get_request request);
count_response sortkey_count(1:dsn.blob hash_key);
Expand Down
Loading

0 comments on commit 6476639

Please sign in to comment.