Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rproxy: add incr/incrby/decr/decrby #146

Merged
merged 22 commits into from
Jul 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
40be8ff
use static lib s2
acelyc111 Jul 12, 2018
cb631bb
Merge branch 'master' into master
acelyc111 Jul 13, 2018
971ce36
Merge branch 'master' into master
acelyc111 Jul 13, 2018
74d9bc3
update rdns
acelyc111 Jul 13, 2018
883f005
Merge branch 'master' of github.com:acelyc111/pegasus
acelyc111 Jul 13, 2018
7523b21
Merge branch 'master' of github.com:acelyc111/pegasus
acelyc111 Jul 14, 2018
fd9464f
Merge remote-tracking branch 'origin_xm/master'
acelyc111 Jul 16, 2018
beb3103
Merge remote-tracking branch 'origin_xm/master'
acelyc111 Jul 17, 2018
65a33cd
Merge remote-tracking branch 'origin_xm/master'
acelyc111 Jul 18, 2018
c08ea38
fix pack tools script
acelyc111 Jul 18, 2018
a253cd6
Merge branch 'master' into master
acelyc111 Jul 18, 2018
f7005eb
Merge remote-tracking branch 'origin_xm/master'
acelyc111 Jul 23, 2018
7893bc0
add incr/incrby/decr/decrby for redis proxy
acelyc111 Jul 24, 2018
25ba527
add incr/incrby/decr/decrby for redis proxy
acelyc111 Jul 24, 2018
a67d264
Merge branch 'master' into inrc_for_rproxy
acelyc111 Jul 24, 2018
fbb1b8b
fix compile error on gcc 7.3
acelyc111 Jul 25, 2018
a8139c3
Merge branch 'inrc_for_rproxy' of github.com:acelyc111/pegasus into i…
acelyc111 Jul 25, 2018
71afe73
add incr/incrby/decr/decrby for redis proxy
acelyc111 Jul 27, 2018
65c0218
log optimize
acelyc111 Jul 27, 2018
570ea00
optimize log
acelyc111 Jul 27, 2018
0b251a7
Merge branch 'master' into inrc_for_rproxy
qinzuoyan Jul 27, 2018
4b87b25
set response as inline for INCR
acelyc111 Jul 28, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/redis_protocol/proxy/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ set(MY_BINPLACES "config.ini")

set(MY_BOOST_PACKAGES system filesystem)

# Avoid megabytes of warnings like:
# rdsn/thirdparty/output/include/s2/s1angle.h:288:28: error:
# optimization attribute on ‘double sin(S1Angle)’ follows
# definition but the attribute doesn’t match [-Werror=attributes]
add_definitions(-Wno-attributes)

dsn_add_executable()
2 changes: 1 addition & 1 deletion src/redis_protocol/proxy/config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ count = 1
[apps.proxy]
name = proxy
type = proxy
arguments = redis_cluster temp temp_geo
arguments = redis_cluster temp
ports = 6379
pools = THREAD_POOL_DEFAULT
run = true
Expand Down
1 change: 1 addition & 0 deletions src/redis_protocol/proxy_lib/proxy_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ proxy_stub::proxy_stub(const proxy_session::factory &f,
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_GET_SCANNER_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_SCAN_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_CLEAR_SCANNER_ACK)->allow_inline = true;
dsn::task_spec::get(dsn::apps::RPC_RRDB_RRDB_INCR_ACK)->allow_inline = true;

_uri_address.assign_uri(
std::string("dsn://").append(_cluster).append("/").append(_app).c_str());
Expand Down
106 changes: 106 additions & 0 deletions src/redis_protocol/proxy_lib/redis_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ std::unordered_map<std::string, redis_parser::redis_call_handler> redis_parser::
{"GEODIST", redis_parser::g_geo_dist},
{"GEORADIUS", redis_parser::g_geo_radius},
{"GEORADIUSBYMEMBER", redis_parser::g_geo_radius_by_member},
{"INCR", redis_parser::g_incr},
{"INCRBY", redis_parser::g_incr_by},
{"DECR", redis_parser::g_decr},
{"DECRBY", redis_parser::g_decr_by},
};

redis_parser::redis_call_handler redis_parser::get_handler(const char *command, unsigned int length)
Expand Down Expand Up @@ -938,6 +942,108 @@ void redis_parser::geo_radius_by_member(message_entry &entry)
hash_key, "", radius_m, count, sort_type, 2000, search_callback);
}

void redis_parser::incr(message_entry &entry) { counter_internal(entry); }
Copy link
Contributor

@neverchanje neverchanje Jul 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

補充一下注釋?像其他函數一樣。

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里不用加注释,因为函数名本身就是自解释的。其他有的加了注释是因为跟redis原生命令有些出入


