Skip to content

Commit

Permalink
rproxy: add incr/incrby/decr/decrby (apache#146)
Browse files Browse the repository at this point in the history
* add incr/incrby/decr/decrby for redis proxy

Former-commit-id: caee216d1b3c842d377b0c6086c3afa9165ed8a8 [formerly 2b92df0]
Former-commit-id: a5b375e2b944e927eb9c771b935916c74328ac07
  • Loading branch information
acelyc111 authored Jul 29, 2018
1 parent 58801f3 commit 76cbe45
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/redis_protocol/proxy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ set(MY_BINPLACES "config.ini")

set(MY_BOOST_PACKAGES system filesystem)

# Avoid megabytes of warnings like:
# rdsn/thirdparty/output/include/s2/s1angle.h:288:28: error:
# optimization attribute on ‘double sin(S1Angle)’ follows
# definition but the attribute doesn’t match [-Werror=attributes]
add_definitions(-Wno-attributes)

dsn_add_executable()
2 changes: 1 addition & 1 deletion src/redis_protocol/proxy/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ count = 1
[apps.proxy]
name = proxy
type = proxy
arguments = redis_cluster temp temp_geo
arguments = redis_cluster temp
ports = 6379
pools = THREAD_POOL_DEFAULT
run = true
Expand Down
1 change: 1 addition & 0 deletions src/redis_protocol/proxy_lib/proxy_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ proxy_stub::proxy_stub(const proxy_session::factory &f,
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_GET_SCANNER_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_SCAN_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_CLEAR_SCANNER_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_INCR_ACK)->allow_inline = true;

_uri_address.assign_uri(
std::string("dsn://").append(_cluster).append("/").append(_app).c_str());
Expand Down
106 changes: 106 additions & 0 deletions src/redis_protocol/proxy_lib/redis_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ std::unordered_map<std::string, redis_parser::redis_call_handler> redis_parser::
{"GEODIST", redis_parser::g_geo_dist},
{"GEORADIUS", redis_parser::g_geo_radius},
{"GEORADIUSBYMEMBER", redis_parser::g_geo_radius_by_member},
{"INCR", redis_parser::g_incr},
{"INCRBY", redis_parser::g_incr_by},
{"DECR", redis_parser::g_decr},
{"DECRBY", redis_parser::g_decr_by},
};

redis_parser::redis_call_handler redis_parser::get_handler(const char *command, unsigned int length)
Expand Down Expand Up @@ -938,6 +942,108 @@ void redis_parser::geo_radius_by_member(message_entry &entry)
hash_key, "", radius_m, count, sort_type, 2000, search_callback);
}

void redis_parser::incr(message_entry &entry) { counter_internal(entry); }

void redis_parser::incr_by(message_entry &entry) { counter_internal(entry); }

void redis_parser::decr(message_entry &entry) { counter_internal(entry); }

void redis_parser::decr_by(message_entry &entry) { counter_internal(entry); }

void redis_parser::counter_internal(message_entry &entry)
{
dassert(!entry.request.buffers.empty(), "");
dassert(entry.request.buffers[0].length > 0, "");
const char *command = entry.request.buffers[0].data.data();
int64_t increment = 1;
if (strcasecmp(command, "INCR") == 0 || strcasecmp(command, "DECR") == 0) {
if (entry.request.buffers.size() != 2) {
dwarn_f("{}: command {} seqid({}) with invalid arguments count: {}",
remote_address.to_string(),
command,
entry.sequence_id,
entry.request.buffers.size());
redis_simple_string result;
result.is_error = true;
result.message = fmt::format("ERR wrong number of arguments for '{}'", command);
reply_message(entry, result);
return;
}
} else if (strcasecmp(command, "INCRBY") == 0 || strcasecmp(command, "DECRBY") == 0) {
if (entry.request.buffers.size() != 3) {
dwarn_f("{}: command {} seqid({}) with invalid arguments count: {}",
remote_address.to_string(),
command,
entry.sequence_id,
entry.request.buffers.size());
redis_simple_string result;
result.is_error = true;
result.message = fmt::format("ERR wrong number of arguments for '{}'", command);
reply_message(entry, result);
return;
}
if (!dsn::buf2int64(entry.request.buffers[2].data, increment)) {
dwarn_f("{}: command {} seqid({}) with invalid 'increment': {}",
remote_address.to_string(),
command,
entry.sequence_id,
entry.request.buffers[2].data.to_string());
redis_simple_string result;
result.is_error = true;
result.message =
fmt::format("ERR wrong type of argument 'increment 'for '{}'", command);
reply_message(entry, result);
return;
}
} else {
dfatal_f("command not support: {}", command);
}
if (strncasecmp(command, "DECR", 4) == 0) {
increment = -increment;
}

std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_incr_reply = [ref_this, this, command, &entry](
::dsn::error_code ec, dsn_message_t, dsn_message_t response) {
if (is_session_reset.load(std::memory_order_acquire)) {
dwarn_f("{}: command {} seqid({}) got reply, but session has reset",
remote_address.to_string(),
command,
entry.sequence_id);
return;
}

if (::dsn::ERR_OK != ec) {
dwarn_f("{}: command {} seqid({}) got reply with error = {}",
remote_address.to_string(),
command,
entry.sequence_id,
ec.to_string());
redis_simple_string result;
result.is_error = true;
result.message = std::string("ERR ") + ec.to_string();
reply_message(entry, result);
} else {
::dsn::apps::incr_response incr_resp;
::dsn::unmarshall(response, incr_resp);
if (incr_resp.error != 0) {
redis_simple_string result;
result.is_error = true;
result.message = "ERR internal error " + std::to_string(incr_resp.error);
reply_message(entry, result);
} else {
redis_integer result;
result.value = incr_resp.new_value;
reply_message(entry, result);
}
}
};
dsn::apps::incr_request req;
pegasus_generate_key(req.key, entry.request.buffers[1].data, dsn::blob());
req.increment = increment;
client->incr(req, on_incr_reply, std::chrono::milliseconds(2000), 0, pegasus_key_hash(req.key));
}

void redis_parser::parse_set_parameters(const std::vector<redis_bulk_string> &opts,
int &ttl_seconds)
{
Expand Down
5 changes: 5 additions & 0 deletions src/redis_protocol/proxy_lib/redis_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,17 @@ class redis_parser : public proxy_session
DECLARE_REDIS_HANDLER(geo_dist)
DECLARE_REDIS_HANDLER(geo_radius)
DECLARE_REDIS_HANDLER(geo_radius_by_member)
DECLARE_REDIS_HANDLER(incr)
DECLARE_REDIS_HANDLER(incr_by)
DECLARE_REDIS_HANDLER(decr)
DECLARE_REDIS_HANDLER(decr_by)
DECLARE_REDIS_HANDLER(default_handler)

void set_internal(message_entry &entry);
void set_geo_internal(message_entry &entry);
void del_internal(message_entry &entry);
void del_geo_internal(message_entry &entry);
void counter_internal(message_entry &entry);
void parse_set_parameters(const std::vector<redis_bulk_string> &opts, int &ttl_seconds);
void parse_geo_radius_parameters(const std::vector<redis_bulk_string> &opts,
int base_index,
Expand Down

0 comments on commit 76cbe45

Please sign in to comment.