diff --git a/corvus.conf b/corvus.conf index 0e919ee..f4edefb 100644 --- a/corvus.conf +++ b/corvus.conf @@ -38,6 +38,15 @@ syslog 0 # # statsd localhost:8125 # metric_interval 10 +# +# Slowlog: +# Send slow command to statsd. The `slow_threshold` is in microseconds, +# every command whose lantency exceeds `slow_threshold` will be sent. +# Note that the lantency here is the time spent in proxy, +# including redirection caused by MOVED and ASK. +# Set it to negative value to disable slow log. +# +# slow_threshold 1000 # Buffer size allocated each time avoiding fregments # Buffer used in processing data recieving or sending diff --git a/src/command.c b/src/command.c index 04e140c..3f54beb 100644 --- a/src/command.c +++ b/src/command.c @@ -36,13 +36,6 @@ enum { CMD_EXTRA, }; -struct cmd_item { - char *cmd; - int value; - int type; - int access; -}; - const char *rep_err = "-ERR Proxy error\r\n"; const char *rep_parse_err = "-ERR Proxy fail to parse command\r\n"; const char *rep_forward_err = "-ERR Proxy fail to forward command\r\n"; @@ -61,7 +54,8 @@ static const char *rep_noauth = "-NOAUTH Authentication required.\r\n"; static const char *rep_auth_err = "-ERR invalid password\r\n"; static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is set\r\n"; -static struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)}; +struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)}; +const size_t CMD_NUM = sizeof(cmds) / sizeof(struct cmd_item); static struct dict command_map; static inline uint8_t *cmd_get_data(struct mbuf *b, struct buf_ptr ptr[], int *len) @@ -933,6 +927,10 @@ void cmd_stats(struct command *cmd, int64_t end_time) ATOMIC_INC(ctx->stats.total_latency, latency); ATOMIC_SET(ctx->last_command_latency, latency); + if (config.slow_threshold >= 0 && latency > config.slow_threshold * 1000) { + ATOMIC_INC(cmd->server->info->slow_cmd_counts[cmd->cmd_type], 1); + } + if (!STAILQ_EMPTY(&cmd->sub_cmds)) { first = STAILQ_FIRST(&cmd->sub_cmds); last = STAILQ_LAST(&cmd->sub_cmds, command, sub_cmd_next); diff --git a/src/command.h b/src/command.h index 264989b..de1fcd9 100644 --- a/src/command.h +++ b/src/command.h @@ -231,6 +231,13 @@ struct redirect_info { int type; }; +struct cmd_item { + char *cmd; + int value; + int type; + int access; +}; + /* error responses */ const char *rep_err, *rep_parse_err, diff --git a/src/connection.c b/src/connection.c index 581ff2f..94bcb4e 100644 --- a/src/connection.c +++ b/src/connection.c @@ -75,6 +75,8 @@ static struct connection *conn_create_server(struct context *ctx, struct connection *server = server_create(ctx, fd); struct conn_info *info = server->info; memcpy(&info->addr, addr, sizeof(info->addr)); + extern const size_t CMD_NUM; + info->slow_cmd_counts = cv_calloc(CMD_NUM, sizeof(uint32_t)); if (conn_connect(server) == CORVUS_ERR) { LOG(ERROR, "conn_create_server: fail to connect %s:%d", @@ -102,6 +104,7 @@ void conn_info_init(struct conn_info *info) info->readonly = false; info->readonly_sent = false; info->quit = false; + info->slow_cmd_counts = NULL; memset(&info->addr, 0, sizeof(info->addr)); memset(info->dsn, 0, sizeof(info->dsn)); diff --git a/src/connection.h b/src/connection.h index e1b4a26..bf15560 100644 --- a/src/connection.h +++ b/src/connection.h @@ -69,6 +69,9 @@ struct conn_info { long long completed_commands; int8_t status; + + // slow log, only for server connection in worker thread + uint32_t *slow_cmd_counts; }; TAILQ_HEAD(conn_tqh, connection); diff --git a/src/corvus.c b/src/corvus.c index 34c4f69..3e13363 100644 --- a/src/corvus.c +++ b/src/corvus.c @@ -38,6 +38,7 @@ void config_init() config.bufsize = DEFAULT_BUFSIZE; config.requirepass = NULL; config.readslave = config.readmasterslave = false; + config.slow_threshold = -1; memset(config.statsd_addr, 0, sizeof(config.statsd_addr)); config.metric_interval = 10; @@ -147,6 +148,8 @@ int config_add(char *name, char *value) config.node.len++; p = strtok(NULL, ","); } + } else if (strcmp(name, "slow_threshold") == 0) { + config.slow_threshold = atoi(value); } return 0; } @@ -341,6 +344,7 @@ void context_free(struct context *ctx) cmd_iov_free(&conn->info->iov); conn_free(conn); conn_buf_free(conn); + cv_free(conn->info->slow_cmd_counts); cv_free(conn->info); cv_free(conn); } diff --git a/src/corvus.h b/src/corvus.h index f7e96bc..180dd94 100644 --- a/src/corvus.h +++ b/src/corvus.h @@ -104,6 +104,7 @@ struct { int64_t client_timeout; int64_t server_timeout; int bufsize; + int64_t slow_threshold; } config; int64_t get_time(); diff --git a/src/stats.c b/src/stats.c index d69593a..dd63e1f 100644 --- a/src/stats.c +++ b/src/stats.c @@ -8,6 +8,7 @@ #include "socket.h" #include "logging.h" #include "slot.h" +#include "alloc.h" #define HOST_LEN 255 @@ -33,6 +34,19 @@ static struct { double user; } used_cpu; +static struct dict slow_counts; // node dsn => slow cmd counts + + +static void stats_free_slow_counts() +{ + struct dict_iter iter = DICT_ITER_INITIALIZER; + DICT_FOREACH(&slow_counts, &iter) { + cv_free(iter.value); + } + + dict_free(&slow_counts); +} + static inline void stats_get_cpu_usage(struct stats *stats) { struct rusage ru; @@ -177,7 +191,7 @@ void stats_send_node_info() { struct bytes *value; - /* redis-node.127-0-0-1:8000.bytes.{send,recv} */ + /* redis-node.127-0-0-1-8000.bytes.{send,recv} */ int len = HOST_LEN + 64; char name[len]; @@ -216,6 +230,85 @@ void stats_get(struct stats *stats) } } +static void stats_send_slow_log() +{ + if (config.slow_threshold < 0) + return; + + const char *fmt = "redis-node.%s.slow_query.%s"; + const char *sum_fmt = "slow_query.%s"; + extern struct cmd_item cmds[]; + extern const size_t CMD_NUM; + + struct connection *server; + struct context *contexts = get_contexts(); + + { + struct dict_iter iter = DICT_ITER_INITIALIZER; + DICT_FOREACH(&slow_counts, &iter) { + memset(iter.value, 0, CMD_NUM * sizeof(uint32_t)); + } + } + + uint32_t counts_sum[CMD_NUM]; + memset(counts_sum, 0, sizeof(counts_sum)); + + for (size_t i = 0; i < config.thread; i++) { + TAILQ_FOREACH(server, &contexts[i].servers, next) { + const char *dsn = server->info->dsn; + uint32_t *node_counts = NULL; + for (size_t j = 0; j < CMD_NUM; j++) { + uint32_t count = ATOMIC_IGET(server->info->slow_cmd_counts[j], 0); + if (count == 0) continue; + + if (!node_counts) { + node_counts = (uint32_t*)dict_get(&slow_counts, dsn); + if (!node_counts) { + node_counts = cv_calloc(CMD_NUM, sizeof(uint32_t)); + dict_set(&slow_counts, dsn, node_counts); + } + } + node_counts[j] += count; + counts_sum[j] += count; + } + } + } + + struct dict_iter iter = DICT_ITER_INITIALIZER; + DICT_FOREACH(&slow_counts, &iter) { + const char *dsn = iter.key; + uint32_t *counts = (uint32_t*)iter.value; + + char addr[ADDRESS_LEN] = {0}; + strncpy(addr, dsn, ADDRESS_LEN); + for (size_t i = 0; i < ADDRESS_LEN; i++) { + if (addr[i] == '.' || addr[i] == ':') + addr[i] = '-'; + } + + for (size_t i = 0; i < CMD_NUM; i++) { + if(counts[i] == 0) continue; + + const char *cmd = cmds[i].cmd; + int n = snprintf(NULL, 0, fmt, addr, cmd); + char buf[n + 1]; + snprintf(buf, sizeof(buf), fmt, addr, cmd); + stats_send(buf, counts[i]); + } + } + + for (size_t i = 0; i < CMD_NUM; i++) { + uint32_t sum = counts_sum[i]; + if (sum) { + const char *cmd = cmds[i].cmd; + int n = snprintf(NULL, 0, sum_fmt, cmd); + char buf[n + 1]; + snprintf(buf, sizeof(buf), sum_fmt, cmd); + stats_send(buf, sum); + } + } +} + void *stats_daemon(void *data) { /* Make the thread killable at any time can work reliably. */ @@ -226,6 +319,7 @@ void *stats_daemon(void *data) sleep(config.metric_interval); stats_send_simple(); stats_send_node_info(); + stats_send_slow_log(); LOG(DEBUG, "sending metrics"); } return NULL; @@ -250,6 +344,8 @@ int stats_init() if (hostname[i] == '.') hostname[i] = '-'; } + dict_init(&slow_counts); + LOG(INFO, "starting stats thread"); return thread_spawn(&stats_ctx, stats_daemon); } @@ -259,6 +355,7 @@ void stats_kill() int err; dict_free(&bytes_map); + stats_free_slow_counts(); if (pthread_cancel(stats_ctx.thread) == 0) { if ((err = pthread_join(stats_ctx.thread, NULL)) != 0) {