void redis_parser::incr_by(message_entry &entry) { counter_internal(entry); }

void redis_parser::decr(message_entry &entry) { counter_internal(entry); }

void redis_parser::decr_by(message_entry &entry) { counter_internal(entry); }

void redis_parser::counter_internal(message_entry &entry)
{
dassert(!entry.request.buffers.empty(), "");
dassert(entry.request.buffers[0].length > 0, "");
const char *command = entry.request.buffers[0].data.data();
int64_t increment = 1;
if (strcasecmp(command, "INCR") == 0 || strcasecmp(command, "DECR") == 0) {
if (entry.request.buffers.size() != 2) {
dwarn_f("{}: command {} seqid({}) with invalid arguments count: {}",
remote_address.to_string(),
command,
entry.sequence_id,
entry.request.buffers.size());
redis_simple_string result;
result.is_error = true;
result.message = fmt::format("ERR wrong number of arguments for '{}'", command);
reply_message(entry, result);
return;
}
} else if (strcasecmp(command, "INCRBY") == 0 || strcasecmp(command, "DECRBY") == 0) {
if (entry.request.buffers.size() != 3) {
dwarn_f("{}: command {} seqid({}) with invalid arguments count: {}",
remote_address.to_string(),
command,
entry.sequence_id,
entry.request.buffers.size());
redis_simple_string result;
result.is_error = true;
result.message = fmt::format("ERR wrong number of arguments for '{}'", command);
reply_message(entry, result);
return;
}
if (!dsn::buf2int64(entry.request.buffers[2].data, increment)) {
dwarn_f("{}: command {} seqid({}) with invalid 'increment': {}",
remote_address.to_string(),
command,
entry.sequence_id,
entry.request.buffers[2].data.to_string());
redis_simple_string result;
result.is_error = true;
result.message =
fmt::format("ERR wrong type of argument 'increment 'for '{}'", command);
reply_message(entry, result);
return;
}
} else {
dfatal_f("command not support: {}", command);
}
if (strncasecmp(command, "DECR", 4) == 0) {
increment = -increment;
}

std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_incr_reply = [ref_this, this, command, &entry](
::dsn::error_code ec, dsn_message_t, dsn_message_t response) {
if (is_session_reset.load(std::memory_order_acquire)) {
dwarn_f("{}: command {} seqid({}) got reply, but session has reset",
remote_address.to_string(),
command,
entry.sequence_id);
return;
}

if (::dsn::ERR_OK != ec) {
dwarn_f("{}: command {} seqid({}) got reply with error = {}",
remote_address.to_string(),
command,
entry.sequence_id,
ec.to_string());
redis_simple_string result;
result.is_error = true;
result.message = std::string("ERR ") + ec.to_string();
reply_message(entry, result);
} else {
::dsn::apps::incr_response incr_resp;
::dsn::unmarshall(response, incr_resp);
if (incr_resp.error != 0) {
redis_simple_string result;
result.is_error = true;
result.message = "ERR internal error " + std::to_string(incr_resp.error);
reply_message(entry, result);
} else {
redis_integer result;
result.value = incr_resp.new_value;
reply_message(entry, result);
}
}
};
dsn::apps::incr_request req;
pegasus_generate_key(req.key, entry.request.buffers[1].data, dsn::blob());
req.increment = increment;
client->incr(req, on_incr_reply, std::chrono::milliseconds(2000), 0, pegasus_key_hash(req.key));
}

void redis_parser::parse_set_parameters(const std::vector<redis_bulk_string> &opts,
int &ttl_seconds)
{
Expand Down
5 changes: 5 additions & 0 deletions src/redis_protocol/proxy_lib/redis_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,17 @@ class redis_parser : public proxy_session
DECLARE_REDIS_HANDLER(geo_dist)
DECLARE_REDIS_HANDLER(geo_radius)
DECLARE_REDIS_HANDLER(geo_radius_by_member)
DECLARE_REDIS_HANDLER(incr)
DECLARE_REDIS_HANDLER(incr_by)
DECLARE_REDIS_HANDLER(decr)
DECLARE_REDIS_HANDLER(decr_by)
DECLARE_REDIS_HANDLER(default_handler)

void set_internal(message_entry &entry);
void set_geo_internal(message_entry &entry);
void del_internal(message_entry &entry);
void del_geo_internal(message_entry &entry);
void counter_internal(message_entry &entry);
void parse_set_parameters(const std::vector<redis_bulk_string> &opts, int &ttl_seconds);
void parse_geo_radius_parameters(const std::vector<redis_bulk_string> &opts,
int base_index,
Expand Down