Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support slowlog command #87

Merged
merged 5 commits into from
Aug 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Commands
* `DEL`: split to multiple single key `DEL`.
* `EXISTS`: split to multiple single key `EXISTS`.
* `PING`: ignored and won't be forwarded.
* `INFO`, `TIME`: won't be forwarded to backend redis, information collected in proxy
* `INFO`, `TIME`, `SLOWLOG`: won't be forwarded to backend redis, information collected in proxy
will be returned.
* `AUTH`: do authentication in proxy.

Expand Down Expand Up @@ -123,7 +123,7 @@ all backend redis instances.
* `CLUSTER`.
* `ECHO`, `QUIT`, `SELECT`.
* `BGREWRITEAOF`, `BGSAVE`, `CLIENT`, `COMMAND`, `CONFIG`, `DBSIZE`, `DEBUG`, `FLUSHALL`,
`FLUSHDB`, `LASTSAVE`, `MONITOR`, `ROLE`, `SAVE`, `SHUTDOWN`, `SLAVEOF`, `SLOWLOG`, `SYNC`.
`FLUSHDB`, `LASTSAVE`, `MONITOR`, `ROLE`, `SAVE`, `SHUTDOWN`, `SLAVEOF`, `SYNC`.

License
-------
Expand Down
13 changes: 13 additions & 0 deletions corvus.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,16 @@ syslog 0
# to tell corvus to use the newly added slaves.
#
# read-strategy master

# Slowlog
# The following two configs are almost the same with redis.
# Every command whose lantency exceeds `slowlog-log-slower-than` will appear
# in the result of `slowlog get`.
# Note that the lantency here is the time spent in proxy,
# including redirection caused by MOVED and ASK.
# Set one of them to negative value to disable slow log.
#
# A zero value will log every command.
# slowlog-log-slower-than 10000
#
# slowlog-max-len 1024
9 changes: 9 additions & 0 deletions src/alloc.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <stdlib.h>
#include <string.h>
#include "logging.h"
#include "alloc.h"

Expand Down Expand Up @@ -39,3 +40,11 @@ void cv_free(void *ptr)
{
je_free(ptr);
}

char *cv_raw_strndup(const char *other, size_t size, const char *file, int line)
{
char *p = cv_raw_malloc(size + 1, file, line);
strncpy(p, other, size);
p[size] = '\0';
return p;
}
2 changes: 2 additions & 0 deletions src/alloc.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
#define cv_malloc(size) cv_raw_malloc(size, __FILE__, __LINE__)
#define cv_calloc(number, size) cv_raw_calloc(number, size, __FILE__, __LINE__)
#define cv_realloc(ptr, size) cv_raw_realloc(ptr, size, __FILE__, __LINE__)
#define cv_strndup(other, size) cv_raw_strndup(other, size, __FILE__, __LINE__)

void *cv_raw_malloc(size_t size, const char *file, int line);
void *cv_raw_calloc(size_t number, size_t size, const char *file, int line);
void *cv_raw_realloc(void *ptr, size_t size, const char *file, int line);
void cv_free(void *ptr);
char *cv_raw_strndup(const char *other, size_t size, const char *file, int line);

#endif /* end of include guard: ALLOC_H */
234 changes: 222 additions & 12 deletions src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <time.h>
#include <sys/time.h>
#include <unistd.h>
#include <assert.h>
#include "corvus.h"
#include "socket.h"
#include "logging.h"
Expand All @@ -16,6 +17,7 @@
#include "client.h"
#include "stats.h"
#include "alloc.h"
#include "slowlog.h"

#define CMD_RECYCLE_SIZE 1024

Expand All @@ -29,13 +31,6 @@ do { \
} \
} while (0)

enum {
CMD_UNIMPL,
CMD_BASIC,
CMD_COMPLEX,
CMD_EXTRA,
};

struct cmd_item {
char *cmd;
int value;
Expand All @@ -50,11 +45,13 @@ const char *rep_redirect_err = "-ERR Proxy redirecting error\r\n";
const char *rep_addr_err = "-ERR Proxy fail to parse server address\r\n";
const char *rep_server_err = "-ERR Proxy fail to get server\r\n";
const char *rep_timeout_err = "-ERR Proxy timed out\r\n";
const char *rep_slowlog_not_enabled = "-ERR Slowlog not enabled\r\n";

const char *rep_get = "*2\r\n$3\r\nGET\r\n";
const char *rep_set = "*3\r\n$3\r\nSET\r\n";
const char *rep_del = "*2\r\n$3\r\nDEL\r\n";
const char *rep_exists = "*2\r\n$6\r\nEXISTS\r\n";

static const char *rep_get = "*2\r\n$3\r\nGET\r\n";
static const char *rep_set = "*3\r\n$3\r\nSET\r\n";
static const char *rep_del = "*2\r\n$3\r\nDEL\r\n";
static const char *rep_exists = "*2\r\n$6\r\nEXISTS\r\n";
static const char *rep_ok = "+OK\r\n";
static const char *rep_ping = "+PONG\r\n";
static const char *rep_noauth = "-NOAUTH Authentication required.\r\n";
Expand All @@ -64,6 +61,20 @@ static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is
static struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)};
static struct dict command_map;


