diff --git a/README.md b/README.md index 62d8ada..7adf8ee 100644 --- a/README.md +++ b/README.md @@ -92,7 +92,7 @@ Commands * `DEL`: split to multiple single key `DEL`. * `EXISTS`: split to multiple single key `EXISTS`. * `PING`: ignored and won't be forwarded. -* `INFO`, `TIME`: won't be forwarded to backend redis, information collected in proxy +* `INFO`, `TIME`, `SLOWLOG`: won't be forwarded to backend redis, information collected in proxy will be returned. * `AUTH`: do authentication in proxy. @@ -123,7 +123,7 @@ all backend redis instances. * `CLUSTER`. * `ECHO`, `QUIT`, `SELECT`. * `BGREWRITEAOF`, `BGSAVE`, `CLIENT`, `COMMAND`, `CONFIG`, `DBSIZE`, `DEBUG`, `FLUSHALL`, - `FLUSHDB`, `LASTSAVE`, `MONITOR`, `ROLE`, `SAVE`, `SHUTDOWN`, `SLAVEOF`, `SLOWLOG`, `SYNC`. + `FLUSHDB`, `LASTSAVE`, `MONITOR`, `ROLE`, `SAVE`, `SHUTDOWN`, `SLAVEOF`, `SYNC`. License ------- diff --git a/corvus.conf b/corvus.conf index 0e919ee..12398e5 100644 --- a/corvus.conf +++ b/corvus.conf @@ -63,3 +63,16 @@ syslog 0 # to tell corvus to use the newly added slaves. # # read-strategy master + +# Slowlog +# The following two configs are almost the same with redis. +# Every command whose lantency exceeds `slowlog-log-slower-than` will appear +# in the result of `slowlog get`. +# Note that the lantency here is the time spent in proxy, +# including redirection caused by MOVED and ASK. +# Set one of them to negative value to disable slow log. +# +# A zero value will log every command. +# slowlog-log-slower-than 10000 +# +# slowlog-max-len 1024 diff --git a/src/alloc.c b/src/alloc.c index a43cfd2..cf02509 100644 --- a/src/alloc.c +++ b/src/alloc.c @@ -1,4 +1,5 @@ #include +#include #include "logging.h" #include "alloc.h" @@ -39,3 +40,11 @@ void cv_free(void *ptr) { je_free(ptr); } + +char *cv_raw_strndup(const char *other, size_t size, const char *file, int line) +{ + char *p = cv_raw_malloc(size + 1, file, line); + strncpy(p, other, size); + p[size] = '\0'; + return p; +} diff --git a/src/alloc.h b/src/alloc.h index c502926..1598a51 100644 --- a/src/alloc.h +++ b/src/alloc.h @@ -15,10 +15,12 @@ #define cv_malloc(size) cv_raw_malloc(size, __FILE__, __LINE__) #define cv_calloc(number, size) cv_raw_calloc(number, size, __FILE__, __LINE__) #define cv_realloc(ptr, size) cv_raw_realloc(ptr, size, __FILE__, __LINE__) +#define cv_strndup(other, size) cv_raw_strndup(other, size, __FILE__, __LINE__) void *cv_raw_malloc(size_t size, const char *file, int line); void *cv_raw_calloc(size_t number, size_t size, const char *file, int line); void *cv_raw_realloc(void *ptr, size_t size, const char *file, int line); void cv_free(void *ptr); +char *cv_raw_strndup(const char *other, size_t size, const char *file, int line); #endif /* end of include guard: ALLOC_H */ diff --git a/src/command.c b/src/command.c index ead9b0e..2626ea7 100644 --- a/src/command.c +++ b/src/command.c @@ -6,6 +6,7 @@ #include #include #include +#include #include "corvus.h" #include "socket.h" #include "logging.h" @@ -16,6 +17,7 @@ #include "client.h" #include "stats.h" #include "alloc.h" +#include "slowlog.h" #define CMD_RECYCLE_SIZE 1024 @@ -29,13 +31,6 @@ do { \ } \ } while (0) -enum { - CMD_UNIMPL, - CMD_BASIC, - CMD_COMPLEX, - CMD_EXTRA, -}; - struct cmd_item { char *cmd; int value; @@ -50,11 +45,13 @@ const char *rep_redirect_err = "-ERR Proxy redirecting error\r\n"; const char *rep_addr_err = "-ERR Proxy fail to parse server address\r\n"; const char *rep_server_err = "-ERR Proxy fail to get server\r\n"; const char *rep_timeout_err = "-ERR Proxy timed out\r\n"; +const char *rep_slowlog_not_enabled = "-ERR Slowlog not enabled\r\n"; + +const char *rep_get = "*2\r\n$3\r\nGET\r\n"; +const char *rep_set = "*3\r\n$3\r\nSET\r\n"; +const char *rep_del = "*2\r\n$3\r\nDEL\r\n"; +const char *rep_exists = "*2\r\n$6\r\nEXISTS\r\n"; -static const char *rep_get = "*2\r\n$3\r\nGET\r\n"; -static const char *rep_set = "*3\r\n$3\r\nSET\r\n"; -static const char *rep_del = "*2\r\n$3\r\nDEL\r\n"; -static const char *rep_exists = "*2\r\n$6\r\nEXISTS\r\n"; static const char *rep_ok = "+OK\r\n"; static const char *rep_ping = "+PONG\r\n"; static const char *rep_noauth = "-NOAUTH Authentication required.\r\n"; @@ -64,6 +61,20 @@ static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is static struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)}; static struct dict command_map; + +const char *cmd_extract_prefix(const char *prefix) +{ + const char *get = "$3\r\nGET\r\n"; + const char *set = "$3\r\nSET\r\n"; + const char *del = "$3\r\nDEL\r\n"; + const char *exists = "$6\r\nEXISTS\r\n"; + return prefix == rep_get ? cv_strndup(get, strlen(get)) : + prefix == rep_set ? cv_strndup(set, strlen(set)) : + prefix == rep_del ? cv_strndup(del, strlen(del)) : + prefix == rep_exists ? cv_strndup(exists, strlen(exists)) : + NULL; +} + static inline uint8_t *cmd_get_data(struct mbuf *b, struct buf_ptr ptr[], int *len) { uint8_t *data; @@ -524,6 +535,184 @@ int cmd_quit(struct command *cmd) return CORVUS_OK; } +static int cmd_parse_len(struct redis_data *data, int *result) +{ + ASSERT_TYPE(data, REP_STRING); + char len_limit[data->pos.str_len + 1]; + if (pos_to_str(&data->pos, len_limit) == CORVUS_ERR) { + LOG(ERROR, "parse_len: emptry arg string"); + return CORVUS_ERR; + } + + *result = atoi(len_limit); + if (*result <= 0) { + return CORVUS_ERR; + } + + return CORVUS_OK; +} + +static int cmd_slowlog_entry_cmp(const void * lhs, const void * rhs) +{ + const struct slowlog_entry *e1 = *((struct slowlog_entry**)lhs); + const struct slowlog_entry *e2 = *((struct slowlog_entry**)rhs); + return e1->log_time < e2->log_time ? -1 : + e1->log_time > e2->log_time ? 1 : + e1->id < e2->id ? -1 : + e1->id > e2->id ? 1 : 0; +} + +int cmd_slowlog_get(struct command *cmd, struct redis_data *data) +{ + // For example 'slowlog get 128', element[2] is 128 here + int len = config.slowlog_max_len; + if (data->elements > 3) { + LOG(DEBUG, "cmd_slowlog_get: too many arguments"); + return CORVUS_ERR; + } else if (data->elements == 3) { + int len_limit; + struct redis_data *len_limit_data = &data->element[2]; + if (cmd_parse_len(len_limit_data, &len_limit) == CORVUS_ERR) { + return CORVUS_ERR; + } + if (len_limit == 0) { + LOG(ERROR, "cmd_slowlog_get: invalid len"); + return CORVUS_ERR; + } + if (len_limit < len) { + len = len_limit; + } + } + + struct context *contexts = get_contexts(); + struct slowlog_entry *entries[len]; + int count = 0; + size_t queue_len = contexts[0].slowlog.capacity; + for (size_t i = 0; i != queue_len && count < len; i++) { + for (size_t j = 0; j != config.thread && count < len; j++) { + struct slowlog_queue *queue = &contexts[j].slowlog; + // slowlog_get will lock mutex + struct slowlog_entry *entry = slowlog_get(queue, i); + if (entry) { + entries[count++] = entry; + } + } + } + + qsort(entries, count, sizeof(struct slowlog_entry*), cmd_slowlog_entry_cmp); + + // generate redis packet + const char *hdr_fmt = + "*4\r\n" + ":%lld\r\n" // id + ":%lld\r\n" // log time + ":%lld\r\n" // latency + "*%d\r\n"; // cmd arg len + char buf[150]; + + int size = snprintf(buf, sizeof buf, "*%d\r\n", count); + conn_add_data(cmd->client, (uint8_t*)buf, size, + &cmd->rep_buf[0], &cmd->rep_buf[1]); + + for (size_t i = 0; i != count; i++) { + struct slowlog_entry *entry = entries[i]; + assert(entry->argc > 0); + size = snprintf(buf, sizeof buf, hdr_fmt, + entry->id, entry->log_time, entry->latency, entry->argc); + assert(size < 150); + conn_add_data(cmd->client, (uint8_t*)buf, size, NULL, NULL); + + for (size_t j = 0; j != entry->argc; j++) { + struct pos *arg = entry->argv + j; + conn_add_data(cmd->client, arg->str, arg->len, + NULL, &cmd->rep_buf[1]); + } + slowlog_dec_ref(entry); + } + CMD_INCREF(cmd); + cmd_mark_done(cmd); + + return CORVUS_OK; +} + +int cmd_slowlog_len(struct command *cmd) +{ + int len = 0; + struct context *contexts = get_contexts(); + + for (size_t i = 0; i != config.thread; i++) { + struct slowlog_queue *queue = &contexts[i].slowlog; + for (size_t j = 0; j != queue->capacity && len < config.slowlog_max_len; j++) { + struct slowlog_entry *entry = slowlog_get(queue, j); + if (entry) { + len++; + slowlog_dec_ref(entry); + } + } + } + + char buf[30]; + int size = snprintf(buf, sizeof buf, ":%d\r\n", len); + conn_add_data(cmd->client, (uint8_t*)buf, size, + &cmd->rep_buf[0], &cmd->rep_buf[1]); + CMD_INCREF(cmd); + cmd_mark_done(cmd); + + return CORVUS_OK; +} + +int cmd_slowlog_reset(struct command *cmd) +{ + struct context *contexts = get_contexts(); + for (size_t i = 0; i != config.thread; i++) { + struct slowlog_queue *queue = &contexts[i].slowlog; + for (size_t j = 0; j != queue->capacity; j++) { + slowlog_set(queue, NULL); + } + } + + conn_add_data(cmd->client, (uint8_t*)rep_ok, strlen(rep_ok), + &cmd->rep_buf[0], &cmd->rep_buf[1]); + CMD_INCREF(cmd); + cmd_mark_done(cmd); + + return CORVUS_OK; +} + +int cmd_slowlog(struct command *cmd, struct redis_data *data) +{ + ASSERT_TYPE(data, REP_ARRAY); + ASSERT_ELEMENTS(data->elements >= 2, data); + + if (!slowlog_enabled()) { + conn_add_data(cmd->client, (uint8_t*)rep_slowlog_not_enabled, + strlen(rep_slowlog_not_enabled), + &cmd->rep_buf[0], &cmd->rep_buf[1]); + CMD_INCREF(cmd); + cmd_mark_done(cmd); + return CORVUS_OK; + } + + struct redis_data *op = &data->element[1]; + ASSERT_TYPE(op, REP_STRING); + + char type[op->pos.str_len + 1]; + if (pos_to_str(&op->pos, type) == CORVUS_ERR) { + LOG(ERROR, "cmd_slowlog: parse error"); + return CORVUS_ERR; + } + + if (strcasecmp(type, "GET") == 0) { + return cmd_slowlog_get(cmd, data); + } else if (strcasecmp(type, "LEN") == 0) { + return cmd_slowlog_len(cmd); + } else if (strcasecmp(type, "RESET") == 0) { + return cmd_slowlog_reset(cmd); + } + cmd_mark_fail(cmd, rep_parse_err); + return CORVUS_OK; +} + int cmd_extra(struct command *cmd, struct redis_data *data) { switch (cmd->cmd_type) { @@ -539,6 +728,8 @@ int cmd_extra(struct command *cmd, struct redis_data *data) return cmd_time(cmd); case CMD_QUIT: return cmd_quit(cmd); + case CMD_SLOWLOG: + return cmd_slowlog(cmd, data); default: LOG(ERROR, "%s: unknown command type %d", __func__, cmd->cmd_type); return CORVUS_ERR; @@ -606,6 +797,7 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf) cmd->keys = r->data.elements - 1; cmd->parse_done = true; cmd->cmd_count = 1; + cmd->data.type = REP_UNKNOWN; memcpy(&cmd->req_buf[0], &r->start, sizeof(r->start)); memset(&r->start, 0, sizeof(r->start)); @@ -623,7 +815,14 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf) cmd_mark_fail(cmd, rep_forward_err); return CORVUS_OK; } - redis_data_free(&r->data); + + if (!(slowlog_enabled() && slowlog_type_need_log(cmd))) { + redis_data_free(&r->data); + return CORVUS_OK; + } + cmd->data = r->data; + memset(&r->data, 0, sizeof(struct redis_data)); + r->data.type = REP_UNKNOWN; // avoid double free in conn_free } return CORVUS_OK; } @@ -942,6 +1141,12 @@ void cmd_stats(struct command *cmd, int64_t end_time) } else { latency = cmd->rep_time[1] - cmd->rep_time[0]; } + + if (slowlog_need_log(cmd, latency)) { + struct slowlog_entry *entry = slowlog_create_entry(cmd, latency); + slowlog_set(&cmd->ctx->slowlog, entry); + } + ATOMIC_INC(ctx->stats.remote_latency, latency); } @@ -1025,6 +1230,11 @@ void cmd_free(struct command *cmd) struct command *c; struct context *ctx = cmd->ctx; + if (cmd->data.type != REP_UNKNOWN) { + redis_data_free(&cmd->data); + cmd->data.type = REP_UNKNOWN; + } + if (cmd->parent == NULL && cmd->client != NULL) { client_range_clear(cmd->client, cmd); } diff --git a/src/command.h b/src/command.h index 7054f47..16b5312 100644 --- a/src/command.h +++ b/src/command.h @@ -143,6 +143,7 @@ HANDLER(PING, EXTRA, UNKNOWN) \ HANDLER(INFO, EXTRA, UNKNOWN) \ HANDLER(PROXY, EXTRA, UNKNOWN) \ + HANDLER(SLOWLOG, EXTRA, UNKNOWN) \ HANDLER(QUIT, EXTRA, UNKNOWN) \ HANDLER(SELECT, UNIMPL, UNKNOWN) \ HANDLER(TIME, EXTRA, UNKNOWN) @@ -153,6 +154,13 @@ enum { CMD_DO(CMD_DEFINE) }; +enum { + CMD_UNIMPL, + CMD_BASIC, + CMD_COMPLEX, + CMD_EXTRA, +}; + struct context; enum { @@ -223,6 +231,8 @@ struct command { bool parse_done; bool stale; bool cmd_fail; + + struct redis_data data; /* for slowlog */ }; struct redirect_info { @@ -240,6 +250,8 @@ const char *rep_err, *rep_server_err, *rep_timeout_err; +const char *rep_get, *rep_set, *rep_del, *rep_exists; + void cmd_map_init(); void cmd_map_destroy(); struct command *cmd_create(struct context *ctx); @@ -257,5 +269,6 @@ void cmd_iov_reset(struct iov_data *iov); void cmd_iov_clear(struct context *ctx, struct iov_data *iov); void cmd_iov_free(struct iov_data *iov); void cmd_free(struct command *cmd); +const char *cmd_extract_prefix(const char *prefix); #endif /* end of include guard: COMMAND_H */ diff --git a/src/corvus.c b/src/corvus.c index 34c4f69..79fda21 100644 --- a/src/corvus.c +++ b/src/corvus.c @@ -38,6 +38,8 @@ void config_init() config.bufsize = DEFAULT_BUFSIZE; config.requirepass = NULL; config.readslave = config.readmasterslave = false; + config.slowlog_max_len = -1; + config.slowlog_log_slower_than = -1; memset(config.statsd_addr, 0, sizeof(config.statsd_addr)); config.metric_interval = 10; @@ -147,6 +149,10 @@ int config_add(char *name, char *value) config.node.len++; p = strtok(NULL, ","); } + } else if (strcmp(name, "slowlog-log-slower-than") == 0) { + config.slowlog_log_slower_than = atoi(value); + } else if (strcmp(name, "slowlog-max-len") == 0) { + config.slowlog_max_len = atoi(value); } return 0; } @@ -316,6 +322,8 @@ void context_init(struct context *ctx) STAILQ_INIT(&ctx->free_conn_infoq); TAILQ_INIT(&ctx->servers); TAILQ_INIT(&ctx->conns); + + ctx->slowlog.capacity = 0; // for non worker threads } void build_contexts() @@ -346,6 +354,10 @@ void context_free(struct context *ctx) } dict_free(&ctx->server_table); + /* slowlog */ + if (ctx->slowlog.capacity > 0) + slowlog_free(&ctx->slowlog); + /* mbuf queue */ mbuf_destroy(ctx); @@ -400,6 +412,14 @@ void *main_loop(void *data) { struct context *ctx = data; + if (slowlog_enabled()) { + LOG(DEBUG, "slowlog enabled"); + if (slowlog_init(&ctx->slowlog) == CORVUS_ERR) { + LOG(ERROR, "Fatal: fail to init slowlog."); + exit(EXIT_FAILURE); + } + } + if (event_init(&ctx->loop, 1024) == -1) { LOG(ERROR, "Fatal: fail to create event loop."); exit(EXIT_FAILURE); diff --git a/src/corvus.h b/src/corvus.h index aff1bd8..5a22a21 100644 --- a/src/corvus.h +++ b/src/corvus.h @@ -12,6 +12,7 @@ #include "stats.h" #include "dict.h" #include "event.h" +#include "slowlog.h" #define VERSION "0.2.4" @@ -86,6 +87,9 @@ struct context { struct basic_stats stats; struct memory_stats mstats; long long last_command_latency; + + /* slowlog */ + struct slowlog_queue slowlog; }; struct { @@ -104,6 +108,8 @@ struct { int64_t client_timeout; int64_t server_timeout; int bufsize; + int slowlog_log_slower_than; + int slowlog_max_len; } config; int64_t get_time(); diff --git a/src/mbuf.c b/src/mbuf.c index 006b0ba..ffb9357 100644 --- a/src/mbuf.c +++ b/src/mbuf.c @@ -124,6 +124,52 @@ void mbuf_range_clear(struct context *ctx, struct buf_ptr ptr[]) memset(&ptr[1], 0, sizeof(struct buf_ptr)); } +uint32_t mbuf_range_len(struct buf_ptr ptr[2]) +{ + uint32_t len = 0; + struct mbuf *b = ptr[0].buf; + while (true) { + uint8_t *start = b->start; + uint8_t *end = b->end; + if (b == ptr[0].buf) { + start = ptr[0].pos; + } + if (b == ptr[1].buf) { + end = ptr[1].pos; + } + len += (end - start); + if (b == ptr[1].buf) + break; + b = TAILQ_NEXT(b, next); + } + return len; +} + +size_t mbuf_range_copy(uint8_t *dest, struct buf_ptr ptr[2], size_t max_len) +{ + struct mbuf *b = ptr[0].buf; + size_t len = 0; + while (len < max_len) { + uint8_t *start = b->start; + uint8_t *end = b->end; + if (b == ptr[0].buf) { + start = ptr[0].pos; + } + if (b == ptr[1].buf) { + end = ptr[1].pos; + } + if (end - start > max_len - len) { + end = start + max_len - len; + } + memcpy(dest + len, start, end - start); + len += (end - start); + if (b == ptr[1].buf) + break; + b = TAILQ_NEXT(b, next); + } + return len; +} + void mbuf_decref(struct context *ctx, struct mbuf **bufs, int n) { for (int i = 0; i < n; i++) { diff --git a/src/mbuf.h b/src/mbuf.h index eb0a539..5a12f1f 100644 --- a/src/mbuf.h +++ b/src/mbuf.h @@ -50,6 +50,8 @@ uint32_t mbuf_read_size(struct mbuf *); uint32_t mbuf_write_size(struct mbuf *); void mbuf_destroy(struct context *ctx); void mbuf_range_clear(struct context *ctx, struct buf_ptr ptr[]); +uint32_t mbuf_range_len(struct buf_ptr ptr[]); +size_t mbuf_range_copy(uint8_t *dest, struct buf_ptr ptr[], size_t max_len); void mbuf_decref(struct context *ctx, struct mbuf **bufs, int n); void buf_time_append(struct context *ctx, struct buf_time_tqh *queue, struct mbuf *buf, int64_t read_time); diff --git a/src/parser.c b/src/parser.c index b7c097a..b3261b0 100644 --- a/src/parser.c +++ b/src/parser.c @@ -536,17 +536,26 @@ int reader_ready(struct reader *r) int pos_to_str(struct pos_array *pos, char *str) { - int i, cur_len = 0; int length = pos->str_len; if (length <= 0) { LOG(ERROR, "pos_to_str: string length %d <= 0", length); return CORVUS_ERR; } - for (i = 0; i < pos->pos_len; i++) { - memcpy(str + cur_len, pos->items[i].str, pos->items[i].len); - cur_len += pos->items[i].len; - } + pos_to_str_with_limit(pos, (uint8_t*)str, length); str[length] = '\0'; return CORVUS_OK; } + +size_t pos_to_str_with_limit(struct pos_array *pos, uint8_t *str, size_t limit) +{ + size_t len = 0; + for (size_t i = 0; i != pos->pos_len && len < limit; i++) { + size_t curr_len = pos->items[i].len; + if (limit - len < curr_len) + curr_len = limit - len; + memcpy(str + len, pos->items[i].str, curr_len); + len += curr_len; + } + return len; +} diff --git a/src/parser.h b/src/parser.h index 4a99b20..d80648d 100644 --- a/src/parser.h +++ b/src/parser.h @@ -142,5 +142,6 @@ struct pos *pos_get(struct pos_array *arr, int idx); int pos_to_str(struct pos_array *pos, char *str); void redis_data_free(struct redis_data *data); void redis_data_move(struct redis_data *lhs, struct redis_data *rhs); +size_t pos_to_str_with_limit(struct pos_array *pos, uint8_t *str, size_t limit); #endif /* end of include guard: PARSER_H */ diff --git a/src/slowlog.c b/src/slowlog.c new file mode 100644 index 0000000..5128049 --- /dev/null +++ b/src/slowlog.c @@ -0,0 +1,160 @@ +#include +#include +#include "alloc.h" +#include "corvus.h" +#include "slowlog.h" +#include "logging.h" + +// use 32bit to guarantee slowlog id will not overflow in slowlog_entry.id +static uint32_t slowlog_id = 0; +static inline size_t min(size_t a, size_t b) { return a < b ? a : b; } + + +int slowlog_init(struct slowlog_queue *slowlog) +{ + size_t queue_len = 1 + (config.slowlog_max_len - 1) / config.thread; // round up + slowlog->capacity = queue_len; + slowlog->entries = cv_calloc(queue_len, sizeof(struct slowlog_entry)); + slowlog->entry_locks = cv_malloc(queue_len * sizeof(pthread_mutex_t)); + slowlog->curr = 0; + int err; + for (size_t i = 0; i != queue_len; i++) { + if ((err = pthread_mutex_init(slowlog->entry_locks + i, NULL)) != 0) { + LOG(ERROR, "pthread_mutex_init: %s", strerror(err)); + return CORVUS_ERR; + } + } + return CORVUS_OK; +} + +// Should be called only after all worker threads have stopped +void slowlog_free(struct slowlog_queue *slowlog) +{ + for (size_t i = 0; i != slowlog->capacity; i++) { + // no other worker threads will read entry queue any more + if (slowlog->entries[i] != NULL) { + slowlog_dec_ref(slowlog->entries[i]); + } + pthread_mutex_destroy(slowlog->entry_locks + i); + } + cv_free(slowlog->entries); + cv_free(slowlog->entry_locks); +} + +struct slowlog_entry *slowlog_create_entry(struct command *cmd, int64_t latency) +{ + struct slowlog_entry *entry = cv_calloc(1, sizeof(struct slowlog_entry)); + entry->id = ATOMIC_INC(slowlog_id, 1); + entry->log_time = time(NULL); // redis also uses time(NULL); + entry->latency = latency; + entry->refcount = 1; + + const char *bytes_fmt = "(%zd bytes)"; + char bytes_buf[19]; // strlen("4294967295") + strlen("( bytes)") + 1 + + assert(cmd->data.elements > 0); + size_t argc = min(cmd->data.elements, SLOWLOG_ENTRY_MAX_ARGC); + entry->argc = argc; + size_t i = 0; + + // for sub cmd + if (cmd->prefix) { + const char *c = cmd_extract_prefix(cmd->prefix); + assert(c != NULL); + entry->argv[i].len = strlen(c); + entry->argv[i++].str = (uint8_t*)c; + entry->argc = min(argc + 1, SLOWLOG_ENTRY_MAX_ARGC); + argc = min(argc, SLOWLOG_ENTRY_MAX_ARGC - 1); + } + // use the last argument to record total argument counts + if (cmd->data.elements > argc) { + argc--; + const size_t tail_max_len = 64; + char tail_buf[tail_max_len]; + uint8_t *buf = cv_malloc(tail_max_len); + int len = snprintf(tail_buf, tail_max_len, "(%zd arguments in total)", cmd->data.elements); + len = snprintf((char*)buf, tail_max_len, "$%d\r\n%s\r\n", len, tail_buf); + entry->argv[SLOWLOG_ENTRY_MAX_ARGC - 1].str = buf; + entry->argv[SLOWLOG_ENTRY_MAX_ARGC - 1].len = len; + } + size_t max_len = SLOWLOG_ENTRY_MAX_STRING; + for (size_t j = 0; j != argc; i++, j++) { + struct redis_data *arg = cmd->data.element + j; + assert(arg->type == REP_STRING); + size_t real_len = mbuf_range_len(arg->buf); + size_t len = min(real_len, max_len); + uint8_t *buf = cv_malloc(len); // will be freed by the last owner + if (real_len > max_len) { + size_t str_len = SLOWLOG_MAX_ARG_LEN; + int hdr_len = snprintf((char*)buf, len, "$%zd\r\n", str_len); + pos_to_str_with_limit(&arg->pos, buf + hdr_len, str_len); + int postfix_len = snprintf(bytes_buf, sizeof bytes_buf, bytes_fmt, arg->pos.str_len); + memcpy(buf + max_len - 2 - postfix_len, bytes_buf, postfix_len); + memcpy(buf + max_len - 2, "\r\n", 2); + } else { + mbuf_range_copy(buf, arg->buf, len); + } + entry->argv[i].str = buf; + entry->argv[i].len = len; + } + + return entry; +} + +void slowlog_dec_ref(struct slowlog_entry *entry) +{ + int refcount = ATOMIC_DEC(entry->refcount, 1); + assert(refcount >= 0); + if (refcount > 0) return; + + for (size_t i = 0; i != entry->argc; i++) { + uint8_t *buf = entry->argv[i].str; + cv_free(buf); + } + cv_free(entry); +} + +void slowlog_set(struct slowlog_queue *queue, struct slowlog_entry *entry) +{ + size_t curr = ATOMIC_GET(queue->curr); + pthread_mutex_lock(queue->entry_locks + curr); + struct slowlog_entry *old_entry = queue->entries[curr]; + queue->entries[curr] = entry; + pthread_mutex_unlock(queue->entry_locks + curr); + ATOMIC_SET(queue->curr, (curr + 1) % queue->capacity); + if (old_entry != NULL) { + slowlog_dec_ref(old_entry); + } +} + +struct slowlog_entry *slowlog_get(struct slowlog_queue *queue, size_t index) +{ + pthread_mutex_lock(queue->entry_locks + index); + struct slowlog_entry *entry = queue->entries[index]; + if (entry == NULL) { + pthread_mutex_unlock(queue->entry_locks + index); + return NULL; + } + ATOMIC_INC(entry->refcount, 1); + pthread_mutex_unlock(queue->entry_locks + index); + return entry; +} + +bool slowlog_enabled() +{ + return config.slowlog_max_len > 0 + && config.slowlog_log_slower_than >= 0; +} + +bool slowlog_type_need_log(struct command *cmd) +{ + return cmd->request_type != CMD_EXTRA + && cmd->request_type != CMD_UNIMPL; +} + +bool slowlog_need_log(struct command *cmd, long long latency) +{ + return slowlog_enabled() + && slowlog_type_need_log(cmd) + && latency > config.slowlog_log_slower_than * 1000; +} diff --git a/src/slowlog.h b/src/slowlog.h new file mode 100644 index 0000000..36c1d1d --- /dev/null +++ b/src/slowlog.h @@ -0,0 +1,49 @@ +#ifndef SLOWLOG_H +#define SLOWLOG_H + +#include +#include "mbuf.h" +#include "parser.h" + + +#define SLOWLOG_ENTRY_MAX_ARGC 32 +#define SLOWLOG_ENTRY_MAX_STRING 128 +#define SLOWLOG_MAX_ARG_LEN 120 // SLOWLOG_ENTRY_MAX_STRING - strlen("%120\r\n\r\n") + +// Same with slowlog format of redis. +// Note that slowlog_entry will be created from one thread +// and also held by mutliple reader threads executing 'slowlog get' command. +// It will be freed by the last owner thread. +struct slowlog_entry { + long long id; + long long log_time; + long long latency; + int refcount; + int argc; + struct pos argv[SLOWLOG_ENTRY_MAX_ARGC]; +}; + +struct slowlog_queue { + struct slowlog_entry **entries; + pthread_mutex_t *entry_locks; + size_t capacity; + size_t curr; +}; + +struct context; +struct command; + +int slowlog_init(struct slowlog_queue *slowlog); +void slowlog_free(struct slowlog_queue *slowlog); + +// only called by the thread who creates the log +struct slowlog_entry *slowlog_create_entry(struct command *cmd, int64_t latency); +// called by all worker threads +void slowlog_set(struct slowlog_queue *queue, struct slowlog_entry *entry); +void slowlog_dec_ref(struct slowlog_entry *entry); +struct slowlog_entry *slowlog_get(struct slowlog_queue *queue, size_t index); +bool slowlog_enabled(); +bool slowlog_type_need_log(struct command *cmd); +bool slowlog_need_log(struct command *cmd, long long latency); + +#endif diff --git a/tests/corvus_test.c b/tests/corvus_test.c index 6a722c3..38733fa 100644 --- a/tests/corvus_test.c +++ b/tests/corvus_test.c @@ -64,6 +64,8 @@ extern TEST_CASE(test_client); extern TEST_CASE(test_timer); extern TEST_CASE(test_config); extern TEST_CASE(test_stats); +extern TEST_CASE(test_mbuf); +extern TEST_CASE(test_slowlog); int main(int argc, const char *argv[]) { @@ -95,6 +97,8 @@ int main(int argc, const char *argv[]) RUN_CASE(test_timer); RUN_CASE(test_config); RUN_CASE(test_stats); + RUN_CASE(test_mbuf); + RUN_CASE(test_slowlog); usleep(10000); slot_create_job(SLOT_UPDATER_QUIT); diff --git a/tests/test_mbuf.c b/tests/test_mbuf.c new file mode 100644 index 0000000..60ecfc1 --- /dev/null +++ b/tests/test_mbuf.c @@ -0,0 +1,78 @@ +#include "test.h" +#include "alloc.h" + +void init_mbuf_for_test(struct mbuf *b, size_t len) { + uint8_t *p = cv_malloc(len); + b->start = p; + b->end = p + len; + b->pos = p; + b->last = p; +} + +TEST(test_mbuf_range_func) { + struct mhdr q; + TAILQ_INIT(&q); + struct buf_ptr ptr[2]; + struct mbuf b1, b2, b3; + init_mbuf_for_test(&b1, 10); + init_mbuf_for_test(&b2, 10); + init_mbuf_for_test(&b3, 10); + TAILQ_INSERT_TAIL(&q, &b1, next); + TAILQ_INSERT_TAIL(&q, &b2, next); + TAILQ_INSERT_TAIL(&q, &b3, next); + uint8_t *tmp = NULL; + + // one buf + uint8_t *a = b1.start + 3; + memcpy(a, "abcde", 5); + ptr[0].buf = &b1; + ptr[0].pos = a; + ptr[1].buf = &b1; + ptr[1].pos = b1.start + 8; + ASSERT(mbuf_range_len(ptr) == 5); + tmp = cv_calloc(10, 1); + ASSERT(mbuf_range_copy(tmp, ptr, 10) == 5); + ASSERT(memcmp(tmp, "abcde", 5) == 0); + ASSERT(tmp[5] == 0); + cv_free(tmp); + + // two buf + memcpy(a, "abcdefg", 7); + memcpy(b2.start, "12345", 5); + ptr[1].buf = &b2; + ptr[1].pos = b2.start + 5; + ASSERT(mbuf_range_len(ptr) == 7 + 5); + tmp = cv_calloc(20, 1); + ASSERT(mbuf_range_copy(tmp, ptr, 20) == 12); + ASSERT(memcmp(tmp, "abcdefg12345", 12) == 0); + ASSERT(tmp[12] == 0); + cv_free(tmp); + + // three buf + memcpy(b2.start, "0123456789", 10); + memcpy(b3.start, "crvs", 4); + ptr[1].buf = &b3; + ptr[1].pos = b3.start + 4; + ASSERT(mbuf_range_len(ptr) == 7 + 10 + 4); + tmp = cv_calloc(30, 1); + ASSERT(mbuf_range_copy(tmp, ptr, 30) == 21); + ASSERT(memcmp(tmp, "abcdefg0123456789crvs", 21) == 0); + ASSERT(tmp[21] == 0); + cv_free(tmp); + + // with max limit + uint8_t buf[20] = {0}; + ASSERT(mbuf_range_copy(buf, ptr, 10) == 10); + ASSERT(memcmp(buf, "abcdefg012", 10) == 0); + ASSERT(buf[10] == '\0'); + + cv_free(b1.start); + cv_free(b2.start); + cv_free(b3.start); + + PASS(NULL); +} + +TEST_CASE(test_mbuf) { + RUN_TEST(test_mbuf_range_func); +} diff --git a/tests/test_slowlog.c b/tests/test_slowlog.c new file mode 100644 index 0000000..127c27f --- /dev/null +++ b/tests/test_slowlog.c @@ -0,0 +1,185 @@ +#include "test.h" +#include "slowlog.h" +#include "alloc.h" + + +TEST(test_slowlog_create_entry) { + struct mbuf buf; + struct command cmd; + struct reader r = {0}; + const char cmd_data[] = "*3\r\n$3\r\nSET\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n"; + char *args[] = { + "$3\r\nSET\r\n", + "$4\r\nkey1\r\n", + "$6\r\nvalue1\r\n", + }; + const size_t args_len[] = {strlen(args[0]), strlen(args[1]), strlen(args[2])}; + buf.pos = (uint8_t*)cmd_data; + buf.last = (uint8_t*)cmd_data + strlen(cmd_data); + reader_init(&r); + reader_feed(&r, &buf); + ASSERT(parse(&r, MODE_REQ) != -1); + cmd.data = r.data; + cmd.prefix = NULL; + + struct slowlog_entry *entry = slowlog_create_entry(&cmd, 233666); + ASSERT(entry->latency == 233666); + ASSERT(entry->refcount == 1); + ASSERT(entry->argc == 3); + ASSERT(entry->argv[0].len == args_len[0]); + ASSERT(entry->argv[1].len == args_len[1]); + ASSERT(entry->argv[2].len == args_len[2]); + ASSERT(strncmp(args[0], entry->argv[0].str, args_len[0]) == 0); + ASSERT(strncmp(args[1], entry->argv[1].str, args_len[1]) == 0); + ASSERT(strncmp(args[2], entry->argv[2].str, args_len[2]) == 0); + slowlog_dec_ref(entry); + + redis_data_free(&cmd.data); + PASS(NULL); +} + +TEST(test_slowlog_create_entry_with_prefix) { + struct command cmd; + char *args[] = { + "$3\r\nSET\r\n", + "$4\r\nkey2\r\n", + "$6\r\nvalue2\r\n", + }; + const size_t args_len[] = {strlen(args[0]), strlen(args[1]), strlen(args[2])}; + struct pos key = {args[1], args_len[1]}; + struct pos value = {args[2], args_len[2]}; + struct pos_array key_pos = { + .items = &key, .pos_len = key.len, .pos_len = 1, .max_pos_size = 1 + }; + struct pos_array value_pos = { + .items = &value, .pos_len = value.len, .pos_len = 1, .max_pos_size = 1 + }; + struct mbuf buf1 = { + .pos = args[1], .last = args[1] + args_len[1], .start = args[1], .end = args[1] + }; + struct mbuf buf2 = { + .pos = args[2], .last = args[2] + args_len[2], .start = args[2], .end = args[2] + }; + struct redis_data element[2] = { + { + .type = REP_STRING, .pos = key_pos, .buf = { + { .buf = &buf1, .pos = buf1.start }, + { .buf = &buf1, .pos = buf1.last } + } + }, + { + .type = REP_STRING, .pos = value_pos, .buf = { + { .buf = &buf2, .pos = buf2.start }, + { .buf = &buf2, .pos = buf2.last } + } + } + }; + cmd.data.elements = 2; + cmd.data.element = element; + cmd.data.type = REP_ARRAY; + cmd.prefix = (char*)rep_set; + + struct slowlog_entry *entry = slowlog_create_entry(&cmd, 9394); + ASSERT(entry->latency == 9394); + ASSERT(entry->refcount == 1); + ASSERT(entry->argc == 3); + ASSERT(entry->argv[0].len == args_len[0]); + ASSERT(entry->argv[1].len == args_len[1]); + ASSERT(entry->argv[2].len == args_len[2]); + ASSERT(strncmp(entry->argv[0].str, args[0], args_len[0]) == 0); + ASSERT(strncmp(entry->argv[1].str, args[1], args_len[1]) == 0); + ASSERT(strncmp(entry->argv[2].str, args[2], args_len[2]) == 0); + slowlog_dec_ref(entry); + + PASS(NULL); +} + +TEST(test_slowlog_create_entry_with_long_arg) { + struct mbuf buf; + struct command cmd; + struct reader r = {0}; + const char cmd_data[] = "*37\r\n$4\r\nMGET\r\n" + "$144\r\n" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1234567890qwertyuiopasdfghjklzxcvbnm\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" // "a" * 5 + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" + "$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n"; + char a[] = "$1\r\na\r\n"; + char long_value[] = + "$120\r\n" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1234567890qwertyuiopasdfghjklzxcvbnm" + "1(144 bytes)\r\n"; + "\r\n"; + char tail[] = "$23\r\n(37 arguments in total)\r\n"; + + buf.pos = (uint8_t*)cmd_data; + buf.last = (uint8_t*)cmd_data + strlen(cmd_data); + reader_init(&r); + reader_feed(&r, &buf); + ASSERT(parse(&r, MODE_REQ) != -1); + cmd.data = r.data; + cmd.prefix = NULL; + + struct slowlog_entry *entry = slowlog_create_entry(&cmd, 233666); + ASSERT(SLOWLOG_ENTRY_MAX_STRING == strlen(long_value)); + ASSERT(entry->argv[1].len == strlen(long_value)); + ASSERT(strncmp(long_value, entry->argv[1].str, strlen(long_value)) == 0); + ASSERT(entry->argv[30].len == strlen(a)); + ASSERT(strncmp(a, entry->argv[30].str, strlen(a)) == 0); + ASSERT(entry->argv[31].len == strlen(tail)); + ASSERT(strncmp(tail, entry->argv[31].str, strlen(tail)) == 0); + slowlog_dec_ref(entry); + + redis_data_free(&cmd.data); + PASS(NULL); +} + +TEST(test_entry_get_set) { + config.slowlog_max_len = 2; + config.thread = 1; + struct slowlog_queue q; + slowlog_init(&q); + + struct slowlog_entry e1, e2, e3; + e3.refcount = 2; + e3.argc = 0; + e1 = e2 = e3; + + ASSERT(q.curr == 0); + slowlog_set(&q, &e1); + ASSERT(q.curr == 1); + slowlog_set(&q, &e2); + ASSERT(q.curr == 0); + ASSERT(e1.refcount == 2); + ASSERT(e2.refcount == 2); + ASSERT(q.entries[0] == &e1); + ASSERT(q.entries[1] == &e2); + struct slowlog_entry *e4 = slowlog_get(&q, 0); + ASSERT(e1.refcount == 3); + ASSERT(&e1 == e4); + slowlog_set(&q, &e3); + ASSERT(q.curr == 1); + ASSERT(q.entries[0] == &e3); + ASSERT(e1.refcount == 2); + slowlog_dec_ref(e4); + ASSERT(e1.refcount == 1); + + slowlog_free(&q); + PASS(NULL); +} + +TEST_CASE(test_slowlog) { + RUN_TEST(test_slowlog_create_entry); + RUN_TEST(test_slowlog_create_entry_with_prefix); + RUN_TEST(test_slowlog_create_entry_with_long_arg); + RUN_TEST(test_entry_get_set); +}