Skip to content

Commit

Permalink
Add 'slowlog len' and 'slowlog reset'
Browse files Browse the repository at this point in the history
  • Loading branch information
doyoubi committed Aug 29, 2016
1 parent 6cbd412 commit 81a0ce2
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 43 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Commands
* `DEL`: split to multiple single key `DEL`.
* `EXISTS`: split to multiple single key `EXISTS`.
* `PING`: ignored and won't be forwarded.
* `INFO`, `TIME`: won't be forwarded to backend redis, information collected in proxy
* `INFO`, `TIME`, `SLOWLOG`: won't be forwarded to backend redis, information collected in proxy
will be returned.
* `AUTH`: do authentication in proxy.

Expand Down Expand Up @@ -123,7 +123,7 @@ all backend redis instances.
* `CLUSTER`.
* `ECHO`, `QUIT`, `SELECT`.
* `BGREWRITEAOF`, `BGSAVE`, `CLIENT`, `COMMAND`, `CONFIG`, `DBSIZE`, `DEBUG`, `FLUSHALL`,
`FLUSHDB`, `LASTSAVE`, `MONITOR`, `ROLE`, `SAVE`, `SHUTDOWN`, `SLAVEOF`, `SLOWLOG`, `SYNC`.
`FLUSHDB`, `LASTSAVE`, `MONITOR`, `ROLE`, `SAVE`, `SHUTDOWN`, `SLAVEOF`, `SYNC`.

License
-------
Expand Down
4 changes: 3 additions & 1 deletion corvus.conf
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ syslog 0

# Slowlog
# The following two configs are almost the same with redis.
# every command whose lantency exceeds `slowlog-log-slower-than` will appear
# Every command whose lantency exceeds `slowlog-log-slower-than` will appear
# in the result of `slowlog get`.
# Note that the lantency here is the time spent in proxy,
# including redirection caused by MOVED and ASK.
# Set one of them to negative value to disable slow log.
#
# A zero value will log every command.
# slowlog-log-slower-than 10000
#
# slowlog-max-len 1024
85 changes: 63 additions & 22 deletions src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -544,20 +544,24 @@ static int cmd_parse_len(struct redis_data *data, int *result)
return CORVUS_ERR;
}

int v = 0;
for (size_t i = 0; i != data->pos.str_len; i++) {
char c = len_limit[i];
if (c < '0' || c > '9') {
LOG(ERROR, "parse_len: char not between 0-9");
return CORVUS_ERR;
}
v = 10 * v + (c - '0');
*result = atoi(len_limit);
if (*result <= 0) {
return CORVUS_ERR;
}
*result = v;

return CORVUS_OK;
}

static int cmd_slowlog_entry_cmp(const void * lhs, const void * rhs)
{
const struct slowlog_entry *e1 = *((struct slowlog_entry**)lhs);
const struct slowlog_entry *e2 = *((struct slowlog_entry**)rhs);
return e1->log_time < e2->log_time ? -1 :
e1->log_time > e2->log_time ? 1 :
e1->id < e2->id ? -1 :
e1->id > e2->id ? 1 : 0;
}

int cmd_slowlog_get(struct command *cmd, struct redis_data *data)
{
// For example 'slowlog get 128', element[2] is 128 here
Expand All @@ -583,24 +587,20 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data)
struct context *contexts = get_contexts();
struct slowlog_entry *entries[len];
int count = 0;
size_t queue_len = contexts[0].slowlog.len;
bool exhausted[config.thread];
memset(exhausted, 0, sizeof exhausted);
size_t queue_len = contexts[0].slowlog.capacity;
for (size_t i = 0; i != queue_len && count < len; i++) {
for (size_t j = 0; j != config.thread && count < len; j++) {
if (exhausted[j])
continue;
struct slowlog_queue *queue = &contexts[j].slowlog;
// slowlog_get will lock mutex
struct slowlog_entry *entry = slowlog_get(queue, i);
if (entry == NULL) {
exhausted[j] = true;
continue;
if (entry) {
entries[count++] = entry;
}
entries[count++] = entry;
}
}

qsort(entries, count, sizeof(struct slowlog_entry*), cmd_slowlog_entry_cmp);

// generate redis packet
const char *hdr_fmt =
"*4\r\n"
Expand All @@ -611,7 +611,6 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data)
char buf[150];

int size = snprintf(buf, sizeof buf, "*%d\r\n", count);
assert(size < 150);
conn_add_data(cmd->client, (uint8_t*)buf, size,
&cmd->rep_buf[0], &cmd->rep_buf[1]);

Expand All @@ -620,6 +619,7 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data)
assert(entry->argc > 0);
size = snprintf(buf, sizeof buf, hdr_fmt,
entry->id, entry->log_time, entry->latency, entry->argc);
assert(size < 150);
conn_add_data(cmd->client, (uint8_t*)buf, size, NULL, NULL);

for (size_t j = 0; j != entry->argc; j++) {
Expand All @@ -635,8 +635,49 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data)
return CORVUS_OK;
}

int cmd_slowlog_len(struct command *cmd) { return CORVUS_ERR; }
int cmd_slowlog_reset(struct command *cmd) { return CORVUS_ERR; }
int cmd_slowlog_len(struct command *cmd)
{
int len = 0;
struct context *contexts = get_contexts();

for (size_t i = 0; i != config.thread; i++) {
struct slowlog_queue *queue = &contexts[i].slowlog;
for (size_t j = 0; j != queue->capacity && len < config.slowlog_max_len; j++) {
struct slowlog_entry *entry = slowlog_get(queue, j);
if (entry) {
len++;
slowlog_dec_ref(entry);
}
}
}

char buf[30];
int size = snprintf(buf, sizeof buf, ":%d\r\n", len);
conn_add_data(cmd->client, (uint8_t*)buf, size,
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);

return CORVUS_OK;
}

