Skip to content

Commit

Permalink
Merge pull request #70 from doyoubi/feature/SlowLog
Browse files Browse the repository at this point in the history
Send slow command to statsd. Close #33
  • Loading branch information
maralla authored Jun 12, 2016
2 parents 2433232 + f8da3b4 commit 04c55ec
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 9 deletions.
9 changes: 9 additions & 0 deletions corvus.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ syslog 0
#
# statsd localhost:8125
# metric_interval 10
#
# Slowlog:
# Send slow command to statsd. The `slow_threshold` is in microseconds,
# every command whose lantency exceeds `slow_threshold` will be sent.
# Note that the lantency here is the time spent in proxy,
# including redirection caused by MOVED and ASK.
# Set it to negative value to disable slow log.
#
# slow_threshold 1000

# Buffer size allocated each time avoiding fregments
# Buffer used in processing data recieving or sending
Expand Down
14 changes: 6 additions & 8 deletions src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,6 @@ enum {
CMD_EXTRA,
};

struct cmd_item {
char *cmd;
int value;
int type;
int access;
};

const char *rep_err = "-ERR Proxy error\r\n";
const char *rep_parse_err = "-ERR Proxy fail to parse command\r\n";
const char *rep_forward_err = "-ERR Proxy fail to forward command\r\n";
Expand All @@ -61,7 +54,8 @@ static const char *rep_noauth = "-NOAUTH Authentication required.\r\n";
static const char *rep_auth_err = "-ERR invalid password\r\n";
static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is set\r\n";

static struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)};
struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)};
const size_t CMD_NUM = sizeof(cmds) / sizeof(struct cmd_item);
static struct dict command_map;

static inline uint8_t *cmd_get_data(struct mbuf *b, struct buf_ptr ptr[], int *len)
Expand Down Expand Up @@ -933,6 +927,10 @@ void cmd_stats(struct command *cmd, int64_t end_time)
ATOMIC_INC(ctx->stats.total_latency, latency);
ATOMIC_SET(ctx->last_command_latency, latency);

if (config.slow_threshold >= 0 && latency > config.slow_threshold * 1000) {
ATOMIC_INC(cmd->server->info->slow_cmd_counts[cmd->cmd_type], 1);
}

if (!STAILQ_EMPTY(&cmd->sub_cmds)) {
first = STAILQ_FIRST(&cmd->sub_cmds);
last = STAILQ_LAST(&cmd->sub_cmds, command, sub_cmd_next);
Expand Down
7 changes: 7 additions & 0 deletions src/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,13 @@ struct redirect_info {
int type;
};

struct cmd_item {
char *cmd;
int value;
int type;
int access;
};

/* error responses */
const char *rep_err,
*rep_parse_err,
Expand Down
3 changes: 3 additions & 0 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ static struct connection *conn_create_server(struct context *ctx,
struct connection *server = server_create(ctx, fd);
struct conn_info *info = server->info;
memcpy(&info->addr, addr, sizeof(info->addr));
extern const size_t CMD_NUM;
info->slow_cmd_counts = cv_calloc(CMD_NUM, sizeof(uint32_t));

if (conn_connect(server) == CORVUS_ERR) {
LOG(ERROR, "conn_create_server: fail to connect %s:%d",
Expand Down Expand Up @@ -102,6 +104,7 @@ void conn_info_init(struct conn_info *info)
info->readonly = false;
info->readonly_sent = false;
info->quit = false;
info->slow_cmd_counts = NULL;

memset(&info->addr, 0, sizeof(info->addr));
memset(info->dsn, 0, sizeof(info->dsn));
Expand Down
3 changes: 3 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ struct conn_info {
long long completed_commands;

int8_t status;

// slow log, only for server connection in worker thread
uint32_t *slow_cmd_counts;
};

TAILQ_HEAD(conn_tqh, connection);
Expand Down
4 changes: 4 additions & 0 deletions src/corvus.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void config_init()
config.bufsize = DEFAULT_BUFSIZE;
config.requirepass = NULL;
config.readslave = config.readmasterslave = false;
config.slow_threshold = -1;

memset(config.statsd_addr, 0, sizeof(config.statsd_addr));
config.metric_interval = 10;
Expand Down Expand Up @@ -147,6 +148,8 @@ int config_add(char *name, char *value)
config.node.len++;
p = strtok(NULL, ",");
}
} else if (strcmp(name, "slow_threshold") == 0) {
config.slow_threshold = atoi(value);
}
return 0;
}
Expand Down Expand Up @@ -341,6 +344,7 @@ void context_free(struct context *ctx)
cmd_iov_free(&conn->info->iov);
conn_free(conn);
conn_buf_free(conn);
cv_free(conn->info->slow_cmd_counts);
cv_free(conn->info);
cv_free(conn);
}
Expand Down
1 change: 1 addition & 0 deletions src/corvus.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ struct {
int64_t client_timeout;
int64_t server_timeout;
int bufsize;
int64_t slow_threshold;
} config;

