diff --git a/src/command.c b/src/command.c index ba5fa2f..2892054 100644 --- a/src/command.c +++ b/src/command.c @@ -7,6 +7,7 @@ #include #include #include +#include #include "corvus.h" #include "socket.h" #include "logging.h" @@ -54,6 +55,8 @@ static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)}; const size_t CMD_NUM = sizeof(cmds) / sizeof(struct cmd_item); static struct dict command_map; +//need not destroy in linux +static pthread_mutex_t lock_setremotes = PTHREAD_MUTEX_INITIALIZER; const char *cmd_extract_prefix(const char *prefix) @@ -453,6 +456,60 @@ int cmd_proxy(struct command *cmd, struct redis_data *data) if (strcasecmp(type, "INFO") == 0) { return cmd_proxy_info(cmd); + } else if (strcasecmp(type, "SETREMOTES") == 0) { + //lock global + pthread_mutex_lock(&lock_setremotes); + if (data->elements % 2 != 0 || data->elements < 4) { + cmd_mark_fail(cmd, rep_addr_err); + } else { + int i = 0; + char ip[45], port_s[10]; + struct address *addr = 0; + int addr_cnt = 0; + int addr_len = 0; + int port_len = 0; + for (i = 2; i < data->elements; i += 2) { + long port = 0; + addr_len = data->element[i].pos.str_len; + port_len = data->element[i + 1].pos.str_len; + if (addr_len > sizeof(ip) - 1 || port_len > sizeof(port) - 1) { + continue; + } + if (pos_to_str(&data->element[i].pos, ip) == CORVUS_ERR) { + continue; + } + if (pos_to_str(&data->element[i + 1].pos, port_s) == CORVUS_ERR) { + continue; + } + port = atoi(port_s); + if (port < 1 || port > 65535) { + continue; + } + //add to list + addr = cv_realloc(addr, + sizeof(struct address) * (addr_cnt + 1)); + memcpy(addr[addr_cnt].ip, ip, addr_len); + addr[addr_cnt].ip[addr_len] = '\0'; + addr[addr_cnt].port = port; + addr_cnt ++; + } + { + struct node_conf *newnode = cv_malloc(sizeof(struct node_conf)); + memset(newnode, 0, sizeof(struct node_conf)); + newnode->addr = addr; + newnode->len = addr_cnt; + conf_node_inc_ref(newnode); + struct node_conf *oldnode = config.node; + config.node = newnode; + conf_node_dec_ref(oldnode); + } + slot_create_job(SLOT_UPDATE); + conn_add_data(cmd->client, (uint8_t*) rep_ok, strlen(rep_ok), + &cmd->rep_buf[0], &cmd->rep_buf[1]); + CMD_INCREF(cmd); + cmd_mark_done(cmd); + } + pthread_mutex_unlock(&lock_setremotes); } else if (strcasecmp(type, "UPDATESLOTMAP") == 0) { slot_create_job(SLOT_UPDATE); conn_add_data(cmd->client, (uint8_t*)rep_ok, strlen(rep_ok), diff --git a/src/connection.c b/src/connection.c index 94bcb4e..8478296 100644 --- a/src/connection.c +++ b/src/connection.c @@ -303,11 +303,14 @@ struct connection *conn_get_raw_server(struct context *ctx) int i; struct connection *server = NULL; - for (i = 0; i < config.node.len; i++) { - server = conn_get_server_from_pool(ctx, &config.node.addr[i], false); + struct node_conf *node = config.node; + conf_node_inc_ref(node); + for (i = 0; i < node->len; i++) { + server = conn_get_server_from_pool(ctx, &node->addr[i], false); if (server == NULL) continue; break; } + conf_node_dec_ref(node); if (server == NULL) { LOG(ERROR, "conn_get_raw_server: cannot connect to redis server."); return NULL; diff --git a/src/corvus.c b/src/corvus.c index 0738db8..f86cd6e 100644 --- a/src/corvus.c +++ b/src/corvus.c @@ -6,6 +6,7 @@ #include #include #include +#include #include "corvus.h" #include "mbuf.h" #include "slot.h" @@ -28,7 +29,9 @@ void config_init() strncpy(config.cluster, "default", CLUSTER_NAME_SIZE); config.bind = 12345; - memset(&config.node, 0, sizeof(struct node_conf)); + config.node = cv_malloc(sizeof(struct node_conf)); + memset(config.node, 0, sizeof(struct node_conf)); + conf_node_inc_ref(config.node); config.thread = 4; config.loglevel = INFO; config.syslog = 0; @@ -134,20 +137,21 @@ int config_add(char *name, char *value) memcpy(config.requirepass, value, strlen(value)); } } else if (strcmp(name, "node") == 0) { - cv_free(config.node.addr); - memset(&config.node, 0, sizeof(config.node)); - + //we init config before serving so it doesn't need to inc refcount + cv_free(config.node->addr); + config.node->addr = NULL; + config.node->len = 0; char *p = strtok(value, ","); while (p) { - config.node.addr = cv_realloc(config.node.addr, - sizeof(struct address) * (config.node.len + 1)); + config.node->addr = cv_realloc(config.node->addr, + sizeof(struct address) * (config.node->len + 1)); - if (socket_parse_ip(p, &config.node.addr[config.node.len]) == -1) { - cv_free(config.node.addr); + if (socket_parse_ip(p, &config.node->addr[config.node->len]) == -1) { + cv_free(config.node->addr); return -1; } - config.node.len++; + config.node->len++; p = strtok(NULL, ","); } } else if (strcmp(name, "slowlog-log-slower-than") == 0) { @@ -160,6 +164,21 @@ int config_add(char *name, char *value) return 0; } +void conf_node_inc_ref(struct node_conf *node) +{ + int refcount = ATOMIC_INC(node->refcount, 1); + assert(refcount >= 0); +} + +void conf_node_dec_ref(struct node_conf *node) +{ + int refcount = ATOMIC_DEC(node->refcount, 1); + assert(refcount >= 0); + if (refcount > 0) return; + cv_free(node->addr); + cv_free(node); +} + int read_line(char **line, size_t *bytes, FILE *fp) { size_t len, index = 0; @@ -472,7 +491,7 @@ int main(int argc, const char *argv[]) fprintf(stderr, "Error: invalid config.\n"); return EXIT_FAILURE; } - if (config.node.len <= 0) { + if (config.node->len <= 0) { fprintf(stderr, "Error: invalid config, `node` should be set.\n"); return EXIT_FAILURE; } @@ -545,7 +564,7 @@ int main(int argc, const char *argv[]) destroy_contexts(); cmd_map_destroy(); cv_free(config.requirepass); - cv_free(config.node.addr); + conf_node_dec_ref(config.node); if (config.syslog) closelog(); return EXIT_SUCCESS; } diff --git a/src/corvus.h b/src/corvus.h index 040c7cd..2745e5c 100644 --- a/src/corvus.h +++ b/src/corvus.h @@ -54,6 +54,7 @@ enum { struct node_conf { struct address *addr; int len; + int refcount; }; struct context { @@ -95,7 +96,7 @@ struct context { struct { char cluster[CLUSTER_NAME_SIZE + 1]; uint16_t bind; - struct node_conf node; + struct node_conf *node; int thread; int loglevel; bool syslog; @@ -114,6 +115,8 @@ struct { } config; int64_t get_time(); +void conf_node_inc_ref(struct node_conf *node); +void conf_node_dec_ref(struct node_conf *node); struct context *get_contexts(); int thread_spawn(struct context *ctx, void *(*start_routine) (void *)); diff --git a/src/slot.c b/src/slot.c index dff5e37..cdfe47f 100644 --- a/src/slot.c +++ b/src/slot.c @@ -37,11 +37,14 @@ static struct { static inline void node_list_init() { pthread_rwlock_wrlock(&node_list.lock); + struct node_conf *node = config.node; + conf_node_inc_ref(node); node_list.len = 0; - for (int i = 0; i < MIN(config.node.len, MAX_NODE_LIST); i++) { - memcpy(&node_list.nodes[i], &config.node.addr[i], sizeof(struct address)); + for (int i = 0; i < MIN(node->len, MAX_NODE_LIST); i++) { + memcpy(&node_list.nodes[i], &node->addr[i], sizeof(struct address)); node_list.len++; } + conf_node_dec_ref(node); pthread_rwlock_unlock(&node_list.lock); } diff --git a/tests/corvus_test.c b/tests/corvus_test.c index 38733fa..ed635f9 100644 --- a/tests/corvus_test.c +++ b/tests/corvus_test.c @@ -3,6 +3,7 @@ #include "test.h" #include "corvus.h" #include "slot.h" +#include "alloc.h" static void usage(const char *name) { @@ -83,7 +84,9 @@ int main(int argc, const char *argv[]) build_contexts(); struct context *contexts = get_contexts(); - memcpy(&config.node, &conf, sizeof(config.node)); + config.node = cv_malloc(sizeof(struct node_conf)); + memset(config.node, 0, sizeof(struct node_conf)); + conf_node_inc_ref(config.node); slot_start_manager(&contexts[config.thread]); RUN_CASE(test_slot);