diff --git a/src/command.c b/src/command.c index ba5fa2f..cca3c6c 100644 --- a/src/command.c +++ b/src/command.c @@ -7,6 +7,7 @@ #include #include #include +#include #include "corvus.h" #include "socket.h" #include "logging.h" @@ -40,6 +41,10 @@ const char *rep_server_err = "-ERR Proxy fail to get server\r\n"; const char *rep_timeout_err = "-ERR Proxy timed out\r\n"; const char *rep_slowlog_not_enabled = "-ERR Slowlog not enabled\r\n"; +const char *rep_config_err = "-ERR Config error\r\n"; +const char *rep_config_parse_err = "-ERR Config fail to parse command\r\n"; +const char *rep_config_addr_err = "-ERR Config fail to parse address\r\n"; + const char *rep_get = "*2\r\n$3\r\nGET\r\n"; const char *rep_set = "*3\r\n$3\r\nSET\r\n"; const char *rep_del = "*2\r\n$3\r\nDEL\r\n"; @@ -54,7 +59,7 @@ static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)}; const size_t CMD_NUM = sizeof(cmds) / sizeof(struct cmd_item); static struct dict command_map; - +static pthread_mutex_t lock_config = PTHREAD_MUTEX_INITIALIZER; const char *cmd_extract_prefix(const char *prefix) { @@ -465,6 +470,53 @@ int cmd_proxy(struct command *cmd, struct redis_data *data) return CORVUS_OK; } +int cmd_config(struct command *cmd, struct redis_data *data) +{ + ASSERT_TYPE(data, REP_ARRAY); + ASSERT_ELEMENTS(data->elements >= 2, data); + + struct redis_data *op = &data->element[1]; + struct redis_data *opt = &data->element[2]; + ASSERT_TYPE(op, REP_STRING); + ASSERT_TYPE(opt, REP_STRING); + + char type[op->pos.str_len + 1]; + char option[opt->pos.str_len + 1]; + if (pos_to_str(&op->pos, type) == CORVUS_ERR + || pos_to_str(&opt->pos, option) == CORVUS_ERR) { + LOG(ERROR, "cmd_config: parse error"); + return CORVUS_ERR; + } + if (strcasecmp(type, "SET") == 0) { + // `config set` generelly need global lock + pthread_mutex_lock(&lock_config); + if (strcasecmp(option, "NODE") == 0) { + // config set node host:port,host1:port1 + char value[data->element[3].pos.str_len + 1]; + if (data->elements != 4) { + cmd_mark_fail(cmd, rep_config_parse_err); + } else if (data->element[3].pos.str_len + < 9|| pos_to_str(&data->element[3].pos, value) != CORVUS_OK) { + cmd_mark_fail(cmd, rep_config_parse_err); + } else if (config_add("node", value) != CORVUS_OK) { + cmd_mark_fail(cmd, rep_config_addr_err); + } else { + slot_create_job(SLOT_UPDATE); + conn_add_data(cmd->client, (uint8_t*) rep_ok, strlen(rep_ok), + &cmd->rep_buf[0], &cmd->rep_buf[1]); + CMD_INCREF(cmd); + cmd_mark_done(cmd); + } + } else { + cmd_mark_fail(cmd, rep_addr_err); + } + pthread_mutex_unlock(&lock_config); + } else { + cmd_mark_fail(cmd, rep_config_err); + } + return CORVUS_OK; +} + int cmd_auth(struct command *cmd, struct redis_data *data) { ASSERT_TYPE(data, REP_ARRAY); @@ -720,6 +772,8 @@ int cmd_extra(struct command *cmd, struct redis_data *data) return cmd_auth(cmd, data); case CMD_TIME: return cmd_time(cmd); + case CMD_CONFIG: + return cmd_config(cmd, data); case CMD_QUIT: return cmd_quit(cmd); case CMD_SLOWLOG: @@ -973,6 +1027,7 @@ void cmd_map_init() void cmd_map_destroy() { dict_free(&command_map); + pthread_mutex_destroy(&lock_config); } struct command *cmd_create(struct context *ctx) diff --git a/src/command.h b/src/command.h index f30d2eb..30c680d 100644 --- a/src/command.h +++ b/src/command.h @@ -146,7 +146,8 @@ HANDLER(SLOWLOG, EXTRA, UNKNOWN) \ HANDLER(QUIT, EXTRA, UNKNOWN) \ HANDLER(SELECT, UNIMPL, UNKNOWN) \ - HANDLER(TIME, EXTRA, UNKNOWN) + HANDLER(TIME, EXTRA, UNKNOWN) \ + HANDLER(CONFIG, EXTRA, UNKNOWN) #define CMD_DEFINE(cmd, type, access) CMD_##cmd, diff --git a/src/connection.c b/src/connection.c index 94bcb4e..8478296 100644 --- a/src/connection.c +++ b/src/connection.c @@ -303,11 +303,14 @@ struct connection *conn_get_raw_server(struct context *ctx) int i; struct connection *server = NULL; - for (i = 0; i < config.node.len; i++) { - server = conn_get_server_from_pool(ctx, &config.node.addr[i], false); + struct node_conf *node = config.node; + conf_node_inc_ref(node); + for (i = 0; i < node->len; i++) { + server = conn_get_server_from_pool(ctx, &node->addr[i], false); if (server == NULL) continue; break; } + conf_node_dec_ref(node); if (server == NULL) { LOG(ERROR, "conn_get_raw_server: cannot connect to redis server."); return NULL; diff --git a/src/corvus.c b/src/corvus.c index 0738db8..b298aa1 100644 --- a/src/corvus.c +++ b/src/corvus.c @@ -6,6 +6,7 @@ #include #include #include +#include #include "corvus.h" #include "mbuf.h" #include "slot.h" @@ -28,7 +29,9 @@ void config_init() strncpy(config.cluster, "default", CLUSTER_NAME_SIZE); config.bind = 12345; - memset(&config.node, 0, sizeof(struct node_conf)); + config.node = cv_malloc(sizeof(struct node_conf)); + memset(config.node, 0, sizeof(struct node_conf)); + conf_node_inc_ref(config.node); config.thread = 4; config.loglevel = INFO; config.syslog = 0; @@ -65,7 +68,7 @@ int config_add(char *name, char *value) { int val; if (strcmp(name, "cluster") == 0) { - if (strlen(value) <= 0) return 0; + if (strlen(value) <= 0) return CORVUS_OK; strncpy(config.cluster, value, CLUSTER_NAME_SIZE); } else if (strcmp(name, "bind") == 0) { if (socket_parse_port(value, &config.bind) == CORVUS_ERR) { @@ -134,22 +137,28 @@ int config_add(char *name, char *value) memcpy(config.requirepass, value, strlen(value)); } } else if (strcmp(name, "node") == 0) { - cv_free(config.node.addr); - memset(&config.node, 0, sizeof(config.node)); - + struct address *addr = NULL; + int addr_cnt = 0; char *p = strtok(value, ","); while (p) { - config.node.addr = cv_realloc(config.node.addr, - sizeof(struct address) * (config.node.len + 1)); - - if (socket_parse_ip(p, &config.node.addr[config.node.len]) == -1) { - cv_free(config.node.addr); - return -1; + addr = cv_realloc(addr, sizeof(struct address) * (addr_cnt + 1)); + if (socket_parse_ip(p, &addr[addr_cnt]) == -1) { + cv_free(addr); + return CORVUS_ERR; } - - config.node.len++; + addr_cnt++; p = strtok(NULL, ","); } + { + struct node_conf *newnode = cv_calloc(1, sizeof(struct node_conf)); + memset(newnode, 0, sizeof(struct node_conf)); + newnode->addr = addr; + newnode->len = addr_cnt; + conf_node_inc_ref(newnode); + struct node_conf *oldnode = config.node; + config.node = newnode; + conf_node_dec_ref(oldnode); + } } else if (strcmp(name, "slowlog-log-slower-than") == 0) { config.slowlog_log_slower_than = atoi(value); } else if (strcmp(name, "slowlog-max-len") == 0) { @@ -157,7 +166,22 @@ int config_add(char *name, char *value) } else if (strcmp(name, "slowlog-statsd-enabled") == 0) { config.slowlog_statsd_enabled = atoi(value); } - return 0; + return CORVUS_OK; +} + +void conf_node_inc_ref(struct node_conf *node) +{ + int refcount = ATOMIC_INC(node->refcount, 1); + assert(refcount >= 0); +} + +void conf_node_dec_ref(struct node_conf *node) +{ + int refcount = ATOMIC_DEC(node->refcount, 1); + assert(refcount >= 0); + if (refcount > 0) return; + cv_free(node->addr); + cv_free(node); } int read_line(char **line, size_t *bytes, FILE *fp) @@ -472,7 +496,7 @@ int main(int argc, const char *argv[]) fprintf(stderr, "Error: invalid config.\n"); return EXIT_FAILURE; } - if (config.node.len <= 0) { + if (config.node->len <= 0) { fprintf(stderr, "Error: invalid config, `node` should be set.\n"); return EXIT_FAILURE; } @@ -545,7 +569,7 @@ int main(int argc, const char *argv[]) destroy_contexts(); cmd_map_destroy(); cv_free(config.requirepass); - cv_free(config.node.addr); + conf_node_dec_ref(config.node); if (config.syslog) closelog(); return EXIT_SUCCESS; } diff --git a/src/corvus.h b/src/corvus.h index 040c7cd..fc0d493 100644 --- a/src/corvus.h +++ b/src/corvus.h @@ -54,6 +54,7 @@ enum { struct node_conf { struct address *addr; int len; + int refcount; }; struct context { @@ -95,7 +96,7 @@ struct context { struct { char cluster[CLUSTER_NAME_SIZE + 1]; uint16_t bind; - struct node_conf node; + struct node_conf *node; int thread; int loglevel; bool syslog; @@ -114,7 +115,10 @@ struct { } config; int64_t get_time(); +void conf_node_inc_ref(struct node_conf *node); +void conf_node_dec_ref(struct node_conf *node); struct context *get_contexts(); int thread_spawn(struct context *ctx, void *(*start_routine) (void *)); +int config_add(char *name, char *value); #endif /* end of include guard: CORVUS_H */ diff --git a/src/slot.c b/src/slot.c index dff5e37..cdfe47f 100644 --- a/src/slot.c +++ b/src/slot.c @@ -37,11 +37,14 @@ static struct { static inline void node_list_init() { pthread_rwlock_wrlock(&node_list.lock); + struct node_conf *node = config.node; + conf_node_inc_ref(node); node_list.len = 0; - for (int i = 0; i < MIN(config.node.len, MAX_NODE_LIST); i++) { - memcpy(&node_list.nodes[i], &config.node.addr[i], sizeof(struct address)); + for (int i = 0; i < MIN(node->len, MAX_NODE_LIST); i++) { + memcpy(&node_list.nodes[i], &node->addr[i], sizeof(struct address)); node_list.len++; } + conf_node_dec_ref(node); pthread_rwlock_unlock(&node_list.lock); } diff --git a/tests/corvus_test.c b/tests/corvus_test.c index 38733fa..ed635f9 100644 --- a/tests/corvus_test.c +++ b/tests/corvus_test.c @@ -3,6 +3,7 @@ #include "test.h" #include "corvus.h" #include "slot.h" +#include "alloc.h" static void usage(const char *name) { @@ -83,7 +84,9 @@ int main(int argc, const char *argv[]) build_contexts(); struct context *contexts = get_contexts(); - memcpy(&config.node, &conf, sizeof(config.node)); + config.node = cv_malloc(sizeof(struct node_conf)); + memset(config.node, 0, sizeof(struct node_conf)); + conf_node_inc_ref(config.node); slot_start_manager(&contexts[config.thread]); RUN_CASE(test_slot); diff --git a/tests/test_config.c b/tests/test_config.c index ea2c5ba..03f2a03 100644 --- a/tests/test_config.c +++ b/tests/test_config.c @@ -1,8 +1,6 @@ #include "test.h" #include "alloc.h" -extern int config_add(char *name, char *value); - TEST(test_config_bind) { char n[] = "bind";