const char *cmd_extract_prefix(const char *prefix)
{
const char *get = "$3\r\nGET\r\n";
const char *set = "$3\r\nSET\r\n";
const char *del = "$3\r\nDEL\r\n";
const char *exists = "$6\r\nEXISTS\r\n";
return prefix == rep_get ? cv_strndup(get, strlen(get)) :
prefix == rep_set ? cv_strndup(set, strlen(set)) :
prefix == rep_del ? cv_strndup(del, strlen(del)) :
prefix == rep_exists ? cv_strndup(exists, strlen(exists)) :
NULL;
}

static inline uint8_t *cmd_get_data(struct mbuf *b, struct buf_ptr ptr[], int *len)
{
uint8_t *data;
Expand Down Expand Up @@ -524,6 +535,184 @@ int cmd_quit(struct command *cmd)
return CORVUS_OK;
}

static int cmd_parse_len(struct redis_data *data, int *result)
{
ASSERT_TYPE(data, REP_STRING);
char len_limit[data->pos.str_len + 1];
if (pos_to_str(&data->pos, len_limit) == CORVUS_ERR) {
LOG(ERROR, "parse_len: emptry arg string");
return CORVUS_ERR;
}

*result = atoi(len_limit);
if (*result <= 0) {
return CORVUS_ERR;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about strtol?


return CORVUS_OK;
}

static int cmd_slowlog_entry_cmp(const void * lhs, const void * rhs)
{
const struct slowlog_entry *e1 = *((struct slowlog_entry**)lhs);
const struct slowlog_entry *e2 = *((struct slowlog_entry**)rhs);
return e1->log_time < e2->log_time ? -1 :
e1->log_time > e2->log_time ? 1 :
e1->id < e2->id ? -1 :
e1->id > e2->id ? 1 : 0;
}

int cmd_slowlog_get(struct command *cmd, struct redis_data *data)
{
// For example 'slowlog get 128', element[2] is 128 here
int len = config.slowlog_max_len;
if (data->elements > 3) {
LOG(DEBUG, "cmd_slowlog_get: too many arguments");
return CORVUS_ERR;
} else if (data->elements == 3) {
int len_limit;
struct redis_data *len_limit_data = &data->element[2];
if (cmd_parse_len(len_limit_data, &len_limit) == CORVUS_ERR) {
return CORVUS_ERR;
}
if (len_limit == 0) {
LOG(ERROR, "cmd_slowlog_get: invalid len");
return CORVUS_ERR;
}
if (len_limit < len) {
len = len_limit;
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about data->elements < 3?

Copy link
Contributor Author

@doyoubi doyoubi Aug 25, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data->elements < 3 means only slowlog get is sent, in this case len is setted to config.slowlog_max_len(line 564), the default max length of slowlog queue.


struct context *contexts = get_contexts();
struct slowlog_entry *entries[len];
int count = 0;
size_t queue_len = contexts[0].slowlog.capacity;
for (size_t i = 0; i != queue_len && count < len; i++) {
for (size_t j = 0; j != config.thread && count < len; j++) {
struct slowlog_queue *queue = &contexts[j].slowlog;
// slowlog_get will lock mutex
struct slowlog_entry *entry = slowlog_get(queue, i);
if (entry) {
entries[count++] = entry;
}
}
}

qsort(entries, count, sizeof(struct slowlog_entry*), cmd_slowlog_entry_cmp);

// generate redis packet
const char *hdr_fmt =
"*4\r\n"
":%lld\r\n" // id
":%lld\r\n" // log time
":%lld\r\n" // latency
"*%d\r\n"; // cmd arg len
char buf[150];

int size = snprintf(buf, sizeof buf, "*%d\r\n", count);
conn_add_data(cmd->client, (uint8_t*)buf, size,
&cmd->rep_buf[0], &cmd->rep_buf[1]);

for (size_t i = 0; i != count; i++) {
struct slowlog_entry *entry = entries[i];
assert(entry->argc > 0);
size = snprintf(buf, sizeof buf, hdr_fmt,
entry->id, entry->log_time, entry->latency, entry->argc);
assert(size < 150);
conn_add_data(cmd->client, (uint8_t*)buf, size, NULL, NULL);

for (size_t j = 0; j != entry->argc; j++) {
struct pos *arg = entry->argv + j;
conn_add_data(cmd->client, arg->str, arg->len,
NULL, &cmd->rep_buf[1]);
}
slowlog_dec_ref(entry);
}
CMD_INCREF(cmd);
cmd_mark_done(cmd);

return CORVUS_OK;
}

int cmd_slowlog_len(struct command *cmd)
{
int len = 0;
struct context *contexts = get_contexts();

for (size_t i = 0; i != config.thread; i++) {
struct slowlog_queue *queue = &contexts[i].slowlog;
for (size_t j = 0; j != queue->capacity && len < config.slowlog_max_len; j++) {
struct slowlog_entry *entry = slowlog_get(queue, j);
if (entry) {
len++;
slowlog_dec_ref(entry);
}
}
}

char buf[30];
int size = snprintf(buf, sizeof buf, ":%d\r\n", len);
conn_add_data(cmd->client, (uint8_t*)buf, size,
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);

return CORVUS_OK;
}

int cmd_slowlog_reset(struct command *cmd)
{
struct context *contexts = get_contexts();
for (size_t i = 0; i != config.thread; i++) {
struct slowlog_queue *queue = &contexts[i].slowlog;
for (size_t j = 0; j != queue->capacity; j++) {
slowlog_set(queue, NULL);
}
}

conn_add_data(cmd->client, (uint8_t*)rep_ok, strlen(rep_ok),
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);

return CORVUS_OK;
}

int cmd_slowlog(struct command *cmd, struct redis_data *data)
{
ASSERT_TYPE(data, REP_ARRAY);
ASSERT_ELEMENTS(data->elements >= 2, data);

if (!slowlog_enabled()) {
conn_add_data(cmd->client, (uint8_t*)rep_slowlog_not_enabled,
strlen(rep_slowlog_not_enabled),
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);
return CORVUS_OK;
}

struct redis_data *op = &data->element[1];
ASSERT_TYPE(op, REP_STRING);

char type[op->pos.str_len + 1];
if (pos_to_str(&op->pos, type) == CORVUS_ERR) {
LOG(ERROR, "cmd_slowlog: parse error");
return CORVUS_ERR;
}

if (strcasecmp(type, "GET") == 0) {
return cmd_slowlog_get(cmd, data);
} else if (strcasecmp(type, "LEN") == 0) {
return cmd_slowlog_len(cmd);
} else if (strcasecmp(type, "RESET") == 0) {
return cmd_slowlog_reset(cmd);
}
cmd_mark_fail(cmd, rep_parse_err);
return CORVUS_OK;
}

int cmd_extra(struct command *cmd, struct redis_data *data)
{
switch (cmd->cmd_type) {
Expand All @@ -539,6 +728,8 @@ int cmd_extra(struct command *cmd, struct redis_data *data)
return cmd_time(cmd);
case CMD_QUIT:
return cmd_quit(cmd);
case CMD_SLOWLOG:
return cmd_slowlog(cmd, data);
default:
LOG(ERROR, "%s: unknown command type %d", __func__, cmd->cmd_type);
return CORVUS_ERR;
Expand Down Expand Up @@ -606,6 +797,7 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf)
cmd->keys = r->data.elements - 1;
cmd->parse_done = true;
cmd->cmd_count = 1;
cmd->data.type = REP_UNKNOWN;

memcpy(&cmd->req_buf[0], &r->start, sizeof(r->start));
memset(&r->start, 0, sizeof(r->start));
Expand All @@ -623,7 +815,14 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf)
cmd_mark_fail(cmd, rep_forward_err);
return CORVUS_OK;
}
redis_data_free(&r->data);

if (!(slowlog_enabled() && slowlog_type_need_log(cmd))) {
redis_data_free(&r->data);
return CORVUS_OK;
}
cmd->data = r->data;
memset(&r->data, 0, sizeof(struct redis_data));
r->data.type = REP_UNKNOWN; // avoid double free in conn_free
}
return CORVUS_OK;
}
Expand Down Expand Up @@ -942,6 +1141,12 @@ void cmd_stats(struct command *cmd, int64_t end_time)
} else {
latency = cmd->rep_time[1] - cmd->rep_time[0];
}

if (slowlog_need_log(cmd, latency)) {
struct slowlog_entry *entry = slowlog_create_entry(cmd, latency);
slowlog_set(&cmd->ctx->slowlog, entry);
}

ATOMIC_INC(ctx->stats.remote_latency, latency);
}

Expand Down Expand Up @@ -1025,6 +1230,11 @@ void cmd_free(struct command *cmd)
struct command *c;
struct context *ctx = cmd->ctx;

if (cmd->data.type != REP_UNKNOWN) {
redis_data_free(&cmd->data);
cmd->data.type = REP_UNKNOWN;
}

if (cmd->parent == NULL && cmd->client != NULL) {
client_range_clear(cmd->client, cmd);
}
Expand Down
Loading