Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add config set command to modify corresponding redis nodes dynamically #95

Merged
merged 4 commits into from
Jan 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <sys/time.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>
#include "corvus.h"
#include "socket.h"
#include "logging.h"
Expand Down Expand Up @@ -40,6 +41,11 @@ const char *rep_server_err = "-ERR Proxy fail to get server\r\n";
const char *rep_timeout_err = "-ERR Proxy timed out\r\n";
const char *rep_slowlog_not_enabled = "-ERR Slowlog not enabled\r\n";

const char *rep_config_err = "-ERR Config error\r\n";
const char *rep_config_unsupported_err = "-ERR Config cmd not supported\r\n";
const char *rep_config_parse_err = "-ERR Config fail to parse command\r\n";
const char *rep_config_addr_err = "-ERR Config fail to parse address\r\n";

const char *rep_get = "*2\r\n$3\r\nGET\r\n";
const char *rep_set = "*3\r\n$3\r\nSET\r\n";
const char *rep_del = "*2\r\n$3\r\nDEL\r\n";
Expand All @@ -55,7 +61,6 @@ struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)};
const size_t CMD_NUM = sizeof(cmds) / sizeof(struct cmd_item);
static struct dict command_map;


const char *cmd_extract_prefix(const char *prefix)
{
const char *get = "$3\r\nGET\r\n";
Expand Down Expand Up @@ -465,6 +470,82 @@ int cmd_proxy(struct command *cmd, struct redis_data *data)
return CORVUS_OK;
}

int cmd_config(struct command *cmd, struct redis_data *data)
{
ASSERT_TYPE(data, REP_ARRAY);
ASSERT_ELEMENTS(data->elements >= 2, data);

struct redis_data *op = &data->element[1];
struct redis_data *opt = &data->element[2];
ASSERT_TYPE(op, REP_STRING);
ASSERT_TYPE(opt, REP_STRING);

char type[op->pos.str_len + 1];
char option[opt->pos.str_len + 1];
if (pos_to_str(&op->pos, type) == CORVUS_ERR
|| pos_to_str(&opt->pos, option) == CORVUS_ERR) {
LOG(ERROR, "cmd_config: parse error");
return CORVUS_ERR;
}
if (strcasecmp(type, "SET") == 0) {
//config set <item> <val>
ASSERT_ELEMENTS(data->elements >= 4, data);
if (strcasecmp(option, "NODE") == 0) {
// config set node host:port,host1:port1
if (data->elements != 4) {
cmd_mark_fail(cmd, rep_config_parse_err);
} else {
char value[data->element[3].pos.str_len + 1];
// the host-port pair is in form "1.1.1.1:1"(9 bytes) at least
if (data->element[3].pos.str_len < 9
|| pos_to_str(&data->element[3].pos, value) != CORVUS_OK) {
cmd_mark_fail(cmd, rep_config_parse_err);
} else if (config_add("node", value) != CORVUS_OK) {
cmd_mark_fail(cmd, rep_config_addr_err);
} else {
slot_create_job(SLOT_UPDATE);
conn_add_data(cmd->client, (uint8_t*) rep_ok, strlen(rep_ok),
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);
}
}
} else {
cmd_mark_fail(cmd, rep_config_unsupported_err);
}
} else if (strcasecmp(type, "GET") == 0) {
//config get <item>
ASSERT_ELEMENTS(data->elements >= 3, data);
if (strcasecmp(option, "NODE") == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better check data->elements here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aha, sorry about changes after merging.
I change the assert for elements judging ensure that.

struct node_conf *node = conf_node_inc_ref();
int n = 1024, pos = 0;
char content[n + ADDRESS_LEN];
char data[n + ADDRESS_LEN + 100]; //100 bytes for control data
for (int i = 0; i < node->len; i++) {
if (i > 0) {
content[pos++] = ',';
}
pos += snprintf(content + pos, ADDRESS_LEN, "%s:%d",
node->addr[i].ip, node->addr[i].port);
if (pos >= n) {
break;
}
}
conf_node_dec_ref(node);
int data_len = snprintf(data, sizeof(data), "$%d\r\n%s\r\n", pos, content);
conn_add_data(cmd->client, (uint8_t*) data, data_len,
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);
} else {
cmd_mark_fail(cmd, rep_config_parse_err);
}
} else {
cmd_mark_fail(cmd, rep_config_err);
}
return CORVUS_OK;
}

