From 09e33834d05571b239fea254b186b52e6dde85ee Mon Sep 17 00:00:00 2001 From: doyoubi Date: Wed, 17 Aug 2016 06:27:51 +0000 Subject: [PATCH 1/5] Add mbuf helper function --- src/mbuf.c | 46 ++++++++++++++++++++++++++++ src/mbuf.h | 2 ++ tests/corvus_test.c | 2 ++ tests/test_mbuf.c | 73 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 123 insertions(+) create mode 100644 tests/test_mbuf.c diff --git a/src/mbuf.c b/src/mbuf.c index 006b0ba..ffb9357 100644 --- a/src/mbuf.c +++ b/src/mbuf.c @@ -124,6 +124,52 @@ void mbuf_range_clear(struct context *ctx, struct buf_ptr ptr[]) memset(&ptr[1], 0, sizeof(struct buf_ptr)); } +uint32_t mbuf_range_len(struct buf_ptr ptr[2]) +{ + uint32_t len = 0; + struct mbuf *b = ptr[0].buf; + while (true) { + uint8_t *start = b->start; + uint8_t *end = b->end; + if (b == ptr[0].buf) { + start = ptr[0].pos; + } + if (b == ptr[1].buf) { + end = ptr[1].pos; + } + len += (end - start); + if (b == ptr[1].buf) + break; + b = TAILQ_NEXT(b, next); + } + return len; +} + +size_t mbuf_range_copy(uint8_t *dest, struct buf_ptr ptr[2], size_t max_len) +{ + struct mbuf *b = ptr[0].buf; + size_t len = 0; + while (len < max_len) { + uint8_t *start = b->start; + uint8_t *end = b->end; + if (b == ptr[0].buf) { + start = ptr[0].pos; + } + if (b == ptr[1].buf) { + end = ptr[1].pos; + } + if (end - start > max_len - len) { + end = start + max_len - len; + } + memcpy(dest + len, start, end - start); + len += (end - start); + if (b == ptr[1].buf) + break; + b = TAILQ_NEXT(b, next); + } + return len; +} + void mbuf_decref(struct context *ctx, struct mbuf **bufs, int n) { for (int i = 0; i < n; i++) { diff --git a/src/mbuf.h b/src/mbuf.h index eb0a539..5a12f1f 100644 --- a/src/mbuf.h +++ b/src/mbuf.h @@ -50,6 +50,8 @@ uint32_t mbuf_read_size(struct mbuf *); uint32_t mbuf_write_size(struct mbuf *); void mbuf_destroy(struct context *ctx); void mbuf_range_clear(struct context *ctx, struct buf_ptr ptr[]); +uint32_t mbuf_range_len(struct buf_ptr ptr[]); +size_t mbuf_range_copy(uint8_t *dest, struct buf_ptr ptr[], size_t max_len); void mbuf_decref(struct context *ctx, struct mbuf **bufs, int n); void buf_time_append(struct context *ctx, struct buf_time_tqh *queue, struct mbuf *buf, int64_t read_time); diff --git a/tests/corvus_test.c b/tests/corvus_test.c index 6a722c3..07751e7 100644 --- a/tests/corvus_test.c +++ b/tests/corvus_test.c @@ -64,6 +64,7 @@ extern TEST_CASE(test_client); extern TEST_CASE(test_timer); extern TEST_CASE(test_config); extern TEST_CASE(test_stats); +extern TEST_CASE(test_mbuf); int main(int argc, const char *argv[]) { @@ -95,6 +96,7 @@ int main(int argc, const char *argv[]) RUN_CASE(test_timer); RUN_CASE(test_config); RUN_CASE(test_stats); + RUN_CASE(test_mbuf); usleep(10000); slot_create_job(SLOT_UPDATER_QUIT); diff --git a/tests/test_mbuf.c b/tests/test_mbuf.c new file mode 100644 index 0000000..6b1d9af --- /dev/null +++ b/tests/test_mbuf.c @@ -0,0 +1,73 @@ +#include "test.h" +#include "slowlog.h" +#include "alloc.h" + +void init_mbuf_for_test(struct mbuf *b, size_t len) { + uint8_t *p = cv_malloc(len); + b->start = p; + b->end = p + len; + b->pos = p; + b->last = p; +} + +TEST(test_mbuf_range_func) { + struct mhdr q; + TAILQ_INIT(&q); + struct buf_ptr ptr[2]; + struct mbuf b1, b2, b3; + init_mbuf_for_test(&b1, 10); + init_mbuf_for_test(&b2, 10); + init_mbuf_for_test(&b3, 10); + TAILQ_INSERT_TAIL(&q, &b1, next); + TAILQ_INSERT_TAIL(&q, &b2, next); + TAILQ_INSERT_TAIL(&q, &b3, next); + uint8_t *tmp = NULL; + + // one buf + uint8_t *a = b1.start + 3; + memcpy(a, "abcde", 5); + ptr[0].buf = &b1; + ptr[0].pos = a; + ptr[1].buf = &b1; + ptr[1].pos = b1.start + 8; + ASSERT(mbuf_range_len(ptr) == 5); + tmp = cv_calloc(10, 1); + ASSERT(mbuf_range_copy(tmp, ptr, 10) == 5); + ASSERT(memcmp(tmp, "abcde", 5) == 0); + ASSERT(tmp[5] == 0); + cv_free(tmp); + + // two buf + memcpy(a, "abcdefg", 7); + memcpy(b2.start, "12345", 5); + ptr[1].buf = &b2; + ptr[1].pos = b2.start + 5; + ASSERT(mbuf_range_len(ptr) == 7 + 5); + tmp = cv_calloc(20, 1); + ASSERT(mbuf_range_copy(tmp, ptr, 20) == 12); + ASSERT(memcmp(tmp, "abcdefg12345", 12) == 0); + ASSERT(tmp[12] == 0); + cv_free(tmp); + + // three buf + memcpy(b2.start, "0123456789", 10); + memcpy(b3.start, "crvs", 4); + ptr[1].buf = &b3; + ptr[1].pos = b3.start + 4; + ASSERT(mbuf_range_len(ptr) == 7 + 10 + 4); + tmp = cv_calloc(30, 1); + ASSERT(mbuf_range_copy(tmp, ptr, 30) == 21); + ASSERT(memcmp(tmp, "abcdefg0123456789crvs", 21) == 0); + ASSERT(tmp[21] == 0); + cv_free(tmp); + + cv_free(b1.start); + cv_free(b2.start); + cv_free(b3.start); + + PASS(NULL); +} + +TEST_CASE(test_mbuf) { + RUN_TEST(test_mbuf_range_func); +} From 931cf586a46af9221065af6c85022dea36db1e47 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Fri, 19 Aug 2016 11:33:17 +0000 Subject: [PATCH 2/5] Add slowlog --- src/alloc.c | 9 +++ src/alloc.h | 2 + src/command.c | 171 +++++++++++++++++++++++++++++++++++++++++-- src/command.h | 6 ++ src/corvus.c | 14 ++++ src/corvus.h | 5 ++ src/slowlog.c | 126 +++++++++++++++++++++++++++++++ src/slowlog.h | 46 ++++++++++++ tests/corvus_test.c | 2 + tests/test_mbuf.c | 1 - tests/test_slowlog.c | 134 +++++++++++++++++++++++++++++++++ 11 files changed, 510 insertions(+), 6 deletions(-) create mode 100644 src/slowlog.c create mode 100644 src/slowlog.h create mode 100644 tests/test_slowlog.c diff --git a/src/alloc.c b/src/alloc.c index a43cfd2..cf02509 100644 --- a/src/alloc.c +++ b/src/alloc.c @@ -1,4 +1,5 @@ #include +#include #include "logging.h" #include "alloc.h" @@ -39,3 +40,11 @@ void cv_free(void *ptr) { je_free(ptr); } + +char *cv_raw_strndup(const char *other, size_t size, const char *file, int line) +{ + char *p = cv_raw_malloc(size + 1, file, line); + strncpy(p, other, size); + p[size] = '\0'; + return p; +} diff --git a/src/alloc.h b/src/alloc.h index c502926..1598a51 100644 --- a/src/alloc.h +++ b/src/alloc.h @@ -15,10 +15,12 @@ #define cv_malloc(size) cv_raw_malloc(size, __FILE__, __LINE__) #define cv_calloc(number, size) cv_raw_calloc(number, size, __FILE__, __LINE__) #define cv_realloc(ptr, size) cv_raw_realloc(ptr, size, __FILE__, __LINE__) +#define cv_strndup(other, size) cv_raw_strndup(other, size, __FILE__, __LINE__) void *cv_raw_malloc(size_t size, const char *file, int line); void *cv_raw_calloc(size_t number, size_t size, const char *file, int line); void *cv_raw_realloc(void *ptr, size_t size, const char *file, int line); void cv_free(void *ptr); +char *cv_raw_strndup(const char *other, size_t size, const char *file, int line); #endif /* end of include guard: ALLOC_H */ diff --git a/src/command.c b/src/command.c index ead9b0e..88ca561 100644 --- a/src/command.c +++ b/src/command.c @@ -6,6 +6,7 @@ #include #include #include +#include #include "corvus.h" #include "socket.h" #include "logging.h" @@ -16,6 +17,7 @@ #include "client.h" #include "stats.h" #include "alloc.h" +#include "slowlog.h" #define CMD_RECYCLE_SIZE 1024 @@ -51,10 +53,11 @@ const char *rep_addr_err = "-ERR Proxy fail to parse server address\r\n"; const char *rep_server_err = "-ERR Proxy fail to get server\r\n"; const char *rep_timeout_err = "-ERR Proxy timed out\r\n"; -static const char *rep_get = "*2\r\n$3\r\nGET\r\n"; -static const char *rep_set = "*3\r\n$3\r\nSET\r\n"; -static const char *rep_del = "*2\r\n$3\r\nDEL\r\n"; -static const char *rep_exists = "*2\r\n$6\r\nEXISTS\r\n"; +const char *rep_get = "*2\r\n$3\r\nGET\r\n"; +const char *rep_set = "*3\r\n$3\r\nSET\r\n"; +const char *rep_del = "*2\r\n$3\r\nDEL\r\n"; +const char *rep_exists = "*2\r\n$6\r\nEXISTS\r\n"; + static const char *rep_ok = "+OK\r\n"; static const char *rep_ping = "+PONG\r\n"; static const char *rep_noauth = "-NOAUTH Authentication required.\r\n"; @@ -64,6 +67,20 @@ static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is static struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)}; static struct dict command_map; + +const char *cmd_extract_prefix(const char *prefix) +{ + const char *get = "$3\r\nGET\r\n"; + const char *set = "$3\r\nSET\r\n"; + const char *del = "$3\r\nDEL\r\n"; + const char *exists = "$6\r\nEXISTS\r\n"; + return prefix == rep_get ? cv_strndup(get, strlen(get)) : + prefix == rep_set ? cv_strndup(set, strlen(set)) : + prefix == rep_del ? cv_strndup(del, strlen(del)) : + prefix == rep_exists ? cv_strndup(exists, strlen(exists)) : + NULL; +} + static inline uint8_t *cmd_get_data(struct mbuf *b, struct buf_ptr ptr[], int *len) { uint8_t *data; @@ -524,6 +541,127 @@ int cmd_quit(struct command *cmd) return CORVUS_OK; } +static int parse_len(struct redis_data *data, int *result) +{ + ASSERT_TYPE(data, REP_STRING); + char len_limit[data->pos.str_len + 1]; + if (pos_to_str(&data->pos, len_limit) == CORVUS_ERR) { + LOG(ERROR, "parse_len: emptry arg string"); + return CORVUS_ERR; + } + + int v = 0; + for (size_t i = 0; i != data->pos.str_len; i++) { + char c = len_limit[i]; + if (c < '0' || c > '9') { + LOG(ERROR, "parse_len: char not between 0-9"); + return CORVUS_ERR; + } + v += (10 * v + (c - '0')); + } + *result = v; + + return CORVUS_OK; +} + +int cmd_slowlog_get(struct command *cmd, struct redis_data *data) +{ + // For example 'slowlog get 128', element[2] is 128 here + int len = config.slowlog_max_len; + if (data->elements > 3) { + LOG(DEBUG, "cmd_slowlog_get: too many arguments"); + return CORVUS_ERR; + } else if (data->elements == 3) { + int len_limit; + struct redis_data *len_limit_data = &data->element[2]; + if (parse_len(len_limit_data, &len_limit) == CORVUS_ERR) { + return CORVUS_ERR; + } + if (len_limit == 0) { + LOG(ERROR, "cmd_slowlog_get: invalid len"); + return CORVUS_ERR; + } + if (len_limit < len) { + len = len_limit; + } + } + + struct context *contexts = get_contexts(); + struct slowlog_entry *entries[len]; + int count = 0; + for (size_t i = 0; i != config.thread; i++) { + struct slowlog_queue *queue = &contexts[i].slowlog; + for (size_t j = 0; j != queue->len && count < len; j++) { + // slowlog_get will lock mutex + struct slowlog_entry *entry = slowlog_get(queue, j); + if (entry == NULL) + break; // NULL starts from here + entries[count++] = entry; + } + } + + // generate redis packet + const char *hdr_fmt = + "*4\r\n" + ":%lld\r\n" // id + ":%lld\r\n" // log time + ":%lld\r\n" // latency + "*%d\r\n"; // cmd arg len + char buf[150]; + + int size = snprintf(buf, sizeof buf, "*%d\r\n", count); + assert(size < 150); + conn_add_data(cmd->client, (uint8_t*)buf, size, + &cmd->rep_buf[0], &cmd->rep_buf[1]); + + for (size_t i = 0; i != count; i++) { + struct slowlog_entry *entry = entries[i]; + assert(entry->argc > 0); + size = snprintf(buf, sizeof buf, hdr_fmt, + entry->id, entry->log_time, entry->latency, entry->argc); + conn_add_data(cmd->client, (uint8_t*)buf, size, NULL, NULL); + + for (size_t j = 0; j != entry->argc; j++) { + struct pos *arg = entry->argv + j; + conn_add_data(cmd->client, arg->str, arg->len, + NULL, &cmd->rep_buf[1]); + } + slowlog_dec_ref(entry); + } + CMD_INCREF(cmd); + cmd_mark_done(cmd); + + return CORVUS_OK; +} + +int cmd_slowlog_len(struct command *cmd) { return CORVUS_ERR; } +int cmd_slowlog_reset(struct command *cmd) { return CORVUS_ERR; } + +int cmd_slowlog(struct command *cmd, struct redis_data *data) +{ + ASSERT_TYPE(data, REP_ARRAY); + ASSERT_ELEMENTS(data->elements >= 2, data); + + struct redis_data *op = &data->element[1]; + ASSERT_TYPE(op, REP_STRING); + + char type[op->pos.str_len + 1]; + if (pos_to_str(&op->pos, type) == CORVUS_ERR) { + LOG(ERROR, "cmd_slowlog: parse error"); + return CORVUS_ERR; + } + + if (strcasecmp(type, "GET") == 0) { + return cmd_slowlog_get(cmd, data); + } else if (strcasecmp(type, "LEN") == 0) { + return cmd_slowlog_len(cmd); + } else if (strcasecmp(type, "RESET") == 0) { + return cmd_slowlog_reset(cmd); + } + cmd_mark_fail(cmd, rep_parse_err); + return CORVUS_OK; +} + int cmd_extra(struct command *cmd, struct redis_data *data) { switch (cmd->cmd_type) { @@ -539,6 +677,8 @@ int cmd_extra(struct command *cmd, struct redis_data *data) return cmd_time(cmd); case CMD_QUIT: return cmd_quit(cmd); + case CMD_SLOWLOG: + return cmd_slowlog(cmd, data); default: LOG(ERROR, "%s: unknown command type %d", __func__, cmd->cmd_type); return CORVUS_ERR; @@ -606,6 +746,7 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf) cmd->keys = r->data.elements - 1; cmd->parse_done = true; cmd->cmd_count = 1; + cmd->data.type = REP_UNKNOWN; memcpy(&cmd->req_buf[0], &r->start, sizeof(r->start)); memset(&r->start, 0, sizeof(r->start)); @@ -623,7 +764,15 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf) cmd_mark_fail(cmd, rep_forward_err); return CORVUS_OK; } - redis_data_free(&r->data); + + if (cmd->request_type == CMD_EXTRA + || cmd->request_type == CMD_UNIMPL) { + redis_data_free(&r->data); + return CORVUS_OK; + } + cmd->data = r->data; + memset(&r->data, 0, sizeof(struct redis_data)); + r->data.type = REP_UNKNOWN; // avoid double free in conn_free } return CORVUS_OK; } @@ -942,6 +1091,13 @@ void cmd_stats(struct command *cmd, int64_t end_time) } else { latency = cmd->rep_time[1] - cmd->rep_time[0]; } + + if (cmd->request_type != CMD_EXTRA + && cmd->request_type != CMD_UNIMPL) { + struct slowlog_entry *entry = slowlog_create_entry(cmd, latency); + slowlog_set(&cmd->ctx->slowlog, entry); + } + ATOMIC_INC(ctx->stats.remote_latency, latency); } @@ -1025,6 +1181,11 @@ void cmd_free(struct command *cmd) struct command *c; struct context *ctx = cmd->ctx; + if (cmd->data.type != REP_UNKNOWN) { + redis_data_free(&cmd->data); + cmd->data.type = REP_UNKNOWN; + } + if (cmd->parent == NULL && cmd->client != NULL) { client_range_clear(cmd->client, cmd); } diff --git a/src/command.h b/src/command.h index 7054f47..ca585ba 100644 --- a/src/command.h +++ b/src/command.h @@ -143,6 +143,7 @@ HANDLER(PING, EXTRA, UNKNOWN) \ HANDLER(INFO, EXTRA, UNKNOWN) \ HANDLER(PROXY, EXTRA, UNKNOWN) \ + HANDLER(SLOWLOG, EXTRA, UNKNOWN) \ HANDLER(QUIT, EXTRA, UNKNOWN) \ HANDLER(SELECT, UNIMPL, UNKNOWN) \ HANDLER(TIME, EXTRA, UNKNOWN) @@ -223,6 +224,8 @@ struct command { bool parse_done; bool stale; bool cmd_fail; + + struct redis_data data; /* for slowlog */ }; struct redirect_info { @@ -240,6 +243,8 @@ const char *rep_err, *rep_server_err, *rep_timeout_err; +const char *rep_get, *rep_set, *rep_del, *rep_exists; + void cmd_map_init(); void cmd_map_destroy(); struct command *cmd_create(struct context *ctx); @@ -257,5 +262,6 @@ void cmd_iov_reset(struct iov_data *iov); void cmd_iov_clear(struct context *ctx, struct iov_data *iov); void cmd_iov_free(struct iov_data *iov); void cmd_free(struct command *cmd); +const char *cmd_extract_prefix(const char *prefix); #endif /* end of include guard: COMMAND_H */ diff --git a/src/corvus.c b/src/corvus.c index 34c4f69..0096d32 100644 --- a/src/corvus.c +++ b/src/corvus.c @@ -38,6 +38,7 @@ void config_init() config.bufsize = DEFAULT_BUFSIZE; config.requirepass = NULL; config.readslave = config.readmasterslave = false; + config.slowlog_max_len = 128; memset(config.statsd_addr, 0, sizeof(config.statsd_addr)); config.metric_interval = 10; @@ -316,6 +317,8 @@ void context_init(struct context *ctx) STAILQ_INIT(&ctx->free_conn_infoq); TAILQ_INIT(&ctx->servers); TAILQ_INIT(&ctx->conns); + + ctx->slowlog.len = 0; // for non worker threads } void build_contexts() @@ -346,6 +349,10 @@ void context_free(struct context *ctx) } dict_free(&ctx->server_table); + /* slowlog */ + if (ctx->slowlog.len > 0) + slowlog_free(&ctx->slowlog); + /* mbuf queue */ mbuf_destroy(ctx); @@ -400,6 +407,13 @@ void *main_loop(void *data) { struct context *ctx = data; + if (config.slowlog_max_len > 0) { + if (slowlog_init(&ctx->slowlog) == CORVUS_ERR) { + LOG(ERROR, "Fatal: fail to init slowlog."); + exit(EXIT_FAILURE); + } + } + if (event_init(&ctx->loop, 1024) == -1) { LOG(ERROR, "Fatal: fail to create event loop."); exit(EXIT_FAILURE); diff --git a/src/corvus.h b/src/corvus.h index aff1bd8..d9fff95 100644 --- a/src/corvus.h +++ b/src/corvus.h @@ -12,6 +12,7 @@ #include "stats.h" #include "dict.h" #include "event.h" +#include "slowlog.h" #define VERSION "0.2.4" @@ -86,6 +87,9 @@ struct context { struct basic_stats stats; struct memory_stats mstats; long long last_command_latency; + + /* slowlog */ + struct slowlog_queue slowlog; }; struct { @@ -104,6 +108,7 @@ struct { int64_t client_timeout; int64_t server_timeout; int bufsize; + int slowlog_max_len; } config; int64_t get_time(); diff --git a/src/slowlog.c b/src/slowlog.c new file mode 100644 index 0000000..75a58a9 --- /dev/null +++ b/src/slowlog.c @@ -0,0 +1,126 @@ +#include +#include + +#include "alloc.h" +#include "corvus.h" +#include "slowlog.h" +#include "logging.h" + +// use 32bit to guarantee slowlog id will not overflow in slowlog_entry.id +static uint32_t slowlog_id = 0; +static inline size_t min(size_t a, size_t b) { return a < b ? a : b; } + + +int slowlog_init(struct slowlog_queue *slowlog) +{ + size_t queue_len = 1 + (config.slowlog_max_len - 1) / config.thread; // round up + slowlog->len = queue_len; + slowlog->entries = cv_calloc(queue_len, sizeof(struct slowlog_entry)); + slowlog->entry_locks = cv_malloc(queue_len * sizeof(pthread_mutex_t)); + slowlog->curr = 0; + int err; + for (size_t i = 0; i != queue_len; i++) { + if ((err = pthread_mutex_init(slowlog->entry_locks + i, NULL)) != 0) { + LOG(ERROR, "pthread_mutex_init: %s", strerror(err)); + return CORVUS_ERR; + } + } + return CORVUS_OK; +} + +// Should be called only after all worker threads have stopped +void slowlog_free(struct slowlog_queue *slowlog) +{ + for (size_t i = 0; i != slowlog->len; i++) { + // no other worker threads will read entry queue any more + if (slowlog->entries[i] != NULL) { + slowlog_dec_ref(slowlog->entries[i]); + } + pthread_mutex_destroy(slowlog->entry_locks + i); + } + cv_free(slowlog->entries); + cv_free(slowlog->entry_locks); +} + +struct slowlog_entry *slowlog_create_entry(struct command *cmd, int64_t latency) +{ + struct slowlog_entry *entry = cv_calloc(1, sizeof(struct slowlog_entry)); + entry->id = ATOMIC_INC(slowlog_id, 1); + entry->log_time = time(NULL); // redis also uses time(NULL); + entry->latency = latency; + entry->refcount = 1; + + const char *fmt = "(%zd bytes)"; + char tmp_buf[18]; // strlen("4294967295") + strlen("(bytes)") + 1 + + assert(cmd->data.elements > 0); + size_t argc = min(cmd->data.elements, SLOWLOG_ENTRY_MAX_ARGC); + entry->argc = argc; + size_t i = 0; + + // for sub cmd + if (cmd->prefix) { + const char *c = cmd_extract_prefix(cmd->prefix); + assert(c != NULL); + entry->argv[i].len = strlen(c); + entry->argv[i++].str = (uint8_t*)c; + entry->argc = min(argc + 1, SLOWLOG_ENTRY_MAX_ARGC); + argc = min(argc, SLOWLOG_ENTRY_MAX_ARGC - 1); + } + size_t max_len = SLOWLOG_ENTRY_MAX_STRING; + for (size_t j = 0; j != argc; i++, j++) { + struct redis_data *arg = cmd->data.element + j; + size_t real_len = mbuf_range_len(arg->buf); + size_t len = min(real_len, max_len); + uint8_t *buf = cv_malloc(len); // will be freed by the last owner + mbuf_range_copy(buf, arg->buf, len); + if (real_len > max_len) { + int postfix_len = snprintf(tmp_buf, sizeof tmp_buf, fmt, real_len); + memcpy(buf + max_len - postfix_len, tmp_buf, postfix_len); + } + entry->argv[i].str = buf; + entry->argv[i].len = len; + } + + return entry; +} + +void slowlog_dec_ref(struct slowlog_entry *entry) +{ + int refcount = ATOMIC_DEC(entry->refcount, 1); + assert(refcount >= 0); + if (refcount > 0) return; + + for (size_t i = 0; i != entry->argc; i++) { + uint8_t *buf = entry->argv[i].str; + cv_free(buf); + } + cv_free(entry); +} + +void slowlog_set(struct slowlog_queue *queue, struct slowlog_entry *entry) +{ + size_t curr = queue->curr; + // TODO: remove lock or atomic here + pthread_mutex_lock(queue->entry_locks + curr); + struct slowlog_entry *old_entry = ATOMIC_IGET(queue->entries[curr], entry); + pthread_mutex_unlock(queue->entry_locks + curr); + queue->curr = (curr + 1) % queue->len; + if (old_entry != NULL) { + slowlog_dec_ref(old_entry); + } +} + +struct slowlog_entry *slowlog_get(struct slowlog_queue *queue, size_t index) +{ + // TODO: remove lock or atomic here + pthread_mutex_lock(queue->entry_locks + index); + struct slowlog_entry *entry = ATOMIC_GET(queue->entries[index]); + if (entry == NULL) { + pthread_mutex_unlock(queue->entry_locks + index); + return NULL; + } + ATOMIC_INC(entry->refcount, 1); + pthread_mutex_unlock(queue->entry_locks + index); + return entry; +} diff --git a/src/slowlog.h b/src/slowlog.h new file mode 100644 index 0000000..9f01adb --- /dev/null +++ b/src/slowlog.h @@ -0,0 +1,46 @@ +#ifndef SLOWLOG_H +#define SLOWLOG_H + +#include + +#include "mbuf.h" +#include "parser.h" + + +#define SLOWLOG_ENTRY_MAX_ARGC 32 +#define SLOWLOG_ENTRY_MAX_STRING 128 + +// Same with slowlog format of redis. +// Note that slowlog_entry will be created from one thread +// and also held by mutliple reader threads executing 'slowlog get' command. +// It will be freed by the last owner thread. +struct slowlog_entry { + long long id; + long long log_time; + long long latency; + int refcount; + int argc; + struct pos argv[SLOWLOG_ENTRY_MAX_ARGC]; +}; + +struct slowlog_queue { + struct slowlog_entry **entries; + pthread_mutex_t *entry_locks; + size_t len; + size_t curr; +}; + +struct context; +struct command; + +int slowlog_init(struct slowlog_queue *slowlog); +void slowlog_free(struct slowlog_queue *slowlog); + +// only called by the thread who creates the log +struct slowlog_entry *slowlog_create_entry(struct command *cmd, int64_t latency); +// called by all worker threads +void slowlog_set(struct slowlog_queue *queue, struct slowlog_entry *entry); +void slowlog_dec_ref(struct slowlog_entry *entry); +struct slowlog_entry *slowlog_get(struct slowlog_queue *queue, size_t index); + +#endif diff --git a/tests/corvus_test.c b/tests/corvus_test.c index 07751e7..38733fa 100644 --- a/tests/corvus_test.c +++ b/tests/corvus_test.c @@ -65,6 +65,7 @@ extern TEST_CASE(test_timer); extern TEST_CASE(test_config); extern TEST_CASE(test_stats); extern TEST_CASE(test_mbuf); +extern TEST_CASE(test_slowlog); int main(int argc, const char *argv[]) { @@ -97,6 +98,7 @@ int main(int argc, const char *argv[]) RUN_CASE(test_config); RUN_CASE(test_stats); RUN_CASE(test_mbuf); + RUN_CASE(test_slowlog); usleep(10000); slot_create_job(SLOT_UPDATER_QUIT); diff --git a/tests/test_mbuf.c b/tests/test_mbuf.c index 6b1d9af..156116a 100644 --- a/tests/test_mbuf.c +++ b/tests/test_mbuf.c @@ -1,5 +1,4 @@ #include "test.h" -#include "slowlog.h" #include "alloc.h" void init_mbuf_for_test(struct mbuf *b, size_t len) { diff --git a/tests/test_slowlog.c b/tests/test_slowlog.c new file mode 100644 index 0000000..9517230 --- /dev/null +++ b/tests/test_slowlog.c @@ -0,0 +1,134 @@ +#include "test.h" +#include "slowlog.h" +#include "alloc.h" + + +TEST(test_slowlog_create_entry) { + struct mbuf buf; + struct command cmd; + struct reader r = {0}; + const char cmd_data[] = "*3\r\n$3\r\nSET\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n"; + char *args[] = { + "$3\r\nSET\r\n", + "$4\r\nkey1\r\n", + "$6\r\nvalue1\r\n", + }; + const size_t args_len[] = {strlen(args[0]), strlen(args[1]), strlen(args[2])}; + buf.pos = (uint8_t*)cmd_data; + buf.last = (uint8_t*)cmd_data + strlen(cmd_data); + reader_init(&r); + reader_feed(&r, &buf); + ASSERT(parse(&r, MODE_REQ) != -1); + cmd.data = r.data; + cmd.prefix = NULL; + + struct slowlog_entry *entry = slowlog_create_entry(&cmd, 233666); + ASSERT(entry->latency == 233666); + ASSERT(entry->refcount == 1); + ASSERT(entry->argc == 3); + ASSERT(entry->argv[0].len == args_len[0]); + ASSERT(entry->argv[1].len == args_len[1]); + ASSERT(entry->argv[2].len == args_len[2]); + ASSERT(strncmp(args[0], entry->argv[0].str, args_len[0]) == 0); + ASSERT(strncmp(args[1], entry->argv[1].str, args_len[1]) == 0); + ASSERT(strncmp(args[2], entry->argv[2].str, args_len[2]) == 0); + slowlog_dec_ref(entry); + + redis_data_free(&cmd.data); + PASS(NULL); +} + +TEST(test_slowlog_create_entry_with_prefix) { + struct command cmd; + char *args[] = { + "$3\r\nSET\r\n", + "$4\r\nkey2\r\n", + "$6\r\nvalue2\r\n", + }; + const size_t args_len[] = {strlen(args[0]), strlen(args[1]), strlen(args[2])}; + struct pos key = {args[1], args_len[1]}; + struct pos value = {args[2], args_len[2]}; + struct pos_array key_pos = { + .items = &key, .pos_len = key.len, .pos_len = 1, .max_pos_size = 1 + }; + struct pos_array value_pos = { + .items = &value, .pos_len = value.len, .pos_len = 1, .max_pos_size = 1 + }; + struct mbuf buf1 = { + .pos = args[1], .last = args[1] + args_len[1], .start = args[1], .end = args[1] + }; + struct mbuf buf2 = { + .pos = args[2], .last = args[2] + args_len[2], .start = args[2], .end = args[2] + }; + struct redis_data element[2] = { + { + .pos = key_pos, .buf = { + { .buf = &buf1, .pos = buf1.start }, + { .buf = &buf1, .pos = buf1.last } + } + }, + { + .pos = value_pos, .buf = { + { .buf = &buf2, .pos = buf2.start }, + { .buf = &buf2, .pos = buf2.last } + } + } + }; + cmd.data.elements = 2; + cmd.data.element = element; + cmd.prefix = (char*)rep_set; + + struct slowlog_entry *entry = slowlog_create_entry(&cmd, 9394); + ASSERT(entry->latency == 9394); + ASSERT(entry->refcount == 1); + ASSERT(entry->argc == 3); + ASSERT(entry->argv[0].len == args_len[0]); + ASSERT(entry->argv[1].len == args_len[1]); + ASSERT(entry->argv[2].len == args_len[2]); + ASSERT(strncmp(entry->argv[0].str, args[0], args_len[0]) == 0); + ASSERT(strncmp(entry->argv[1].str, args[1], args_len[1]) == 0); + ASSERT(strncmp(entry->argv[2].str, args[2], args_len[2]) == 0); + slowlog_dec_ref(entry); + + PASS(NULL); +} + +TEST(test_entry_get_set) { + config.slowlog_max_len = 2; + config.thread = 1; + struct slowlog_queue q; + slowlog_init(&q); + + struct slowlog_entry e1, e2, e3; + e3.refcount = 2; + e3.argc = 0; + e1 = e2 = e3; + + ASSERT(q.curr == 0); + slowlog_set(&q, &e1); + ASSERT(q.curr == 1); + slowlog_set(&q, &e2); + ASSERT(q.curr == 0); + ASSERT(e1.refcount == 2); + ASSERT(e2.refcount == 2); + ASSERT(q.entries[0] == &e1); + ASSERT(q.entries[1] == &e2); + struct slowlog_entry *e4 = slowlog_get(&q, 0); + ASSERT(e1.refcount == 3); + ASSERT(&e1 == e4); + slowlog_set(&q, &e3); + ASSERT(q.curr == 1); + ASSERT(q.entries[0] == &e3); + ASSERT(e1.refcount == 2); + slowlog_dec_ref(e4); + ASSERT(e1.refcount == 1); + + slowlog_free(&q); + PASS(NULL); +} + +TEST_CASE(test_slowlog) { + RUN_TEST(test_slowlog_create_entry); + RUN_TEST(test_slowlog_create_entry_with_prefix); + RUN_TEST(test_entry_get_set); +} From d01c298a7f955350229207765978d03d07fb7844 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Mon, 22 Aug 2016 10:13:19 +0000 Subject: [PATCH 3/5] Add config --- corvus.conf | 11 +++++++++++ src/command.c | 31 +++++++++++++++++-------------- src/command.h | 7 +++++++ src/corvus.c | 10 ++++++++-- src/corvus.h | 1 + src/slowlog.c | 21 ++++++++++++++++++++- src/slowlog.h | 3 +++ 7 files changed, 67 insertions(+), 17 deletions(-) diff --git a/corvus.conf b/corvus.conf index 0e919ee..dd2b542 100644 --- a/corvus.conf +++ b/corvus.conf @@ -63,3 +63,14 @@ syslog 0 # to tell corvus to use the newly added slaves. # # read-strategy master + +# Slowlog +# The following two configs are almost the same with redis. +# every command whose lantency exceeds `slowlog-log-slower-than` will appear +# in the result of `slowlog get`. +# Note that the lantency here is the time spent in proxy, +# including redirection caused by MOVED and ASK. +# Set one of them to negative value to disable slow log. +# +# slowlog-log-slower-than 10000 +# slowlog-max-len 1024 diff --git a/src/command.c b/src/command.c index 88ca561..2efdd83 100644 --- a/src/command.c +++ b/src/command.c @@ -31,13 +31,6 @@ do { \ } \ } while (0) -enum { - CMD_UNIMPL, - CMD_BASIC, - CMD_COMPLEX, - CMD_EXTRA, -}; - struct cmd_item { char *cmd; int value; @@ -52,6 +45,7 @@ const char *rep_redirect_err = "-ERR Proxy redirecting error\r\n"; const char *rep_addr_err = "-ERR Proxy fail to parse server address\r\n"; const char *rep_server_err = "-ERR Proxy fail to get server\r\n"; const char *rep_timeout_err = "-ERR Proxy timed out\r\n"; +const char *rep_slowlog_not_enabled = "-ERR Slowlog not enabled\r\n"; const char *rep_get = "*2\r\n$3\r\nGET\r\n"; const char *rep_set = "*3\r\n$3\r\nSET\r\n"; @@ -541,7 +535,7 @@ int cmd_quit(struct command *cmd) return CORVUS_OK; } -static int parse_len(struct redis_data *data, int *result) +static int cmd_parse_len(struct redis_data *data, int *result) { ASSERT_TYPE(data, REP_STRING); char len_limit[data->pos.str_len + 1]; @@ -574,7 +568,7 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data) } else if (data->elements == 3) { int len_limit; struct redis_data *len_limit_data = &data->element[2]; - if (parse_len(len_limit_data, &len_limit) == CORVUS_ERR) { + if (cmd_parse_len(len_limit_data, &len_limit) == CORVUS_ERR) { return CORVUS_ERR; } if (len_limit == 0) { @@ -589,9 +583,11 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data) struct context *contexts = get_contexts(); struct slowlog_entry *entries[len]; int count = 0; + size_t limit_per_thread = 1 + (len - 1) / config.thread; for (size_t i = 0; i != config.thread; i++) { struct slowlog_queue *queue = &contexts[i].slowlog; - for (size_t j = 0; j != queue->len && count < len; j++) { + assert(limit_per_thread <= queue->len); + for (size_t j = 0; j < limit_per_thread && count < len; j++) { // slowlog_get will lock mutex struct slowlog_entry *entry = slowlog_get(queue, j); if (entry == NULL) @@ -642,6 +638,15 @@ int cmd_slowlog(struct command *cmd, struct redis_data *data) ASSERT_TYPE(data, REP_ARRAY); ASSERT_ELEMENTS(data->elements >= 2, data); + if (!slowlog_enabled()) { + conn_add_data(cmd->client, (uint8_t*)rep_slowlog_not_enabled, + strlen(rep_slowlog_not_enabled), + &cmd->rep_buf[0], &cmd->rep_buf[1]); + CMD_INCREF(cmd); + cmd_mark_done(cmd); + return CORVUS_OK; + } + struct redis_data *op = &data->element[1]; ASSERT_TYPE(op, REP_STRING); @@ -765,8 +770,7 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf) return CORVUS_OK; } - if (cmd->request_type == CMD_EXTRA - || cmd->request_type == CMD_UNIMPL) { + if (!slowlog_enabled() || !slowlog_type_need_log(cmd)) { redis_data_free(&r->data); return CORVUS_OK; } @@ -1092,8 +1096,7 @@ void cmd_stats(struct command *cmd, int64_t end_time) latency = cmd->rep_time[1] - cmd->rep_time[0]; } - if (cmd->request_type != CMD_EXTRA - && cmd->request_type != CMD_UNIMPL) { + if (slowlog_need_log(cmd, latency)) { struct slowlog_entry *entry = slowlog_create_entry(cmd, latency); slowlog_set(&cmd->ctx->slowlog, entry); } diff --git a/src/command.h b/src/command.h index ca585ba..16b5312 100644 --- a/src/command.h +++ b/src/command.h @@ -154,6 +154,13 @@ enum { CMD_DO(CMD_DEFINE) }; +enum { + CMD_UNIMPL, + CMD_BASIC, + CMD_COMPLEX, + CMD_EXTRA, +}; + struct context; enum { diff --git a/src/corvus.c b/src/corvus.c index 0096d32..32723c4 100644 --- a/src/corvus.c +++ b/src/corvus.c @@ -38,7 +38,8 @@ void config_init() config.bufsize = DEFAULT_BUFSIZE; config.requirepass = NULL; config.readslave = config.readmasterslave = false; - config.slowlog_max_len = 128; + config.slowlog_max_len = -1; + config.slowlog_log_slower_than = -1; memset(config.statsd_addr, 0, sizeof(config.statsd_addr)); config.metric_interval = 10; @@ -148,6 +149,10 @@ int config_add(char *name, char *value) config.node.len++; p = strtok(NULL, ","); } + } else if (strcmp(name, "slowlog-log-slower-than") == 0) { + config.slowlog_log_slower_than = atoi(value); + } else if (strcmp(name, "slowlog-max-len") == 0) { + config.slowlog_max_len = atoi(value); } return 0; } @@ -407,7 +412,8 @@ void *main_loop(void *data) { struct context *ctx = data; - if (config.slowlog_max_len > 0) { + if (slowlog_enabled()) { + LOG(DEBUG, "slowlog enabled"); if (slowlog_init(&ctx->slowlog) == CORVUS_ERR) { LOG(ERROR, "Fatal: fail to init slowlog."); exit(EXIT_FAILURE); diff --git a/src/corvus.h b/src/corvus.h index d9fff95..5a22a21 100644 --- a/src/corvus.h +++ b/src/corvus.h @@ -108,6 +108,7 @@ struct { int64_t client_timeout; int64_t server_timeout; int bufsize; + int slowlog_log_slower_than; int slowlog_max_len; } config; diff --git a/src/slowlog.c b/src/slowlog.c index 75a58a9..e08d516 100644 --- a/src/slowlog.c +++ b/src/slowlog.c @@ -51,7 +51,7 @@ struct slowlog_entry *slowlog_create_entry(struct command *cmd, int64_t latency) entry->refcount = 1; const char *fmt = "(%zd bytes)"; - char tmp_buf[18]; // strlen("4294967295") + strlen("(bytes)") + 1 + char tmp_buf[19]; // strlen("4294967295") + strlen("( bytes)") + 1 assert(cmd->data.elements > 0); size_t argc = min(cmd->data.elements, SLOWLOG_ENTRY_MAX_ARGC); @@ -124,3 +124,22 @@ struct slowlog_entry *slowlog_get(struct slowlog_queue *queue, size_t index) pthread_mutex_unlock(queue->entry_locks + index); return entry; } + +bool slowlog_enabled() +{ + return config.slowlog_max_len > 0 + && config.slowlog_log_slower_than > 0; +} + +bool slowlog_type_need_log(struct command *cmd) +{ + return cmd->request_type != CMD_EXTRA + && cmd->request_type != CMD_UNIMPL; +} + +bool slowlog_need_log(struct command *cmd, long long latency) +{ + return slowlog_enabled() + && slowlog_type_need_log(cmd) + && latency > config.slowlog_log_slower_than * 1000; +} diff --git a/src/slowlog.h b/src/slowlog.h index 9f01adb..911cf72 100644 --- a/src/slowlog.h +++ b/src/slowlog.h @@ -42,5 +42,8 @@ struct slowlog_entry *slowlog_create_entry(struct command *cmd, int64_t latency) void slowlog_set(struct slowlog_queue *queue, struct slowlog_entry *entry); void slowlog_dec_ref(struct slowlog_entry *entry); struct slowlog_entry *slowlog_get(struct slowlog_queue *queue, size_t index); +bool slowlog_enabled(); +bool slowlog_type_need_log(struct command *cmd); +bool slowlog_need_log(struct command *cmd, long long latency); #endif From 6cbd412b63c247b96c346f088526f48633ca30f7 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Tue, 23 Aug 2016 08:12:42 +0000 Subject: [PATCH 4/5] Amend response of slowlog get --- src/command.c | 23 ++++++++++-------- src/parser.c | 13 +++++++++++ src/parser.h | 1 + src/slowlog.c | 29 ++++++++++++++++++----- src/slowlog.h | 1 + tests/test_mbuf.c | 6 +++++ tests/test_slowlog.c | 55 ++++++++++++++++++++++++++++++++++++++++++-- 7 files changed, 111 insertions(+), 17 deletions(-) diff --git a/src/command.c b/src/command.c index 2efdd83..f2e6db3 100644 --- a/src/command.c +++ b/src/command.c @@ -551,7 +551,7 @@ static int cmd_parse_len(struct redis_data *data, int *result) LOG(ERROR, "parse_len: char not between 0-9"); return CORVUS_ERR; } - v += (10 * v + (c - '0')); + v = 10 * v + (c - '0'); } *result = v; @@ -583,15 +583,20 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data) struct context *contexts = get_contexts(); struct slowlog_entry *entries[len]; int count = 0; - size_t limit_per_thread = 1 + (len - 1) / config.thread; - for (size_t i = 0; i != config.thread; i++) { - struct slowlog_queue *queue = &contexts[i].slowlog; - assert(limit_per_thread <= queue->len); - for (size_t j = 0; j < limit_per_thread && count < len; j++) { + size_t queue_len = contexts[0].slowlog.len; + bool exhausted[config.thread]; + memset(exhausted, 0, sizeof exhausted); + for (size_t i = 0; i != queue_len && count < len; i++) { + for (size_t j = 0; j != config.thread && count < len; j++) { + if (exhausted[j]) + continue; + struct slowlog_queue *queue = &contexts[j].slowlog; // slowlog_get will lock mutex - struct slowlog_entry *entry = slowlog_get(queue, j); - if (entry == NULL) - break; // NULL starts from here + struct slowlog_entry *entry = slowlog_get(queue, i); + if (entry == NULL) { + exhausted[j] = true; + continue; + } entries[count++] = entry; } } diff --git a/src/parser.c b/src/parser.c index b7c097a..1eacb15 100644 --- a/src/parser.c +++ b/src/parser.c @@ -550,3 +550,16 @@ int pos_to_str(struct pos_array *pos, char *str) str[length] = '\0'; return CORVUS_OK; } + +size_t pos_to_str_with_limit(struct pos_array *pos, uint8_t *str, size_t limit) +{ + size_t len = 0; + for (size_t i = 0; i != pos->pos_len && len < limit; i++) { + size_t curr_len = pos->items[i].len; + if (limit - len < curr_len) + curr_len = limit - len; + memcpy(str + len, pos->items[i].str, curr_len); + len += curr_len; + } + return len; +} diff --git a/src/parser.h b/src/parser.h index 4a99b20..d80648d 100644 --- a/src/parser.h +++ b/src/parser.h @@ -142,5 +142,6 @@ struct pos *pos_get(struct pos_array *arr, int idx); int pos_to_str(struct pos_array *pos, char *str); void redis_data_free(struct redis_data *data); void redis_data_move(struct redis_data *lhs, struct redis_data *rhs); +size_t pos_to_str_with_limit(struct pos_array *pos, uint8_t *str, size_t limit); #endif /* end of include guard: PARSER_H */ diff --git a/src/slowlog.c b/src/slowlog.c index e08d516..77f0fb0 100644 --- a/src/slowlog.c +++ b/src/slowlog.c @@ -50,8 +50,8 @@ struct slowlog_entry *slowlog_create_entry(struct command *cmd, int64_t latency) entry->latency = latency; entry->refcount = 1; - const char *fmt = "(%zd bytes)"; - char tmp_buf[19]; // strlen("4294967295") + strlen("( bytes)") + 1 + const char *bytes_fmt = "(%zd bytes)"; + char bytes_buf[19]; // strlen("4294967295") + strlen("( bytes)") + 1 assert(cmd->data.elements > 0); size_t argc = min(cmd->data.elements, SLOWLOG_ENTRY_MAX_ARGC); @@ -67,16 +67,33 @@ struct slowlog_entry *slowlog_create_entry(struct command *cmd, int64_t latency) entry->argc = min(argc + 1, SLOWLOG_ENTRY_MAX_ARGC); argc = min(argc, SLOWLOG_ENTRY_MAX_ARGC - 1); } + // use the last argument to record total argument counts + if (cmd->data.elements > argc) { + argc--; + const size_t tail_max_len = 64; + char tail_buf[tail_max_len]; + uint8_t *buf = cv_malloc(tail_max_len); + int len = snprintf(tail_buf, tail_max_len, "(%zd arguments in total)", cmd->data.elements); + len = snprintf((char*)buf, tail_max_len, "$%d\r\n%s\r\n", len, tail_buf); + entry->argv[SLOWLOG_ENTRY_MAX_ARGC - 1].str = buf; + entry->argv[SLOWLOG_ENTRY_MAX_ARGC - 1].len = len; + } size_t max_len = SLOWLOG_ENTRY_MAX_STRING; for (size_t j = 0; j != argc; i++, j++) { struct redis_data *arg = cmd->data.element + j; + assert(arg->type == REP_STRING); size_t real_len = mbuf_range_len(arg->buf); size_t len = min(real_len, max_len); uint8_t *buf = cv_malloc(len); // will be freed by the last owner - mbuf_range_copy(buf, arg->buf, len); if (real_len > max_len) { - int postfix_len = snprintf(tmp_buf, sizeof tmp_buf, fmt, real_len); - memcpy(buf + max_len - postfix_len, tmp_buf, postfix_len); + size_t str_len = SLOWLOG_MAX_ARG_LEN; + int hdr_len = snprintf((char*)buf, len, "$%zd\r\n", str_len); + pos_to_str_with_limit(&arg->pos, buf + hdr_len, str_len); + int postfix_len = snprintf(bytes_buf, sizeof bytes_buf, bytes_fmt, arg->pos.str_len); + memcpy(buf + max_len - 2 - postfix_len, bytes_buf, postfix_len); + memcpy(buf + max_len - 2, "\r\n", 2); + } else { + mbuf_range_copy(buf, arg->buf, len); } entry->argv[i].str = buf; entry->argv[i].len = len; @@ -128,7 +145,7 @@ struct slowlog_entry *slowlog_get(struct slowlog_queue *queue, size_t index) bool slowlog_enabled() { return config.slowlog_max_len > 0 - && config.slowlog_log_slower_than > 0; + && config.slowlog_log_slower_than >= 0; } bool slowlog_type_need_log(struct command *cmd) diff --git a/src/slowlog.h b/src/slowlog.h index 911cf72..986751b 100644 --- a/src/slowlog.h +++ b/src/slowlog.h @@ -9,6 +9,7 @@ #define SLOWLOG_ENTRY_MAX_ARGC 32 #define SLOWLOG_ENTRY_MAX_STRING 128 +#define SLOWLOG_MAX_ARG_LEN 120 // SLOWLOG_ENTRY_MAX_STRING - strlen("%120\r\n\r\n") // Same with slowlog format of redis. // Note that slowlog_entry will be created from one thread diff --git a/tests/test_mbuf.c b/tests/test_mbuf.c index 156116a..60ecfc1 100644 --- a/tests/test_mbuf.c +++ b/tests/test_mbuf.c @@ -60,6 +60,12 @@ TEST(test_mbuf_range_func) { ASSERT(tmp[21] == 0); cv_free(tmp); + // with max limit + uint8_t buf[20] = {0}; + ASSERT(mbuf_range_copy(buf, ptr, 10) == 10); + ASSERT(memcmp(buf, "abcdefg012", 10) == 0); + ASSERT(buf[10] == '\0'); + cv_free(b1.start); cv_free(b2.start); cv_free(b3.start); diff --git a/tests/test_slowlog.c b/tests/test_slowlog.c index 9517230..127c27f 100644 --- a/tests/test_slowlog.c +++ b/tests/test_slowlog.c @@ -62,13 +62,13 @@ TEST(test_slowlog_create_entry_with_prefix) { }; struct redis_data element[2] = { { - .pos = key_pos, .buf = { + .type = REP_STRING, .pos = key_pos, .buf = { { .buf = &buf1, .pos = buf1.start }, { .buf = &buf1, .pos = buf1.last } } }, { - .pos = value_pos, .buf = { + .type = REP_STRING, .pos = value_pos, .buf = { { .buf = &buf2, .pos = buf2.start }, { .buf = &buf2, .pos = buf2.last } } @@ -76,6 +76,7 @@ TEST(test_slowlog_create_entry_with_prefix) { }; cmd.data.elements = 2; cmd.data.element = element; + cmd.data.type = REP_ARRAY; cmd.prefix = (char*)rep_set; struct slowlog_entry *entry = slowlog_create_entry(&cmd, 9394); @@ -93,6 +94,55 @@ TEST(test_slowlog_create_entry_with_prefix) { PASS(NULL); } +TEST(test_slowlog_create_entry_with_long_arg) { + struct mbuf buf; + struct command cmd; + struct reader r = {0}; + const char cmd_data[] = "*37\r\n$4\r\nMGET\r\n" + "$144\r\n" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1234567890qwertyuiopasdfghjklzxcvbnm\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" // "a" * 5 + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n"; + char a[] = "$1\r\na\r\n"; + char long_value[] = + "$120\r\n" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1(144 bytes)\r\n"; + "\r\n"; + char tail[] = "$23\r\n(37 arguments in total)\r\n"; + + buf.pos = (uint8_t*)cmd_data; + buf.last = (uint8_t*)cmd_data + strlen(cmd_data); + reader_init(&r); + reader_feed(&r, &buf); + ASSERT(parse(&r, MODE_REQ) != -1); + cmd.data = r.data; + cmd.prefix = NULL; + + struct slowlog_entry *entry = slowlog_create_entry(&cmd, 233666); + ASSERT(SLOWLOG_ENTRY_MAX_STRING == strlen(long_value)); + ASSERT(entry->argv[1].len == strlen(long_value)); + ASSERT(strncmp(long_value, entry->argv[1].str, strlen(long_value)) == 0); + ASSERT(entry->argv[30].len == strlen(a)); + ASSERT(strncmp(a, entry->argv[30].str, strlen(a)) == 0); + ASSERT(entry->argv[31].len == strlen(tail)); + ASSERT(strncmp(tail, entry->argv[31].str, strlen(tail)) == 0); + slowlog_dec_ref(entry); + + redis_data_free(&cmd.data); + PASS(NULL); +} + TEST(test_entry_get_set) { config.slowlog_max_len = 2; config.thread = 1; @@ -130,5 +180,6 @@ TEST(test_entry_get_set) { TEST_CASE(test_slowlog) { RUN_TEST(test_slowlog_create_entry); RUN_TEST(test_slowlog_create_entry_with_prefix); + RUN_TEST(test_slowlog_create_entry_with_long_arg); RUN_TEST(test_entry_get_set); } From 81a0ce2a67c89ea7ed42ae43b0830338c8373652 Mon Sep 17 00:00:00 2001 From: doyoubi Date: Tue, 23 Aug 2016 09:13:35 +0000 Subject: [PATCH 5/5] Add 'slowlog len' and 'slowlog reset' --- README.md | 4 +-- corvus.conf | 4 ++- src/command.c | 85 ++++++++++++++++++++++++++++++++++++++------------- src/corvus.c | 4 +-- src/parser.c | 6 +--- src/slowlog.c | 16 +++++----- src/slowlog.h | 3 +- 7 files changed, 79 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 62d8ada..7adf8ee 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,7 @@ Commands * `DEL`: split to multiple single key `DEL`. * `EXISTS`: split to multiple single key `EXISTS`. * `PING`: ignored and won't be forwarded. -* `INFO`, `TIME`: won't be forwarded to backend redis, information collected in proxy +* `INFO`, `TIME`, `SLOWLOG`: won't be forwarded to backend redis, information collected in proxy will be returned. * `AUTH`: do authentication in proxy. @@ -123,7 +123,7 @@ all backend redis instances. * `CLUSTER`. * `ECHO`, `QUIT`, `SELECT`. * `BGREWRITEAOF`, `BGSAVE`, `CLIENT`, `COMMAND`, `CONFIG`, `DBSIZE`, `DEBUG`, `FLUSHALL`, - `FLUSHDB`, `LASTSAVE`, `MONITOR`, `ROLE`, `SAVE`, `SHUTDOWN`, `SLAVEOF`, `SLOWLOG`, `SYNC`. + `FLUSHDB`, `LASTSAVE`, `MONITOR`, `ROLE`, `SAVE`, `SHUTDOWN`, `SLAVEOF`, `SYNC`. License ------- diff --git a/corvus.conf b/corvus.conf index dd2b542..12398e5 100644 --- a/corvus.conf +++ b/corvus.conf @@ -66,11 +66,13 @@ syslog 0 # Slowlog # The following two configs are almost the same with redis. -# every command whose lantency exceeds `slowlog-log-slower-than` will appear +# Every command whose lantency exceeds `slowlog-log-slower-than` will appear # in the result of `slowlog get`. # Note that the lantency here is the time spent in proxy, # including redirection caused by MOVED and ASK. # Set one of them to negative value to disable slow log. # +# A zero value will log every command. # slowlog-log-slower-than 10000 +# # slowlog-max-len 1024 diff --git a/src/command.c b/src/command.c index f2e6db3..2626ea7 100644 --- a/src/command.c +++ b/src/command.c @@ -544,20 +544,24 @@ static int cmd_parse_len(struct redis_data *data, int *result) return CORVUS_ERR; } - int v = 0; - for (size_t i = 0; i != data->pos.str_len; i++) { - char c = len_limit[i]; - if (c < '0' || c > '9') { - LOG(ERROR, "parse_len: char not between 0-9"); - return CORVUS_ERR; - } - v = 10 * v + (c - '0'); + *result = atoi(len_limit); + if (*result <= 0) { + return CORVUS_ERR; } - *result = v; return CORVUS_OK; } +static int cmd_slowlog_entry_cmp(const void * lhs, const void * rhs) +{ + const struct slowlog_entry *e1 = *((struct slowlog_entry**)lhs); + const struct slowlog_entry *e2 = *((struct slowlog_entry**)rhs); + return e1->log_time < e2->log_time ? -1 : + e1->log_time > e2->log_time ? 1 : + e1->id < e2->id ? -1 : + e1->id > e2->id ? 1 : 0; +} + int cmd_slowlog_get(struct command *cmd, struct redis_data *data) { // For example 'slowlog get 128', element[2] is 128 here @@ -583,24 +587,20 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data) struct context *contexts = get_contexts(); struct slowlog_entry *entries[len]; int count = 0; - size_t queue_len = contexts[0].slowlog.len; - bool exhausted[config.thread]; - memset(exhausted, 0, sizeof exhausted); + size_t queue_len = contexts[0].slowlog.capacity; for (size_t i = 0; i != queue_len && count < len; i++) { for (size_t j = 0; j != config.thread && count < len; j++) { - if (exhausted[j]) - continue; struct slowlog_queue *queue = &contexts[j].slowlog; // slowlog_get will lock mutex struct slowlog_entry *entry = slowlog_get(queue, i); - if (entry == NULL) { - exhausted[j] = true; - continue; + if (entry) { + entries[count++] = entry; } - entries[count++] = entry; } } + qsort(entries, count, sizeof(struct slowlog_entry*), cmd_slowlog_entry_cmp); + // generate redis packet const char *hdr_fmt = "*4\r\n" @@ -611,7 +611,6 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data) char buf[150]; int size = snprintf(buf, sizeof buf, "*%d\r\n", count); - assert(size < 150); conn_add_data(cmd->client, (uint8_t*)buf, size, &cmd->rep_buf[0], &cmd->rep_buf[1]); @@ -620,6 +619,7 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data) assert(entry->argc > 0); size = snprintf(buf, sizeof buf, hdr_fmt, entry->id, entry->log_time, entry->latency, entry->argc); + assert(size < 150); conn_add_data(cmd->client, (uint8_t*)buf, size, NULL, NULL); for (size_t j = 0; j != entry->argc; j++) { @@ -635,8 +635,49 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data) return CORVUS_OK; } -int cmd_slowlog_len(struct command *cmd) { return CORVUS_ERR; } -int cmd_slowlog_reset(struct command *cmd) { return CORVUS_ERR; } +int cmd_slowlog_len(struct command *cmd) +{ + int len = 0; + struct context *contexts = get_contexts(); + + for (size_t i = 0; i != config.thread; i++) { + struct slowlog_queue *queue = &contexts[i].slowlog; + for (size_t j = 0; j != queue->capacity && len < config.slowlog_max_len; j++) { + struct slowlog_entry *entry = slowlog_get(queue, j); + if (entry) { + len++; + slowlog_dec_ref(entry); + } + } + } + + char buf[30]; + int size = snprintf(buf, sizeof buf, ":%d\r\n", len); + conn_add_data(cmd->client, (uint8_t*)buf, size, + &cmd->rep_buf[0], &cmd->rep_buf[1]); + CMD_INCREF(cmd); + cmd_mark_done(cmd); + + return CORVUS_OK; +} + +int cmd_slowlog_reset(struct command *cmd) +{ + struct context *contexts = get_contexts(); + for (size_t i = 0; i != config.thread; i++) { + struct slowlog_queue *queue = &contexts[i].slowlog; + for (size_t j = 0; j != queue->capacity; j++) { + slowlog_set(queue, NULL); + } + } + + conn_add_data(cmd->client, (uint8_t*)rep_ok, strlen(rep_ok), + &cmd->rep_buf[0], &cmd->rep_buf[1]); + CMD_INCREF(cmd); + cmd_mark_done(cmd); + + return CORVUS_OK; +} int cmd_slowlog(struct command *cmd, struct redis_data *data) { @@ -775,7 +816,7 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf) return CORVUS_OK; } - if (!slowlog_enabled() || !slowlog_type_need_log(cmd)) { + if (!(slowlog_enabled() && slowlog_type_need_log(cmd))) { redis_data_free(&r->data); return CORVUS_OK; } diff --git a/src/corvus.c b/src/corvus.c index 32723c4..79fda21 100644 --- a/src/corvus.c +++ b/src/corvus.c @@ -323,7 +323,7 @@ void context_init(struct context *ctx) TAILQ_INIT(&ctx->servers); TAILQ_INIT(&ctx->conns); - ctx->slowlog.len = 0; // for non worker threads + ctx->slowlog.capacity = 0; // for non worker threads } void build_contexts() @@ -355,7 +355,7 @@ void context_free(struct context *ctx) dict_free(&ctx->server_table); /* slowlog */ - if (ctx->slowlog.len > 0) + if (ctx->slowlog.capacity > 0) slowlog_free(&ctx->slowlog); /* mbuf queue */ diff --git a/src/parser.c b/src/parser.c index 1eacb15..b3261b0 100644 --- a/src/parser.c +++ b/src/parser.c @@ -536,17 +536,13 @@ int reader_ready(struct reader *r) int pos_to_str(struct pos_array *pos, char *str) { - int i, cur_len = 0; int length = pos->str_len; if (length <= 0) { LOG(ERROR, "pos_to_str: string length %d <= 0", length); return CORVUS_ERR; } - for (i = 0; i < pos->pos_len; i++) { - memcpy(str + cur_len, pos->items[i].str, pos->items[i].len); - cur_len += pos->items[i].len; - } + pos_to_str_with_limit(pos, (uint8_t*)str, length); str[length] = '\0'; return CORVUS_OK; } diff --git a/src/slowlog.c b/src/slowlog.c index 77f0fb0..5128049 100644 --- a/src/slowlog.c +++ b/src/slowlog.c @@ -1,6 +1,5 @@ #include #include - #include "alloc.h" #include "corvus.h" #include "slowlog.h" @@ -14,7 +13,7 @@ static inline size_t min(size_t a, size_t b) { return a < b ? a : b; } int slowlog_init(struct slowlog_queue *slowlog) { size_t queue_len = 1 + (config.slowlog_max_len - 1) / config.thread; // round up - slowlog->len = queue_len; + slowlog->capacity = queue_len; slowlog->entries = cv_calloc(queue_len, sizeof(struct slowlog_entry)); slowlog->entry_locks = cv_malloc(queue_len * sizeof(pthread_mutex_t)); slowlog->curr = 0; @@ -31,7 +30,7 @@ int slowlog_init(struct slowlog_queue *slowlog) // Should be called only after all worker threads have stopped void slowlog_free(struct slowlog_queue *slowlog) { - for (size_t i = 0; i != slowlog->len; i++) { + for (size_t i = 0; i != slowlog->capacity; i++) { // no other worker threads will read entry queue any more if (slowlog->entries[i] != NULL) { slowlog_dec_ref(slowlog->entries[i]); @@ -117,12 +116,12 @@ void slowlog_dec_ref(struct slowlog_entry *entry) void slowlog_set(struct slowlog_queue *queue, struct slowlog_entry *entry) { - size_t curr = queue->curr; - // TODO: remove lock or atomic here + size_t curr = ATOMIC_GET(queue->curr); pthread_mutex_lock(queue->entry_locks + curr); - struct slowlog_entry *old_entry = ATOMIC_IGET(queue->entries[curr], entry); + struct slowlog_entry *old_entry = queue->entries[curr]; + queue->entries[curr] = entry; pthread_mutex_unlock(queue->entry_locks + curr); - queue->curr = (curr + 1) % queue->len; + ATOMIC_SET(queue->curr, (curr + 1) % queue->capacity); if (old_entry != NULL) { slowlog_dec_ref(old_entry); } @@ -130,9 +129,8 @@ void slowlog_set(struct slowlog_queue *queue, struct slowlog_entry *entry) struct slowlog_entry *slowlog_get(struct slowlog_queue *queue, size_t index) { - // TODO: remove lock or atomic here pthread_mutex_lock(queue->entry_locks + index); - struct slowlog_entry *entry = ATOMIC_GET(queue->entries[index]); + struct slowlog_entry *entry = queue->entries[index]; if (entry == NULL) { pthread_mutex_unlock(queue->entry_locks + index); return NULL; diff --git a/src/slowlog.h b/src/slowlog.h index 986751b..36c1d1d 100644 --- a/src/slowlog.h +++ b/src/slowlog.h @@ -2,7 +2,6 @@ #define SLOWLOG_H #include - #include "mbuf.h" #include "parser.h" @@ -27,7 +26,7 @@ struct slowlog_entry { struct slowlog_queue { struct slowlog_entry **entries; pthread_mutex_t *entry_locks; - size_t len; + size_t capacity; size_t curr; };