int64_t get_time();
Expand Down
99 changes: 98 additions & 1 deletion src/stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "socket.h"
#include "logging.h"
#include "slot.h"
#include "alloc.h"

#define HOST_LEN 255

Expand All @@ -33,6 +34,19 @@ static struct {
double user;
} used_cpu;

static struct dict slow_counts; // node dsn => slow cmd counts


static void stats_free_slow_counts()
{
struct dict_iter iter = DICT_ITER_INITIALIZER;
DICT_FOREACH(&slow_counts, &iter) {
cv_free(iter.value);
}

dict_free(&slow_counts);
}

static inline void stats_get_cpu_usage(struct stats *stats)
{
struct rusage ru;
Expand Down Expand Up @@ -177,7 +191,7 @@ void stats_send_node_info()
{
struct bytes *value;

/* redis-node.127-0-0-1:8000.bytes.{send,recv} */
/* redis-node.127-0-0-1-8000.bytes.{send,recv} */
int len = HOST_LEN + 64;
char name[len];

Expand Down Expand Up @@ -216,6 +230,85 @@ void stats_get(struct stats *stats)
}
}

static void stats_send_slow_log()
{
if (config.slow_threshold < 0)
return;

const char *fmt = "redis-node.%s.slow_query.%s";
const char *sum_fmt = "slow_query.%s";
extern struct cmd_item cmds[];
extern const size_t CMD_NUM;

struct connection *server;
struct context *contexts = get_contexts();

{
struct dict_iter iter = DICT_ITER_INITIALIZER;
DICT_FOREACH(&slow_counts, &iter) {
memset(iter.value, 0, CMD_NUM * sizeof(uint32_t));
}
}

uint32_t counts_sum[CMD_NUM];
memset(counts_sum, 0, sizeof(counts_sum));

for (size_t i = 0; i < config.thread; i++) {
TAILQ_FOREACH(server, &contexts[i].servers, next) {
const char *dsn = server->info->dsn;
uint32_t *node_counts = NULL;
for (size_t j = 0; j < CMD_NUM; j++) {
uint32_t count = ATOMIC_IGET(server->info->slow_cmd_counts[j], 0);
if (count == 0) continue;

if (!node_counts) {
node_counts = (uint32_t*)dict_get(&slow_counts, dsn);
if (!node_counts) {
node_counts = cv_calloc(CMD_NUM, sizeof(uint32_t));
dict_set(&slow_counts, dsn, node_counts);
}
}
node_counts[j] += count;
counts_sum[j] += count;
}
}
}

struct dict_iter iter = DICT_ITER_INITIALIZER;
DICT_FOREACH(&slow_counts, &iter) {
const char *dsn = iter.key;
uint32_t *counts = (uint32_t*)iter.value;

char addr[ADDRESS_LEN] = {0};
strncpy(addr, dsn, ADDRESS_LEN);
for (size_t i = 0; i < ADDRESS_LEN; i++) {
if (addr[i] == '.' || addr[i] == ':')
addr[i] = '-';
}

for (size_t i = 0; i < CMD_NUM; i++) {
if(counts[i] == 0) continue;

const char *cmd = cmds[i].cmd;
int n = snprintf(NULL, 0, fmt, addr, cmd);
char buf[n + 1];
snprintf(buf, sizeof(buf), fmt, addr, cmd);
stats_send(buf, counts[i]);
}
}

for (size_t i = 0; i < CMD_NUM; i++) {
uint32_t sum = counts_sum[i];
if (sum) {
const char *cmd = cmds[i].cmd;
int n = snprintf(NULL, 0, sum_fmt, cmd);
char buf[n + 1];
snprintf(buf, sizeof(buf), sum_fmt, cmd);
stats_send(buf, sum);
}
}
}

void *stats_daemon(void *data)
{
/* Make the thread killable at any time can work reliably. */
Expand All @@ -226,6 +319,7 @@ void *stats_daemon(void *data)
sleep(config.metric_interval);
stats_send_simple();
stats_send_node_info();
stats_send_slow_log();
LOG(DEBUG, "sending metrics");
}
return NULL;
Expand All @@ -250,6 +344,8 @@ int stats_init()
if (hostname[i] == '.') hostname[i] = '-';
}

dict_init(&slow_counts);

LOG(INFO, "starting stats thread");
return thread_spawn(&stats_ctx, stats_daemon);
}
Expand All @@ -259,6 +355,7 @@ void stats_kill()
int err;

dict_free(&bytes_map);
stats_free_slow_counts();

if (pthread_cancel(stats_ctx.thread) == 0) {
if ((err = pthread_join(stats_ctx.thread, NULL)) != 0) {
Expand Down

0 comments on commit 04c55ec

Please sign in to comment.