int cmd_auth(struct command *cmd, struct redis_data *data)
{
ASSERT_TYPE(data, REP_ARRAY);
Expand Down Expand Up @@ -720,6 +801,8 @@ int cmd_extra(struct command *cmd, struct redis_data *data)
return cmd_auth(cmd, data);
case CMD_TIME:
return cmd_time(cmd);
case CMD_CONFIG:
return cmd_config(cmd, data);
case CMD_QUIT:
return cmd_quit(cmd);
case CMD_SLOWLOG:
Expand Down
3 changes: 2 additions & 1 deletion src/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@
HANDLER(SLOWLOG, EXTRA, UNKNOWN) \
HANDLER(QUIT, EXTRA, UNKNOWN) \
HANDLER(SELECT, UNIMPL, UNKNOWN) \
HANDLER(TIME, EXTRA, UNKNOWN)
HANDLER(TIME, EXTRA, UNKNOWN) \
HANDLER(CONFIG, EXTRA, UNKNOWN)

#define CMD_DEFINE(cmd, type, access) CMD_##cmd,

Expand Down
6 changes: 4 additions & 2 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,13 @@ struct connection *conn_get_raw_server(struct context *ctx)
int i;
struct connection *server = NULL;

for (i = 0; i < config.node.len; i++) {
server = conn_get_server_from_pool(ctx, &config.node.addr[i], false);
struct node_conf *node = conf_node_inc_ref();
for (i = 0; i < node->len; i++) {
server = conn_get_server_from_pool(ctx, &node->addr[i], false);
if (server == NULL) continue;
break;
}
conf_node_dec_ref(node);
if (server == NULL) {
LOG(ERROR, "conn_get_raw_server: cannot connect to redis server.");
return NULL;
Expand Down
62 changes: 47 additions & 15 deletions src/corvus.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <pthread.h>
#include <unistd.h>
#include <execinfo.h>
#include <assert.h>
#include <getopt.h>
#include "corvus.h"
#include "mbuf.h"
Expand All @@ -22,14 +23,16 @@
#define MIN_BUFSIZE 64

static struct context *contexts;
static pthread_mutex_t lock_conf_node = PTHREAD_MUTEX_INITIALIZER;

void config_init()
{
memset(config.cluster, 0, CLUSTER_NAME_SIZE + 1);
strncpy(config.cluster, "default", CLUSTER_NAME_SIZE);

config.bind = 12345;
memset(&config.node, 0, sizeof(struct node_conf));
config.node = cv_calloc(1, sizeof(struct node_conf));
config.node->refcount = 1;
config.thread = 4;
config.loglevel = INFO;
config.syslog = 0;
Expand Down Expand Up @@ -135,30 +138,58 @@ int config_add(char *name, char *value)
memcpy(config.requirepass, value, strlen(value));
}
} else if (strcmp(name, "node") == 0) {
cv_free(config.node.addr);
memset(&config.node, 0, sizeof(config.node));

struct address *addr = NULL;
int addr_cnt = 0;
char *p = strtok(value, ",");
while (p) {
config.node.addr = cv_realloc(config.node.addr,
sizeof(struct address) * (config.node.len + 1));

if (socket_parse_ip(p, &config.node.addr[config.node.len]) == -1) {
cv_free(config.node.addr);
return -1;
addr = cv_realloc(addr, sizeof(struct address) * (addr_cnt + 1));
if (socket_parse_ip(p, &addr[addr_cnt]) == -1) {
cv_free(addr);
return CORVUS_ERR;
}

config.node.len++;
addr_cnt++;
p = strtok(NULL, ",");
}
{
struct node_conf *newnode = cv_calloc(1, sizeof(struct node_conf));
memset(newnode, 0, sizeof(struct node_conf));
newnode->addr = addr;
newnode->len = addr_cnt;
newnode->refcount = 1;
pthread_mutex_lock(&lock_conf_node);
struct node_conf *oldnode = config.node;
config.node = newnode;
pthread_mutex_unlock(&lock_conf_node);
conf_node_dec_ref(oldnode);
}
} else if (strcmp(name, "slowlog-log-slower-than") == 0) {
config.slowlog_log_slower_than = atoi(value);
} else if (strcmp(name, "slowlog-max-len") == 0) {
config.slowlog_max_len = atoi(value);
} else if (strcmp(name, "slowlog-statsd-enabled") == 0) {
config.slowlog_statsd_enabled = atoi(value);
}
return 0;
return CORVUS_OK;
}

struct node_conf *conf_node_inc_ref()
{
pthread_mutex_lock(&lock_conf_node);
struct node_conf *node = config.node;
int refcount = ATOMIC_INC(node->refcount, 1);
assert(refcount >= 0);
pthread_mutex_unlock(&lock_conf_node);
return node;
}

void conf_node_dec_ref(struct node_conf *node)
{
int refcount = ATOMIC_DEC(node->refcount, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can safely remove the lock here. Thanks to the atomic operation multiple calling of conf_node_dec_ref will end up with only one zero refcount;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, so just remove it

assert(refcount >= 0);
if (refcount == 0) {
cv_free(node->addr);
cv_free(node);
}
}

int read_line(char **line, size_t *bytes, FILE *fp)
Expand Down Expand Up @@ -595,7 +626,7 @@ int main(int argc, const char *argv[])
usage(argv[0]);
return EXIT_FAILURE;
}
if (config.node.len <= 0) {
if (config.node->len <= 0) {
fprintf(stderr, "Error: invalid upstream list, `node` should be set to a valid nodes list.\n");
return EXIT_FAILURE;
}
Expand Down Expand Up @@ -668,7 +699,8 @@ int main(int argc, const char *argv[])
destroy_contexts();
cmd_map_destroy();
cv_free(config.requirepass);
cv_free(config.node.addr);
conf_node_dec_ref(config.node);
pthread_mutex_destroy(&lock_conf_node);
if (config.syslog) closelog();
return EXIT_SUCCESS;
}
Expand Down
6 changes: 5 additions & 1 deletion src/corvus.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum {
struct node_conf {
struct address *addr;
int len;
int refcount;
};

struct context {
Expand Down Expand Up @@ -95,7 +96,7 @@ struct context {
struct {
char cluster[CLUSTER_NAME_SIZE + 1];
uint16_t bind;
struct node_conf node;
struct node_conf *node;
int thread;
int loglevel;
bool syslog;
Expand All @@ -114,7 +115,10 @@ struct {
} config;

int64_t get_time();
struct node_conf *conf_node_inc_ref();
void conf_node_dec_ref(struct node_conf *node);
struct context *get_contexts();
int thread_spawn(struct context *ctx, void *(*start_routine) (void *));
int config_add(char *name, char *value);

#endif /* end of include guard: CORVUS_H */
6 changes: 4 additions & 2 deletions src/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ static struct {

static inline void node_list_init()
{
struct node_conf *node = conf_node_inc_ref();
pthread_rwlock_wrlock(&node_list.lock);
node_list.len = 0;
for (int i = 0; i < MIN(config.node.len, MAX_NODE_LIST); i++) {
memcpy(&node_list.nodes[i], &config.node.addr[i], sizeof(struct address));
for (int i = 0; i < MIN(node->len, MAX_NODE_LIST); i++) {
memcpy(&node_list.nodes[i], &node->addr[i], sizeof(struct address));
node_list.len++;
}
pthread_rwlock_unlock(&node_list.lock);
conf_node_dec_ref(node);
}

static inline void node_list_add(struct node_info *node)
Expand Down
5 changes: 4 additions & 1 deletion tests/corvus_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "test.h"
#include "corvus.h"
#include "slot.h"
#include "alloc.h"

static void usage(const char *name)
{
Expand Down Expand Up @@ -83,7 +84,9 @@ int main(int argc, const char *argv[])
build_contexts();
struct context *contexts = get_contexts();

memcpy(&config.node, &conf, sizeof(config.node));
config.node = cv_malloc(sizeof(struct node_conf));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you used valgrind to check whether there is memory leak in test program?
Please add more complete tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, checked for newest version and change all malloc to calloc

memset(config.node, 0, sizeof(struct node_conf));
config.node->refcount = 1;
slot_start_manager(&contexts[config.thread]);

RUN_CASE(test_slot);
Expand Down
2 changes: 0 additions & 2 deletions tests/test_config.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#include "test.h"
#include "alloc.h"

extern int config_add(char *name, char *value);

TEST(test_config_bind) {
char n[] = "bind";

Expand Down