diff --git a/src/command.c b/src/command.c index 0f25305..405393b 100644 --- a/src/command.c +++ b/src/command.c @@ -7,6 +7,7 @@ #include #include #include +#include #include "corvus.h" #include "socket.h" #include "logging.h" @@ -40,6 +41,11 @@ const char *rep_server_err = "-ERR Proxy fail to get server\r\n"; const char *rep_timeout_err = "-ERR Proxy timed out\r\n"; const char *rep_slowlog_not_enabled = "-ERR Slowlog not enabled\r\n"; +const char *rep_config_err = "-ERR Config error\r\n"; +const char *rep_config_unsupported_err = "-ERR Config cmd not supported\r\n"; +const char *rep_config_parse_err = "-ERR Config fail to parse command\r\n"; +const char *rep_config_addr_err = "-ERR Config fail to parse address\r\n"; + const char *rep_get = "*2\r\n$3\r\nGET\r\n"; const char *rep_set = "*3\r\n$3\r\nSET\r\n"; const char *rep_del = "*2\r\n$3\r\nDEL\r\n"; @@ -55,7 +61,6 @@ struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)}; const size_t CMD_NUM = sizeof(cmds) / sizeof(struct cmd_item); static struct dict command_map; - const char *cmd_extract_prefix(const char *prefix) { const char *get = "$3\r\nGET\r\n"; @@ -465,6 +470,82 @@ int cmd_proxy(struct command *cmd, struct redis_data *data) return CORVUS_OK; } +int cmd_config(struct command *cmd, struct redis_data *data) +{ + ASSERT_TYPE(data, REP_ARRAY); + ASSERT_ELEMENTS(data->elements >= 2, data); + + struct redis_data *op = &data->element[1]; + struct redis_data *opt = &data->element[2]; + ASSERT_TYPE(op, REP_STRING); + ASSERT_TYPE(opt, REP_STRING); + + char type[op->pos.str_len + 1]; + char option[opt->pos.str_len + 1]; + if (pos_to_str(&op->pos, type) == CORVUS_ERR + || pos_to_str(&opt->pos, option) == CORVUS_ERR) { + LOG(ERROR, "cmd_config: parse error"); + return CORVUS_ERR; + } + if (strcasecmp(type, "SET") == 0) { + //config set + ASSERT_ELEMENTS(data->elements >= 4, data); + if (strcasecmp(option, "NODE") == 0) { + // config set node host:port,host1:port1 + if (data->elements != 4) { + cmd_mark_fail(cmd, rep_config_parse_err); + } else { + char value[data->element[3].pos.str_len + 1]; + // the host-port pair is in form "1.1.1.1:1"(9 bytes) at least + if (data->element[3].pos.str_len < 9 + || pos_to_str(&data->element[3].pos, value) != CORVUS_OK) { + cmd_mark_fail(cmd, rep_config_parse_err); + } else if (config_add("node", value) != CORVUS_OK) { + cmd_mark_fail(cmd, rep_config_addr_err); + } else { + slot_create_job(SLOT_UPDATE); + conn_add_data(cmd->client, (uint8_t*) rep_ok, strlen(rep_ok), + &cmd->rep_buf[0], &cmd->rep_buf[1]); + CMD_INCREF(cmd); + cmd_mark_done(cmd); + } + } + } else { + cmd_mark_fail(cmd, rep_config_unsupported_err); + } + } else if (strcasecmp(type, "GET") == 0) { + //config get + ASSERT_ELEMENTS(data->elements >= 3, data); + if (strcasecmp(option, "NODE") == 0) { + struct node_conf *node = conf_node_inc_ref(); + int n = 1024, pos = 0; + char content[n + ADDRESS_LEN]; + char data[n + ADDRESS_LEN + 100]; //100 bytes for control data + for (int i = 0; i < node->len; i++) { + if (i > 0) { + content[pos++] = ','; + } + pos += snprintf(content + pos, ADDRESS_LEN, "%s:%d", + node->addr[i].ip, node->addr[i].port); + if (pos >= n) { + break; + } + } + conf_node_dec_ref(node); + int data_len = snprintf(data, sizeof(data), "$%d\r\n%s\r\n", pos, content); + conn_add_data(cmd->client, (uint8_t*) data, data_len, + &cmd->rep_buf[0], &cmd->rep_buf[1]); + CMD_INCREF(cmd); + cmd_mark_done(cmd); + } else { + cmd_mark_fail(cmd, rep_config_parse_err); + } + } else { + cmd_mark_fail(cmd, rep_config_err); + } + return CORVUS_OK; +} + int cmd_auth(struct command *cmd, struct redis_data *data) { ASSERT_TYPE(data, REP_ARRAY); @@ -720,6 +801,8 @@ int cmd_extra(struct command *cmd, struct redis_data *data) return cmd_auth(cmd, data); case CMD_TIME: return cmd_time(cmd); + case CMD_CONFIG: + return cmd_config(cmd, data); case CMD_QUIT: return cmd_quit(cmd); case CMD_SLOWLOG: diff --git a/src/command.h b/src/command.h index f30d2eb..30c680d 100644 --- a/src/command.h +++ b/src/command.h @@ -146,7 +146,8 @@ HANDLER(SLOWLOG, EXTRA, UNKNOWN) \ HANDLER(QUIT, EXTRA, UNKNOWN) \ HANDLER(SELECT, UNIMPL, UNKNOWN) \ - HANDLER(TIME, EXTRA, UNKNOWN) + HANDLER(TIME, EXTRA, UNKNOWN) \ + HANDLER(CONFIG, EXTRA, UNKNOWN) #define CMD_DEFINE(cmd, type, access) CMD_##cmd, diff --git a/src/connection.c b/src/connection.c index 94bcb4e..3e22efc 100644 --- a/src/connection.c +++ b/src/connection.c @@ -303,11 +303,13 @@ struct connection *conn_get_raw_server(struct context *ctx) int i; struct connection *server = NULL; - for (i = 0; i < config.node.len; i++) { - server = conn_get_server_from_pool(ctx, &config.node.addr[i], false); + struct node_conf *node = conf_node_inc_ref(); + for (i = 0; i < node->len; i++) { + server = conn_get_server_from_pool(ctx, &node->addr[i], false); if (server == NULL) continue; break; } + conf_node_dec_ref(node); if (server == NULL) { LOG(ERROR, "conn_get_raw_server: cannot connect to redis server."); return NULL; diff --git a/src/corvus.c b/src/corvus.c index 94ab607..8507fbc 100644 --- a/src/corvus.c +++ b/src/corvus.c @@ -6,6 +6,7 @@ #include #include #include +#include #include #include "corvus.h" #include "mbuf.h" @@ -22,6 +23,7 @@ #define MIN_BUFSIZE 64 static struct context *contexts; +static pthread_mutex_t lock_conf_node = PTHREAD_MUTEX_INITIALIZER; void config_init() { @@ -29,7 +31,8 @@ void config_init() strncpy(config.cluster, "default", CLUSTER_NAME_SIZE); config.bind = 12345; - memset(&config.node, 0, sizeof(struct node_conf)); + config.node = cv_calloc(1, sizeof(struct node_conf)); + config.node->refcount = 1; config.thread = 4; config.loglevel = INFO; config.syslog = 0; @@ -135,22 +138,30 @@ int config_add(char *name, char *value) memcpy(config.requirepass, value, strlen(value)); } } else if (strcmp(name, "node") == 0) { - cv_free(config.node.addr); - memset(&config.node, 0, sizeof(config.node)); - + struct address *addr = NULL; + int addr_cnt = 0; char *p = strtok(value, ","); while (p) { - config.node.addr = cv_realloc(config.node.addr, - sizeof(struct address) * (config.node.len + 1)); - - if (socket_parse_ip(p, &config.node.addr[config.node.len]) == -1) { - cv_free(config.node.addr); - return -1; + addr = cv_realloc(addr, sizeof(struct address) * (addr_cnt + 1)); + if (socket_parse_ip(p, &addr[addr_cnt]) == -1) { + cv_free(addr); + return CORVUS_ERR; } - - config.node.len++; + addr_cnt++; p = strtok(NULL, ","); } + { + struct node_conf *newnode = cv_calloc(1, sizeof(struct node_conf)); + memset(newnode, 0, sizeof(struct node_conf)); + newnode->addr = addr; + newnode->len = addr_cnt; + newnode->refcount = 1; + pthread_mutex_lock(&lock_conf_node); + struct node_conf *oldnode = config.node; + config.node = newnode; + pthread_mutex_unlock(&lock_conf_node); + conf_node_dec_ref(oldnode); + } } else if (strcmp(name, "slowlog-log-slower-than") == 0) { config.slowlog_log_slower_than = atoi(value); } else if (strcmp(name, "slowlog-max-len") == 0) { @@ -158,7 +169,27 @@ int config_add(char *name, char *value) } else if (strcmp(name, "slowlog-statsd-enabled") == 0) { config.slowlog_statsd_enabled = atoi(value); } - return 0; + return CORVUS_OK; +} + +struct node_conf *conf_node_inc_ref() +{ + pthread_mutex_lock(&lock_conf_node); + struct node_conf *node = config.node; + int refcount = ATOMIC_INC(node->refcount, 1); + assert(refcount >= 0); + pthread_mutex_unlock(&lock_conf_node); + return node; +} + +void conf_node_dec_ref(struct node_conf *node) +{ + int refcount = ATOMIC_DEC(node->refcount, 1); + assert(refcount >= 0); + if (refcount == 0) { + cv_free(node->addr); + cv_free(node); + } } int read_line(char **line, size_t *bytes, FILE *fp) @@ -595,7 +626,7 @@ int main(int argc, const char *argv[]) usage(argv[0]); return EXIT_FAILURE; } - if (config.node.len <= 0) { + if (config.node->len <= 0) { fprintf(stderr, "Error: invalid upstream list, `node` should be set to a valid nodes list.\n"); return EXIT_FAILURE; } @@ -668,7 +699,8 @@ int main(int argc, const char *argv[]) destroy_contexts(); cmd_map_destroy(); cv_free(config.requirepass); - cv_free(config.node.addr); + conf_node_dec_ref(config.node); + pthread_mutex_destroy(&lock_conf_node); if (config.syslog) closelog(); return EXIT_SUCCESS; } diff --git a/src/corvus.h b/src/corvus.h index 0e68a48..520ac96 100644 --- a/src/corvus.h +++ b/src/corvus.h @@ -54,6 +54,7 @@ enum { struct node_conf { struct address *addr; int len; + int refcount; }; struct context { @@ -95,7 +96,7 @@ struct context { struct { char cluster[CLUSTER_NAME_SIZE + 1]; uint16_t bind; - struct node_conf node; + struct node_conf *node; int thread; int loglevel; bool syslog; @@ -114,7 +115,10 @@ struct { } config; int64_t get_time(); +struct node_conf *conf_node_inc_ref(); +void conf_node_dec_ref(struct node_conf *node); struct context *get_contexts(); int thread_spawn(struct context *ctx, void *(*start_routine) (void *)); +int config_add(char *name, char *value); #endif /* end of include guard: CORVUS_H */ diff --git a/src/slot.c b/src/slot.c index dff5e37..9a89500 100644 --- a/src/slot.c +++ b/src/slot.c @@ -36,13 +36,15 @@ static struct { static inline void node_list_init() { + struct node_conf *node = conf_node_inc_ref(); pthread_rwlock_wrlock(&node_list.lock); node_list.len = 0; - for (int i = 0; i < MIN(config.node.len, MAX_NODE_LIST); i++) { - memcpy(&node_list.nodes[i], &config.node.addr[i], sizeof(struct address)); + for (int i = 0; i < MIN(node->len, MAX_NODE_LIST); i++) { + memcpy(&node_list.nodes[i], &node->addr[i], sizeof(struct address)); node_list.len++; } pthread_rwlock_unlock(&node_list.lock); + conf_node_dec_ref(node); } static inline void node_list_add(struct node_info *node) diff --git a/tests/corvus_test.c b/tests/corvus_test.c index 38733fa..9edc40c 100644 --- a/tests/corvus_test.c +++ b/tests/corvus_test.c @@ -3,6 +3,7 @@ #include "test.h" #include "corvus.h" #include "slot.h" +#include "alloc.h" static void usage(const char *name) { @@ -83,7 +84,9 @@ int main(int argc, const char *argv[]) build_contexts(); struct context *contexts = get_contexts(); - memcpy(&config.node, &conf, sizeof(config.node)); + config.node = cv_malloc(sizeof(struct node_conf)); + memset(config.node, 0, sizeof(struct node_conf)); + config.node->refcount = 1; slot_start_manager(&contexts[config.thread]); RUN_CASE(test_slot); diff --git a/tests/test_config.c b/tests/test_config.c index ea2c5ba..03f2a03 100644 --- a/tests/test_config.c +++ b/tests/test_config.c @@ -1,8 +1,6 @@ #include "test.h" #include "alloc.h" -extern int config_add(char *name, char *value); - TEST(test_config_bind) { char n[] = "bind";