Skip to content

Commit

Permalink
add supporting for config.node modification in runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
jason-joo committed Oct 18, 2016
1 parent 218e8d7 commit 15393e3
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 26 deletions.
57 changes: 56 additions & 1 deletion src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <sys/time.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>
#include "corvus.h"
#include "socket.h"
#include "logging.h"
Expand Down Expand Up @@ -40,6 +41,10 @@ const char *rep_server_err = "-ERR Proxy fail to get server\r\n";
const char *rep_timeout_err = "-ERR Proxy timed out\r\n";
const char *rep_slowlog_not_enabled = "-ERR Slowlog not enabled\r\n";

const char *rep_config_err = "-ERR Config error\r\n";
const char *rep_config_parse_err = "-ERR Config fail to parse command\r\n";
const char *rep_config_addr_err = "-ERR Config fail to parse address\r\n";

const char *rep_get = "*2\r\n$3\r\nGET\r\n";
const char *rep_set = "*3\r\n$3\r\nSET\r\n";
const char *rep_del = "*2\r\n$3\r\nDEL\r\n";
Expand All @@ -54,7 +59,7 @@ static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is
struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)};
const size_t CMD_NUM = sizeof(cmds) / sizeof(struct cmd_item);
static struct dict command_map;

static pthread_mutex_t lock_config = PTHREAD_MUTEX_INITIALIZER;

const char *cmd_extract_prefix(const char *prefix)
{
Expand Down Expand Up @@ -465,6 +470,53 @@ int cmd_proxy(struct command *cmd, struct redis_data *data)
return CORVUS_OK;
}

int cmd_config(struct command *cmd, struct redis_data *data)
{
ASSERT_TYPE(data, REP_ARRAY);
ASSERT_ELEMENTS(data->elements >= 2, data);

struct redis_data *op = &data->element[1];
struct redis_data *opt = &data->element[2];
ASSERT_TYPE(op, REP_STRING);
ASSERT_TYPE(opt, REP_STRING);

char type[op->pos.str_len + 1];
char option[opt->pos.str_len + 1];
if (pos_to_str(&op->pos, type) == CORVUS_ERR
|| pos_to_str(&opt->pos, option) == CORVUS_ERR) {
LOG(ERROR, "cmd_config: parse error");
return CORVUS_ERR;
}
if (strcasecmp(type, "SET") == 0) {
// `config set` generelly need global lock
pthread_mutex_lock(&lock_config);
if (strcasecmp(option, "NODE") == 0) {
// config set node host:port,host1:port1
char value[data->element[3].pos.str_len + 1];
if (data->elements != 4) {
cmd_mark_fail(cmd, rep_config_parse_err);
} else if (data->element[3].pos.str_len
< 9|| pos_to_str(&data->element[3].pos, value) != CORVUS_OK) {
cmd_mark_fail(cmd, rep_config_parse_err);
} else if (config_add("node", value) != CORVUS_OK) {
cmd_mark_fail(cmd, rep_config_addr_err);
} else {
slot_create_job(SLOT_UPDATE);
conn_add_data(cmd->client, (uint8_t*) rep_ok, strlen(rep_ok),
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);
}
} else {
cmd_mark_fail(cmd, rep_addr_err);
}
pthread_mutex_unlock(&lock_config);
} else {
cmd_mark_fail(cmd, rep_config_err);
}
return CORVUS_OK;
}

