From 76cbe4501f153be05386a4d3725178129a4a3b3a Mon Sep 17 00:00:00 2001 From: Lai Yingchun <405403881@qq.com> Date: Sun, 29 Jul 2018 23:38:49 +0800 Subject: [PATCH] rproxy: add incr/incrby/decr/decrby (#146) * add incr/incrby/decr/decrby for redis proxy Former-commit-id: caee216d1b3c842d377b0c6086c3afa9165ed8a8 [formerly 2b92df072370bbfad7253c85a9a54b4be700f8e5] Former-commit-id: a5b375e2b944e927eb9c771b935916c74328ac07 --- src/redis_protocol/proxy/CMakeLists.txt | 6 + src/redis_protocol/proxy/config.ini | 2 +- src/redis_protocol/proxy_lib/proxy_layer.cpp | 1 + src/redis_protocol/proxy_lib/redis_parser.cpp | 106 ++++++++++++++++++ src/redis_protocol/proxy_lib/redis_parser.h | 5 + 5 files changed, 119 insertions(+), 1 deletion(-) diff --git a/src/redis_protocol/proxy/CMakeLists.txt b/src/redis_protocol/proxy/CMakeLists.txt index 1cf8cd2f27..cbf84a2812 100644 --- a/src/redis_protocol/proxy/CMakeLists.txt +++ b/src/redis_protocol/proxy/CMakeLists.txt @@ -24,4 +24,10 @@ set(MY_BINPLACES "config.ini") set(MY_BOOST_PACKAGES system filesystem) +# Avoid megabytes of warnings like: +# rdsn/thirdparty/output/include/s2/s1angle.h:288:28: error: +# optimization attribute on ‘double sin(S1Angle)’ follows +# definition but the attribute doesn’t match [-Werror=attributes] +add_definitions(-Wno-attributes) + dsn_add_executable() diff --git a/src/redis_protocol/proxy/config.ini b/src/redis_protocol/proxy/config.ini index 1ff6432a20..298fefda77 100644 --- a/src/redis_protocol/proxy/config.ini +++ b/src/redis_protocol/proxy/config.ini @@ -8,7 +8,7 @@ count = 1 [apps.proxy] name = proxy type = proxy -arguments = redis_cluster temp temp_geo +arguments = redis_cluster temp ports = 6379 pools = THREAD_POOL_DEFAULT run = true diff --git a/src/redis_protocol/proxy_lib/proxy_layer.cpp b/src/redis_protocol/proxy_lib/proxy_layer.cpp index fe2d5712f0..7482bef984 100644 --- a/src/redis_protocol/proxy_lib/proxy_layer.cpp +++ b/src/redis_protocol/proxy_lib/proxy_layer.cpp @@ -35,6 +35,7 @@ proxy_stub::proxy_stub(const proxy_session::factory &f, dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_GET_SCANNER_ACK)->allow_inline = true; dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_SCAN_ACK)->allow_inline = true; dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_CLEAR_SCANNER_ACK)->allow_inline = true; + dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_INCR_ACK)->allow_inline = true; _uri_address.assign_uri( std::string("dsn://").append(_cluster).append("/").append(_app).c_str()); diff --git a/src/redis_protocol/proxy_lib/redis_parser.cpp b/src/redis_protocol/proxy_lib/redis_parser.cpp index 9273f38046..b08f548bad 100644 --- a/src/redis_protocol/proxy_lib/redis_parser.cpp +++ b/src/redis_protocol/proxy_lib/redis_parser.cpp @@ -31,6 +31,10 @@ std::unordered_map redis_parser:: {"GEODIST", redis_parser::g_geo_dist}, {"GEORADIUS", redis_parser::g_geo_radius}, {"GEORADIUSBYMEMBER", redis_parser::g_geo_radius_by_member}, + {"INCR", redis_parser::g_incr}, + {"INCRBY", redis_parser::g_incr_by}, + {"DECR", redis_parser::g_decr}, + {"DECRBY", redis_parser::g_decr_by}, }; redis_parser::redis_call_handler redis_parser::get_handler(const char *command, unsigned int length) @@ -938,6 +942,108 @@ void redis_parser::geo_radius_by_member(message_entry &entry) hash_key, "", radius_m, count, sort_type, 2000, search_callback); } +void redis_parser::incr(message_entry &entry) { counter_internal(entry); } + +void redis_parser::incr_by(message_entry &entry) { counter_internal(entry); } + +void redis_parser::decr(message_entry &entry) { counter_internal(entry); } + +void redis_parser::decr_by(message_entry &entry) { counter_internal(entry); } + +void redis_parser::counter_internal(message_entry &entry) +{ + dassert(!entry.request.buffers.empty(), ""); + dassert(entry.request.buffers[0].length > 0, ""); + const char *command = entry.request.buffers[0].data.data(); + int64_t increment = 1; + if (strcasecmp(command, "INCR") == 0 || strcasecmp(command, "DECR") == 0) { + if (entry.request.buffers.size() != 2) { + dwarn_f("{}: command {} seqid({}) with invalid arguments count: {}", + remote_address.to_string(), + command, + entry.sequence_id, + entry.request.buffers.size()); + redis_simple_string result; + result.is_error = true; + result.message = fmt::format("ERR wrong number of arguments for '{}'", command); + reply_message(entry, result); + return; + } + } else if (strcasecmp(command, "INCRBY") == 0 || strcasecmp(command, "DECRBY") == 0) { + if (entry.request.buffers.size() != 3) { + dwarn_f("{}: command {} seqid({}) with invalid arguments count: {}", + remote_address.to_string(), + command, + entry.sequence_id, + entry.request.buffers.size()); + redis_simple_string result; + result.is_error = true; + result.message = fmt::format("ERR wrong number of arguments for '{}'", command); + reply_message(entry, result); + return; + } + if (!dsn::buf2int64(entry.request.buffers[2].data, increment)) { + dwarn_f("{}: command {} seqid({}) with invalid 'increment': {}", + remote_address.to_string(), + command, + entry.sequence_id, + entry.request.buffers[2].data.to_string()); + redis_simple_string result; + result.is_error = true; + result.message = + fmt::format("ERR wrong type of argument 'increment 'for '{}'", command); + reply_message(entry, result); + return; + } + } else { + dfatal_f("command not support: {}", command); + } + if (strncasecmp(command, "DECR", 4) == 0) { + increment = -increment; + } + + std::shared_ptr ref_this = shared_from_this(); + auto on_incr_reply = [ref_this, this, command, &entry]( + ::dsn::error_code ec, dsn_message_t, dsn_message_t response) { + if (is_session_reset.load(std::memory_order_acquire)) { + dwarn_f("{}: command {} seqid({}) got reply, but session has reset", + remote_address.to_string(), + command, + entry.sequence_id); + return; + } + + if (::dsn::ERR_OK != ec) { + dwarn_f("{}: command {} seqid({}) got reply with error = {}", + remote_address.to_string(), + command, + entry.sequence_id, + ec.to_string()); + redis_simple_string result; + result.is_error = true; + result.message = std::string("ERR ") + ec.to_string(); + reply_message(entry, result); + } else { + ::dsn::apps::incr_response incr_resp; + ::dsn::unmarshall(response, incr_resp); + if (incr_resp.error != 0) { + redis_simple_string result; + result.is_error = true; + result.message = "ERR internal error " + std::to_string(incr_resp.error); + reply_message(entry, result); + } else { + redis_integer result; + result.value = incr_resp.new_value; + reply_message(entry, result); + } + } + }; + dsn::apps::incr_request req; + pegasus_generate_key(req.key, entry.request.buffers[1].data, dsn::blob()); + req.increment = increment; + client->incr(req, on_incr_reply, std::chrono::milliseconds(2000), 0, pegasus_key_hash(req.key)); +} + void redis_parser::parse_set_parameters(const std::vector &opts, int &ttl_seconds) { diff --git a/src/redis_protocol/proxy_lib/redis_parser.h b/src/redis_protocol/proxy_lib/redis_parser.h index 75f72b2eeb..39d759b491 100644 --- a/src/redis_protocol/proxy_lib/redis_parser.h +++ b/src/redis_protocol/proxy_lib/redis_parser.h @@ -148,12 +148,17 @@ class redis_parser : public proxy_session DECLARE_REDIS_HANDLER(geo_dist) DECLARE_REDIS_HANDLER(geo_radius) DECLARE_REDIS_HANDLER(geo_radius_by_member) + DECLARE_REDIS_HANDLER(incr) + DECLARE_REDIS_HANDLER(incr_by) + DECLARE_REDIS_HANDLER(decr) + DECLARE_REDIS_HANDLER(decr_by) DECLARE_REDIS_HANDLER(default_handler) void set_internal(message_entry &entry); void set_geo_internal(message_entry &entry); void del_internal(message_entry &entry); void del_geo_internal(message_entry &entry); + void counter_internal(message_entry &entry); void parse_set_parameters(const std::vector &opts, int &ttl_seconds); void parse_geo_radius_parameters(const std::vector &opts, int base_index,