Skip to content

Commit

Permalink
Amend response of slowlog get
Browse files Browse the repository at this point in the history
  • Loading branch information
doyoubi committed Aug 25, 2016
1 parent d01c298 commit 6cbd412
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 17 deletions.
23 changes: 14 additions & 9 deletions src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ static int cmd_parse_len(struct redis_data *data, int *result)
LOG(ERROR, "parse_len: char not between 0-9");
return CORVUS_ERR;
}
v += (10 * v + (c - '0'));
v = 10 * v + (c - '0');
}
*result = v;

Expand Down Expand Up @@ -583,15 +583,20 @@ int cmd_slowlog_get(struct command *cmd, struct redis_data *data)
struct context *contexts = get_contexts();
struct slowlog_entry *entries[len];
int count = 0;
size_t limit_per_thread = 1 + (len - 1) / config.thread;
for (size_t i = 0; i != config.thread; i++) {
struct slowlog_queue *queue = &contexts[i].slowlog;
assert(limit_per_thread <= queue->len);
for (size_t j = 0; j < limit_per_thread && count < len; j++) {
size_t queue_len = contexts[0].slowlog.len;
bool exhausted[config.thread];
memset(exhausted, 0, sizeof exhausted);
for (size_t i = 0; i != queue_len && count < len; i++) {
for (size_t j = 0; j != config.thread && count < len; j++) {
if (exhausted[j])
continue;
struct slowlog_queue *queue = &contexts[j].slowlog;
// slowlog_get will lock mutex
struct slowlog_entry *entry = slowlog_get(queue, j);
if (entry == NULL)
break; // NULL starts from here
struct slowlog_entry *entry = slowlog_get(queue, i);
if (entry == NULL) {
exhausted[j] = true;
continue;
}
entries[count++] = entry;
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -550,3 +550,16 @@ int pos_to_str(struct pos_array *pos, char *str)
str[length] = '\0';
return CORVUS_OK;
}

size_t pos_to_str_with_limit(struct pos_array *pos, uint8_t *str, size_t limit)
{
size_t len = 0;
for (size_t i = 0; i != pos->pos_len && len < limit; i++) {
size_t curr_len = pos->items[i].len;
if (limit - len < curr_len)
curr_len = limit - len;
memcpy(str + len, pos->items[i].str, curr_len);
len += curr_len;
}
return len;
}
1 change: 1 addition & 0 deletions src/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,5 +142,6 @@ struct pos *pos_get(struct pos_array *arr, int idx);
int pos_to_str(struct pos_array *pos, char *str);
void redis_data_free(struct redis_data *data);
void redis_data_move(struct redis_data *lhs, struct redis_data *rhs);
size_t pos_to_str_with_limit(struct pos_array *pos, uint8_t *str, size_t limit);

#endif /* end of include guard: PARSER_H */
29 changes: 23 additions & 6 deletions src/slowlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ struct slowlog_entry *slowlog_create_entry(struct command *cmd, int64_t latency)
entry->latency = latency;
entry->refcount = 1;

const char *fmt = "(%zd bytes)";
char tmp_buf[19]; // strlen("4294967295") + strlen("( bytes)") + 1
const char *bytes_fmt = "(%zd bytes)";
char bytes_buf[19]; // strlen("4294967295") + strlen("( bytes)") + 1

assert(cmd->data.elements > 0);
size_t argc = min(cmd->data.elements, SLOWLOG_ENTRY_MAX_ARGC);
Expand All @@ -67,16 +67,33 @@ struct slowlog_entry *slowlog_create_entry(struct command *cmd, int64_t latency)
entry->argc = min(argc + 1, SLOWLOG_ENTRY_MAX_ARGC);
argc = min(argc, SLOWLOG_ENTRY_MAX_ARGC - 1);
}
// use the last argument to record total argument counts
if (cmd->data.elements > argc) {
argc--;
const size_t tail_max_len = 64;
char tail_buf[tail_max_len];
uint8_t *buf = cv_malloc(tail_max_len);
int len = snprintf(tail_buf, tail_max_len, "(%zd arguments in total)", cmd->data.elements);
len = snprintf((char*)buf, tail_max_len, "$%d\r\n%s\r\n", len, tail_buf);
entry->argv[SLOWLOG_ENTRY_MAX_ARGC - 1].str = buf;
entry->argv[SLOWLOG_ENTRY_MAX_ARGC - 1].len = len;
}
size_t max_len = SLOWLOG_ENTRY_MAX_STRING;
for (size_t j = 0; j != argc; i++, j++) {
struct redis_data *arg = cmd->data.element + j;
assert(arg->type == REP_STRING);
size_t real_len = mbuf_range_len(arg->buf);
size_t len = min(real_len, max_len);
uint8_t *buf = cv_malloc(len); // will be freed by the last owner
mbuf_range_copy(buf, arg->buf, len);
if (real_len > max_len) {
int postfix_len = snprintf(tmp_buf, sizeof tmp_buf, fmt, real_len);
memcpy(buf + max_len - postfix_len, tmp_buf, postfix_len);
size_t str_len = SLOWLOG_MAX_ARG_LEN;
int hdr_len = snprintf((char*)buf, len, "$%zd\r\n", str_len);
pos_to_str_with_limit(&arg->pos, buf + hdr_len, str_len);
int postfix_len = snprintf(bytes_buf, sizeof bytes_buf, bytes_fmt, arg->pos.str_len);
memcpy(buf + max_len - 2 - postfix_len, bytes_buf, postfix_len);
memcpy(buf + max_len - 2, "\r\n", 2);
} else {
mbuf_range_copy(buf, arg->buf, len);
}
entry->argv[i].str = buf;
entry->argv[i].len = len;
Expand Down Expand Up @@ -128,7 +145,7 @@ struct slowlog_entry *slowlog_get(struct slowlog_queue *queue, size_t index)
bool slowlog_enabled()
{
return config.slowlog_max_len > 0
&& config.slowlog_log_slower_than > 0;
&& config.slowlog_log_slower_than >= 0;
}

bool slowlog_type_need_log(struct command *cmd)
Expand Down
1 change: 1 addition & 0 deletions src/slowlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#define SLOWLOG_ENTRY_MAX_ARGC 32
#define SLOWLOG_ENTRY_MAX_STRING 128
#define SLOWLOG_MAX_ARG_LEN 120 // SLOWLOG_ENTRY_MAX_STRING - strlen("%120\r\n\r\n")

// Same with slowlog format of redis.
// Note that slowlog_entry will be created from one thread
Expand Down
6 changes: 6 additions & 0 deletions tests/test_mbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ TEST(test_mbuf_range_func) {
ASSERT(tmp[21] == 0);
cv_free(tmp);

// with max limit
uint8_t buf[20] = {0};
ASSERT(mbuf_range_copy(buf, ptr, 10) == 10);
ASSERT(memcmp(buf, "abcdefg012", 10) == 0);
ASSERT(buf[10] == '\0');

cv_free(b1.start);
cv_free(b2.start);
cv_free(b3.start);
Expand Down
55 changes: 53 additions & 2 deletions tests/test_slowlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,21 @@ TEST(test_slowlog_create_entry_with_prefix) {
};
struct redis_data element[2] = {
{
.pos = key_pos, .buf = {
.type = REP_STRING, .pos = key_pos, .buf = {
{ .buf = &buf1, .pos = buf1.start },
{ .buf = &buf1, .pos = buf1.last }
}
},
{
.pos = value_pos, .buf = {
.type = REP_STRING, .pos = value_pos, .buf = {
{ .buf = &buf2, .pos = buf2.start },
{ .buf = &buf2, .pos = buf2.last }
}
}
};
cmd.data.elements = 2;
cmd.data.element = element;
cmd.data.type = REP_ARRAY;
cmd.prefix = (char*)rep_set;

struct slowlog_entry *entry = slowlog_create_entry(&cmd, 9394);
Expand All @@ -93,6 +94,55 @@ TEST(test_slowlog_create_entry_with_prefix) {
PASS(NULL);
}

TEST(test_slowlog_create_entry_with_long_arg) {
struct mbuf buf;
struct command cmd;
struct reader r = {0};
const char cmd_data[] = "*37\r\n$4\r\nMGET\r\n"
"$144\r\n"
"1234567890qwertyuiopasdfghjklzxcvbnm"
"1234567890qwertyuiopasdfghjklzxcvbnm"
"1234567890qwertyuiopasdfghjklzxcvbnm"
"1234567890qwertyuiopasdfghjklzxcvbnm\r\n"
"$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n" // "a" * 5
"$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n"
"$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n"
"$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n"
"$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n"
"$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n"
"$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n$1\r\na\r\n";
char a[] = "$1\r\na\r\n";
char long_value[] =
"$120\r\n"
"1234567890qwertyuiopasdfghjklzxcvbnm"
"1234567890qwertyuiopasdfghjklzxcvbnm"
"1234567890qwertyuiopasdfghjklzxcvbnm"
"1(144 bytes)\r\n";
"\r\n";
char tail[] = "$23\r\n(37 arguments in total)\r\n";

buf.pos = (uint8_t*)cmd_data;
buf.last = (uint8_t*)cmd_data + strlen(cmd_data);
reader_init(&r);
reader_feed(&r, &buf);
ASSERT(parse(&r, MODE_REQ) != -1);
cmd.data = r.data;
cmd.prefix = NULL;

struct slowlog_entry *entry = slowlog_create_entry(&cmd, 233666);
ASSERT(SLOWLOG_ENTRY_MAX_STRING == strlen(long_value));
ASSERT(entry->argv[1].len == strlen(long_value));
ASSERT(strncmp(long_value, entry->argv[1].str, strlen(long_value)) == 0);
ASSERT(entry->argv[30].len == strlen(a));
ASSERT(strncmp(a, entry->argv[30].str, strlen(a)) == 0);
ASSERT(entry->argv[31].len == strlen(tail));
ASSERT(strncmp(tail, entry->argv[31].str, strlen(tail)) == 0);
slowlog_dec_ref(entry);

redis_data_free(&cmd.data);
PASS(NULL);
}

TEST(test_entry_get_set) {
config.slowlog_max_len = 2;
config.thread = 1;
Expand Down Expand Up @@ -130,5 +180,6 @@ TEST(test_entry_get_set) {
TEST_CASE(test_slowlog) {
RUN_TEST(test_slowlog_create_entry);
RUN_TEST(test_slowlog_create_entry_with_prefix);
RUN_TEST(test_slowlog_create_entry_with_long_arg);
RUN_TEST(test_entry_get_set);
}

0 comments on commit 6cbd412

Please sign in to comment.