int cmd_auth(struct command *cmd, struct redis_data *data)
{
ASSERT_TYPE(data, REP_ARRAY);
Expand Down Expand Up @@ -720,6 +772,8 @@ int cmd_extra(struct command *cmd, struct redis_data *data)
return cmd_auth(cmd, data);
case CMD_TIME:
return cmd_time(cmd);
case CMD_CONFIG:
return cmd_config(cmd, data);
case CMD_QUIT:
return cmd_quit(cmd);
case CMD_SLOWLOG:
Expand Down Expand Up @@ -973,6 +1027,7 @@ void cmd_map_init()
void cmd_map_destroy()
{
dict_free(&command_map);
pthread_mutex_destroy(&lock_config);
}

struct command *cmd_create(struct context *ctx)
Expand Down
3 changes: 2 additions & 1 deletion src/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@
HANDLER(SLOWLOG, EXTRA, UNKNOWN) \
HANDLER(QUIT, EXTRA, UNKNOWN) \
HANDLER(SELECT, UNIMPL, UNKNOWN) \
HANDLER(TIME, EXTRA, UNKNOWN)
HANDLER(TIME, EXTRA, UNKNOWN) \
HANDLER(CONFIG, EXTRA, UNKNOWN)

#define CMD_DEFINE(cmd, type, access) CMD_##cmd,

Expand Down
7 changes: 5 additions & 2 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,14 @@ struct connection *conn_get_raw_server(struct context *ctx)
int i;
struct connection *server = NULL;

for (i = 0; i < config.node.len; i++) {
server = conn_get_server_from_pool(ctx, &config.node.addr[i], false);
struct node_conf *node = config.node;
conf_node_inc_ref(node);
for (i = 0; i < node->len; i++) {
server = conn_get_server_from_pool(ctx, &node->addr[i], false);
if (server == NULL) continue;
break;
}
conf_node_dec_ref(node);
if (server == NULL) {
LOG(ERROR, "conn_get_raw_server: cannot connect to redis server.");
return NULL;
Expand Down
56 changes: 40 additions & 16 deletions src/corvus.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <pthread.h>
#include <unistd.h>
#include <execinfo.h>
#include <assert.h>
#include "corvus.h"
#include "mbuf.h"
#include "slot.h"
Expand All @@ -28,7 +29,9 @@ void config_init()
strncpy(config.cluster, "default", CLUSTER_NAME_SIZE);

config.bind = 12345;
memset(&config.node, 0, sizeof(struct node_conf));
config.node = cv_malloc(sizeof(struct node_conf));
memset(config.node, 0, sizeof(struct node_conf));
conf_node_inc_ref(config.node);
config.thread = 4;
config.loglevel = INFO;
config.syslog = 0;
Expand Down Expand Up @@ -65,7 +68,7 @@ int config_add(char *name, char *value)
{
int val;
if (strcmp(name, "cluster") == 0) {
if (strlen(value) <= 0) return 0;
if (strlen(value) <= 0) return CORVUS_OK;
strncpy(config.cluster, value, CLUSTER_NAME_SIZE);
} else if (strcmp(name, "bind") == 0) {
if (socket_parse_port(value, &config.bind) == CORVUS_ERR) {
Expand Down Expand Up @@ -134,30 +137,51 @@ int config_add(char *name, char *value)
memcpy(config.requirepass, value, strlen(value));
}
} else if (strcmp(name, "node") == 0) {
cv_free(config.node.addr);
memset(&config.node, 0, sizeof(config.node));

struct address *addr = NULL;
int addr_cnt = 0;
char *p = strtok(value, ",");
while (p) {
config.node.addr = cv_realloc(config.node.addr,
sizeof(struct address) * (config.node.len + 1));

if (socket_parse_ip(p, &config.node.addr[config.node.len]) == -1) {
cv_free(config.node.addr);
return -1;
addr = cv_realloc(addr, sizeof(struct address) * (addr_cnt + 1));
if (socket_parse_ip(p, &addr[addr_cnt]) == -1) {
cv_free(addr);
return CORVUS_ERR;
}

config.node.len++;
addr_cnt++;
p = strtok(NULL, ",");
}
{
struct node_conf *newnode = cv_malloc(sizeof(struct node_conf));
memset(newnode, 0, sizeof(struct node_conf));
newnode->addr = addr;
newnode->len = addr_cnt;
conf_node_inc_ref(newnode);
struct node_conf *oldnode = config.node;
config.node = newnode;
conf_node_dec_ref(oldnode);
}
} else if (strcmp(name, "slowlog-log-slower-than") == 0) {
config.slowlog_log_slower_than = atoi(value);
} else if (strcmp(name, "slowlog-max-len") == 0) {
config.slowlog_max_len = atoi(value);
} else if (strcmp(name, "slowlog-statsd-enabled") == 0) {
config.slowlog_statsd_enabled = atoi(value);
}
return 0;
return CORVUS_OK;
}

void conf_node_inc_ref(struct node_conf *node)
{
int refcount = ATOMIC_INC(node->refcount, 1);
assert(refcount >= 0);
}

void conf_node_dec_ref(struct node_conf *node)
{
int refcount = ATOMIC_DEC(node->refcount, 1);
assert(refcount >= 0);
if (refcount > 0) return;
cv_free(node->addr);
cv_free(node);
}

int read_line(char **line, size_t *bytes, FILE *fp)
Expand Down Expand Up @@ -472,7 +496,7 @@ int main(int argc, const char *argv[])
fprintf(stderr, "Error: invalid config.\n");
return EXIT_FAILURE;
}
if (config.node.len <= 0) {
if (config.node->len <= 0) {
fprintf(stderr, "Error: invalid config, `node` should be set.\n");
return EXIT_FAILURE;
}
Expand Down Expand Up @@ -545,7 +569,7 @@ int main(int argc, const char *argv[])
destroy_contexts();
cmd_map_destroy();
cv_free(config.requirepass);
cv_free(config.node.addr);
conf_node_dec_ref(config.node);
if (config.syslog) closelog();
return EXIT_SUCCESS;
}
Expand Down
6 changes: 5 additions & 1 deletion src/corvus.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum {
struct node_conf {
struct address *addr;
int len;
int refcount;
};

struct context {
Expand Down Expand Up @@ -95,7 +96,7 @@ struct context {
struct {
char cluster[CLUSTER_NAME_SIZE + 1];
uint16_t bind;
struct node_conf node;
struct node_conf *node;
int thread;
int loglevel;
bool syslog;
Expand All @@ -114,7 +115,10 @@ struct {
} config;

int64_t get_time();
void conf_node_inc_ref(struct node_conf *node);
void conf_node_dec_ref(struct node_conf *node);
struct context *get_contexts();
int thread_spawn(struct context *ctx, void *(*start_routine) (void *));
int config_add(char *name, char *value);

#endif /* end of include guard: CORVUS_H */
7 changes: 5 additions & 2 deletions src/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ static struct {
static inline void node_list_init()
{
pthread_rwlock_wrlock(&node_list.lock);
struct node_conf *node = config.node;
conf_node_inc_ref(node);
node_list.len = 0;
for (int i = 0; i < MIN(config.node.len, MAX_NODE_LIST); i++) {
memcpy(&node_list.nodes[i], &config.node.addr[i], sizeof(struct address));
for (int i = 0; i < MIN(node->len, MAX_NODE_LIST); i++) {
memcpy(&node_list.nodes[i], &node->addr[i], sizeof(struct address));
node_list.len++;
}
conf_node_dec_ref(node);
pthread_rwlock_unlock(&node_list.lock);
}

Expand Down
5 changes: 4 additions & 1 deletion tests/corvus_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "test.h"
#include "corvus.h"
#include "slot.h"
#include "alloc.h"

static void usage(const char *name)
{
Expand Down Expand Up @@ -83,7 +84,9 @@ int main(int argc, const char *argv[])
build_contexts();
struct context *contexts = get_contexts();

memcpy(&config.node, &conf, sizeof(config.node));
config.node = cv_malloc(sizeof(struct node_conf));
memset(config.node, 0, sizeof(struct node_conf));
conf_node_inc_ref(config.node);
slot_start_manager(&contexts[config.thread]);

RUN_CASE(test_slot);
Expand Down
2 changes: 0 additions & 2 deletions tests/test_config.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#include "test.h"
#include "alloc.h"

extern int config_add(char *name, char *value);

TEST(test_config_bind) {
char n[] = "bind";

Expand Down

0 comments on commit 15393e3

Please sign in to comment.