-
Notifications
You must be signed in to change notification settings - Fork 143
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support slowlog command #87
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
#include <time.h> | ||
#include <sys/time.h> | ||
#include <unistd.h> | ||
#include <assert.h> | ||
#include "corvus.h" | ||
#include "socket.h" | ||
#include "logging.h" | ||
|
@@ -16,6 +17,7 @@ | |
#include "client.h" | ||
#include "stats.h" | ||
#include "alloc.h" | ||
#include "slowlog.h" | ||
|
||
#define CMD_RECYCLE_SIZE 1024 | ||
|
||
|
@@ -29,13 +31,6 @@ do { \ | |
} \ | ||
} while (0) | ||
|
||
enum { | ||
CMD_UNIMPL, | ||
CMD_BASIC, | ||
CMD_COMPLEX, | ||
CMD_EXTRA, | ||
}; | ||
|
||
struct cmd_item { | ||
char *cmd; | ||
int value; | ||
|
@@ -50,11 +45,13 @@ const char *rep_redirect_err = "-ERR Proxy redirecting error\r\n"; | |
const char *rep_addr_err = "-ERR Proxy fail to parse server address\r\n"; | ||
const char *rep_server_err = "-ERR Proxy fail to get server\r\n"; | ||
const char *rep_timeout_err = "-ERR Proxy timed out\r\n"; | ||
const char *rep_slowlog_not_enabled = "-ERR Slowlog not enabled\r\n"; | ||
|
||
const char *rep_get = "*2\r\n$3\r\nGET\r\n"; | ||
const char *rep_set = "*3\r\n$3\r\nSET\r\n"; | ||
const char *rep_del = "*2\r\n$3\r\nDEL\r\n"; | ||
const char *rep_exists = "*2\r\n$6\r\nEXISTS\r\n"; | ||
|
||
static const char *rep_get = "*2\r\n$3\r\nGET\r\n"; | ||
static const char *rep_set = "*3\r\n$3\r\nSET\r\n"; | ||
static const char *rep_del = "*2\r\n$3\r\nDEL\r\n"; | ||
static const char *rep_exists = "*2\r\n$6\r\nEXISTS\r\n"; | ||
static const char *rep_ok = "+OK\r\n"; | ||
static const char *rep_ping = "+PONG\r\n"; | ||
static const char *rep_noauth = "-NOAUTH Authentication required.\r\n"; | ||
|
@@ -64,6 +61,20 @@ static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is | |
static struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)}; | ||
static struct dict command_map; | ||
|
||
|
||
const char *cmd_extract_prefix(const char *prefix) | ||
{ | ||
const char *get = "$3\r\nGET\r\n"; | ||
const char *set = "$3\r\nSET\r\n"; | ||
const char *del = "$3\r\nDEL\r\n"; | ||
const char *exists = "$6\r\nEXISTS\r\n"; | ||
return prefix == rep_get ? cv_strndup(get, strlen(get)) : | ||
prefix == rep_set ? cv_strndup(set, strlen(set)) : | ||
prefix == rep_del ? cv_strndup(del, strlen(del)) : | ||
prefix == rep_exists ? cv_strndup(exists, strlen(exists)) : | ||
NULL; | ||
} | ||
|
||
static inline uint8_t *cmd_get_data(struct mbuf *b, struct buf_ptr ptr[], int *len) | ||
{ | ||
uint8_t *data; | ||
|
@@ -524,6 +535,184 @@ int cmd_quit(struct command *cmd) | |
return CORVUS_OK; | ||
} | ||
|
||
static int cmd_parse_len(struct redis_data *data, int *result) | ||
{ | ||
ASSERT_TYPE(data, REP_STRING); | ||
char len_limit[data->pos.str_len + 1]; | ||
if (pos_to_str(&data->pos, len_limit) == CORVUS_ERR) { | ||
LOG(ERROR, "parse_len: emptry arg string"); | ||
return CORVUS_ERR; | ||
} | ||
|
||
*result = atoi(len_limit); | ||
if (*result <= 0) { | ||
return CORVUS_ERR; | ||
} | ||
|
||
return CORVUS_OK; | ||
} | ||
|
||
static int cmd_slowlog_entry_cmp(const void * lhs, const void * rhs) | ||
{ | ||
const struct slowlog_entry *e1 = *((struct slowlog_entry**)lhs); | ||
const struct slowlog_entry *e2 = *((struct slowlog_entry**)rhs); | ||
return e1->log_time < e2->log_time ? -1 : | ||
e1->log_time > e2->log_time ? 1 : | ||
e1->id < e2->id ? -1 : | ||
e1->id > e2->id ? 1 : 0; | ||
} | ||
|
||
int cmd_slowlog_get(struct command *cmd, struct redis_data *data) | ||
{ | ||
// For example 'slowlog get 128', element[2] is 128 here | ||
int len = config.slowlog_max_len; | ||
if (data->elements > 3) { | ||
LOG(DEBUG, "cmd_slowlog_get: too many arguments"); | ||
return CORVUS_ERR; | ||
} else if (data->elements == 3) { | ||
int len_limit; | ||
struct redis_data *len_limit_data = &data->element[2]; | ||
if (cmd_parse_len(len_limit_data, &len_limit) == CORVUS_ERR) { | ||
return CORVUS_ERR; | ||
} | ||
if (len_limit == 0) { | ||
LOG(ERROR, "cmd_slowlog_get: invalid len"); | ||
return CORVUS_ERR; | ||
} | ||
if (len_limit < len) { | ||
len = len_limit; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
struct context *contexts = get_contexts(); | ||
struct slowlog_entry *entries[len]; | ||
int count = 0; | ||
size_t queue_len = contexts[0].slowlog.capacity; | ||
for (size_t i = 0; i != queue_len && count < len; i++) { | ||
for (size_t j = 0; j != config.thread && count < len; j++) { | ||
struct slowlog_queue *queue = &contexts[j].slowlog; | ||
// slowlog_get will lock mutex | ||
struct slowlog_entry *entry = slowlog_get(queue, i); | ||
if (entry) { | ||
entries[count++] = entry; | ||
} | ||
} | ||
} | ||
|
||
qsort(entries, count, sizeof(struct slowlog_entry*), cmd_slowlog_entry_cmp); | ||
|
||
// generate redis packet | ||
const char *hdr_fmt = | ||
"*4\r\n" | ||
":%lld\r\n" // id | ||
":%lld\r\n" // log time | ||
":%lld\r\n" // latency | ||
"*%d\r\n"; // cmd arg len | ||
char buf[150]; | ||
|
||
int size = snprintf(buf, sizeof buf, "*%d\r\n", count); | ||
conn_add_data(cmd->client, (uint8_t*)buf, size, | ||
&cmd->rep_buf[0], &cmd->rep_buf[1]); | ||
|
||
for (size_t i = 0; i != count; i++) { | ||
struct slowlog_entry *entry = entries[i]; | ||
assert(entry->argc > 0); | ||
size = snprintf(buf, sizeof buf, hdr_fmt, | ||
entry->id, entry->log_time, entry->latency, entry->argc); | ||
assert(size < 150); | ||
conn_add_data(cmd->client, (uint8_t*)buf, size, NULL, NULL); | ||
|
||
for (size_t j = 0; j != entry->argc; j++) { | ||
struct pos *arg = entry->argv + j; | ||
conn_add_data(cmd->client, arg->str, arg->len, | ||
NULL, &cmd->rep_buf[1]); | ||
} | ||
slowlog_dec_ref(entry); | ||
} | ||
CMD_INCREF(cmd); | ||
cmd_mark_done(cmd); | ||
|
||
return CORVUS_OK; | ||
} | ||
|
||
int cmd_slowlog_len(struct command *cmd) | ||
{ | ||
int len = 0; | ||
struct context *contexts = get_contexts(); | ||
|
||
for (size_t i = 0; i != config.thread; i++) { | ||
struct slowlog_queue *queue = &contexts[i].slowlog; | ||
for (size_t j = 0; j != queue->capacity && len < config.slowlog_max_len; j++) { | ||
struct slowlog_entry *entry = slowlog_get(queue, j); | ||
if (entry) { | ||
len++; | ||
slowlog_dec_ref(entry); | ||
} | ||
} | ||
} | ||
|
||
char buf[30]; | ||
int size = snprintf(buf, sizeof buf, ":%d\r\n", len); | ||
conn_add_data(cmd->client, (uint8_t*)buf, size, | ||
&cmd->rep_buf[0], &cmd->rep_buf[1]); | ||
CMD_INCREF(cmd); | ||
cmd_mark_done(cmd); | ||
|
||
return CORVUS_OK; | ||
} | ||
|
||
int cmd_slowlog_reset(struct command *cmd) | ||
{ | ||
struct context *contexts = get_contexts(); | ||
for (size_t i = 0; i != config.thread; i++) { | ||
struct slowlog_queue *queue = &contexts[i].slowlog; | ||
for (size_t j = 0; j != queue->capacity; j++) { | ||
slowlog_set(queue, NULL); | ||
} | ||
} | ||
|
||
conn_add_data(cmd->client, (uint8_t*)rep_ok, strlen(rep_ok), | ||
&cmd->rep_buf[0], &cmd->rep_buf[1]); | ||
CMD_INCREF(cmd); | ||
cmd_mark_done(cmd); | ||
|
||
return CORVUS_OK; | ||
} | ||
|
||
int cmd_slowlog(struct command *cmd, struct redis_data *data) | ||
{ | ||
ASSERT_TYPE(data, REP_ARRAY); | ||
ASSERT_ELEMENTS(data->elements >= 2, data); | ||
|
||
if (!slowlog_enabled()) { | ||
conn_add_data(cmd->client, (uint8_t*)rep_slowlog_not_enabled, | ||
strlen(rep_slowlog_not_enabled), | ||
&cmd->rep_buf[0], &cmd->rep_buf[1]); | ||
CMD_INCREF(cmd); | ||
cmd_mark_done(cmd); | ||
return CORVUS_OK; | ||
} | ||
|
||
struct redis_data *op = &data->element[1]; | ||
ASSERT_TYPE(op, REP_STRING); | ||
|
||
char type[op->pos.str_len + 1]; | ||
if (pos_to_str(&op->pos, type) == CORVUS_ERR) { | ||
LOG(ERROR, "cmd_slowlog: parse error"); | ||
return CORVUS_ERR; | ||
} | ||
|
||
if (strcasecmp(type, "GET") == 0) { | ||
return cmd_slowlog_get(cmd, data); | ||
} else if (strcasecmp(type, "LEN") == 0) { | ||
return cmd_slowlog_len(cmd); | ||
} else if (strcasecmp(type, "RESET") == 0) { | ||
return cmd_slowlog_reset(cmd); | ||
} | ||
cmd_mark_fail(cmd, rep_parse_err); | ||
return CORVUS_OK; | ||
} | ||
|
||
int cmd_extra(struct command *cmd, struct redis_data *data) | ||
{ | ||
switch (cmd->cmd_type) { | ||
|
@@ -539,6 +728,8 @@ int cmd_extra(struct command *cmd, struct redis_data *data) | |
return cmd_time(cmd); | ||
case CMD_QUIT: | ||
return cmd_quit(cmd); | ||
case CMD_SLOWLOG: | ||
return cmd_slowlog(cmd, data); | ||
default: | ||
LOG(ERROR, "%s: unknown command type %d", __func__, cmd->cmd_type); | ||
return CORVUS_ERR; | ||
|
@@ -606,6 +797,7 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf) | |
cmd->keys = r->data.elements - 1; | ||
cmd->parse_done = true; | ||
cmd->cmd_count = 1; | ||
cmd->data.type = REP_UNKNOWN; | ||
|
||
memcpy(&cmd->req_buf[0], &r->start, sizeof(r->start)); | ||
memset(&r->start, 0, sizeof(r->start)); | ||
|
@@ -623,7 +815,14 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf) | |
cmd_mark_fail(cmd, rep_forward_err); | ||
return CORVUS_OK; | ||
} | ||
redis_data_free(&r->data); | ||
|
||
if (!(slowlog_enabled() && slowlog_type_need_log(cmd))) { | ||
redis_data_free(&r->data); | ||
return CORVUS_OK; | ||
} | ||
cmd->data = r->data; | ||
memset(&r->data, 0, sizeof(struct redis_data)); | ||
r->data.type = REP_UNKNOWN; // avoid double free in conn_free | ||
} | ||
return CORVUS_OK; | ||
} | ||
|
@@ -942,6 +1141,12 @@ void cmd_stats(struct command *cmd, int64_t end_time) | |
} else { | ||
latency = cmd->rep_time[1] - cmd->rep_time[0]; | ||
} | ||
|
||
if (slowlog_need_log(cmd, latency)) { | ||
struct slowlog_entry *entry = slowlog_create_entry(cmd, latency); | ||
slowlog_set(&cmd->ctx->slowlog, entry); | ||
} | ||
|
||
ATOMIC_INC(ctx->stats.remote_latency, latency); | ||
} | ||
|
||
|
@@ -1025,6 +1230,11 @@ void cmd_free(struct command *cmd) | |
struct command *c; | ||
struct context *ctx = cmd->ctx; | ||
|
||
if (cmd->data.type != REP_UNKNOWN) { | ||
redis_data_free(&cmd->data); | ||
cmd->data.type = REP_UNKNOWN; | ||
} | ||
|
||
if (cmd->parent == NULL && cmd->client != NULL) { | ||
client_range_clear(cmd->client, cmd); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about strtol?