int cmd_slowlog_reset(struct command *cmd)
{
struct context *contexts = get_contexts();
for (size_t i = 0; i != config.thread; i++) {
struct slowlog_queue *queue = &contexts[i].slowlog;
for (size_t j = 0; j != queue->capacity; j++) {
slowlog_set(queue, NULL);
}
}

conn_add_data(cmd->client, (uint8_t*)rep_ok, strlen(rep_ok),
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);

return CORVUS_OK;
}

int cmd_slowlog(struct command *cmd, struct redis_data *data)
{
Expand Down Expand Up @@ -775,7 +816,7 @@ int cmd_parse_req(struct command *cmd, struct mbuf *buf)
return CORVUS_OK;
}

if (!slowlog_enabled() || !slowlog_type_need_log(cmd)) {
if (!(slowlog_enabled() && slowlog_type_need_log(cmd))) {
redis_data_free(&r->data);
return CORVUS_OK;
}
Expand Down
4 changes: 2 additions & 2 deletions src/corvus.c
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ void context_init(struct context *ctx)
TAILQ_INIT(&ctx->servers);
TAILQ_INIT(&ctx->conns);

ctx->slowlog.len = 0; // for non worker threads
ctx->slowlog.capacity = 0; // for non worker threads
}

void build_contexts()
Expand Down Expand Up @@ -355,7 +355,7 @@ void context_free(struct context *ctx)
dict_free(&ctx->server_table);

/* slowlog */
if (ctx->slowlog.len > 0)
if (ctx->slowlog.capacity > 0)
slowlog_free(&ctx->slowlog);

/* mbuf queue */
Expand Down
6 changes: 1 addition & 5 deletions src/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -536,17 +536,13 @@ int reader_ready(struct reader *r)

int pos_to_str(struct pos_array *pos, char *str)
{
int i, cur_len = 0;
int length = pos->str_len;
if (length <= 0) {
LOG(ERROR, "pos_to_str: string length %d <= 0", length);
return CORVUS_ERR;
}

for (i = 0; i < pos->pos_len; i++) {
memcpy(str + cur_len, pos->items[i].str, pos->items[i].len);
cur_len += pos->items[i].len;
}
pos_to_str_with_limit(pos, (uint8_t*)str, length);
str[length] = '\0';
return CORVUS_OK;
}
Expand Down
16 changes: 7 additions & 9 deletions src/slowlog.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include <assert.h>
#include <string.h>

#include "alloc.h"
#include "corvus.h"
#include "slowlog.h"
Expand All @@ -14,7 +13,7 @@ static inline size_t min(size_t a, size_t b) { return a < b ? a : b; }
int slowlog_init(struct slowlog_queue *slowlog)
{
size_t queue_len = 1 + (config.slowlog_max_len - 1) / config.thread; // round up
slowlog->len = queue_len;
slowlog->capacity = queue_len;
slowlog->entries = cv_calloc(queue_len, sizeof(struct slowlog_entry));
slowlog->entry_locks = cv_malloc(queue_len * sizeof(pthread_mutex_t));
slowlog->curr = 0;
Expand All @@ -31,7 +30,7 @@ int slowlog_init(struct slowlog_queue *slowlog)
// Should be called only after all worker threads have stopped
void slowlog_free(struct slowlog_queue *slowlog)
{
for (size_t i = 0; i != slowlog->len; i++) {
for (size_t i = 0; i != slowlog->capacity; i++) {
// no other worker threads will read entry queue any more
if (slowlog->entries[i] != NULL) {
slowlog_dec_ref(slowlog->entries[i]);
Expand Down Expand Up @@ -117,22 +116,21 @@ void slowlog_dec_ref(struct slowlog_entry *entry)

void slowlog_set(struct slowlog_queue *queue, struct slowlog_entry *entry)
{
size_t curr = queue->curr;
// TODO: remove lock or atomic here
size_t curr = ATOMIC_GET(queue->curr);
pthread_mutex_lock(queue->entry_locks + curr);
struct slowlog_entry *old_entry = ATOMIC_IGET(queue->entries[curr], entry);
struct slowlog_entry *old_entry = queue->entries[curr];
queue->entries[curr] = entry;
pthread_mutex_unlock(queue->entry_locks + curr);
queue->curr = (curr + 1) % queue->len;
ATOMIC_SET(queue->curr, (curr + 1) % queue->capacity);
if (old_entry != NULL) {
slowlog_dec_ref(old_entry);
}
}

struct slowlog_entry *slowlog_get(struct slowlog_queue *queue, size_t index)
{
// TODO: remove lock or atomic here
pthread_mutex_lock(queue->entry_locks + index);
struct slowlog_entry *entry = ATOMIC_GET(queue->entries[index]);
struct slowlog_entry *entry = queue->entries[index];
if (entry == NULL) {
pthread_mutex_unlock(queue->entry_locks + index);
return NULL;
Expand Down
3 changes: 1 addition & 2 deletions src/slowlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#define SLOWLOG_H

#include <pthread.h>

#include "mbuf.h"
#include "parser.h"

Expand All @@ -27,7 +26,7 @@ struct slowlog_entry {
struct slowlog_queue {
struct slowlog_entry **entries;
pthread_mutex_t *entry_locks;
size_t len;
size_t capacity;
size_t curr;
};

Expand Down

0 comments on commit 81a0ce2

Please sign in to comment.