Skip to content

Commit

Permalink
remove config lock now and add 'config get node'
Browse files Browse the repository at this point in the history
  • Loading branch information
jason-joo committed Jan 4, 2017
1 parent f083bd2 commit a1642dd
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 32 deletions.
55 changes: 39 additions & 16 deletions src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is
struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)};
const size_t CMD_NUM = sizeof(cmds) / sizeof(struct cmd_item);
static struct dict command_map;
static pthread_mutex_t lock_config = PTHREAD_MUTEX_INITIALIZER;

const char *cmd_extract_prefix(const char *prefix)
{
Expand Down Expand Up @@ -488,29 +487,54 @@ int cmd_config(struct command *cmd, struct redis_data *data)
return CORVUS_ERR;
}
if (strcasecmp(type, "SET") == 0) {
// `config set` generelly need global lock
pthread_mutex_lock(&lock_config);
if (strcasecmp(option, "NODE") == 0) {
// config set node host:port,host1:port1
char value[data->element[3].pos.str_len + 1];
if (data->elements != 4) {
cmd_mark_fail(cmd, rep_config_parse_err);
} else if (data->element[3].pos.str_len
< 9|| pos_to_str(&data->element[3].pos, value) != CORVUS_OK) {
cmd_mark_fail(cmd, rep_config_parse_err);
} else if (config_add("node", value) != CORVUS_OK) {
cmd_mark_fail(cmd, rep_config_addr_err);
} else {
slot_create_job(SLOT_UPDATE);
conn_add_data(cmd->client, (uint8_t*) rep_ok, strlen(rep_ok),
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);
char value[data->element[3].pos.str_len + 1];
// the host-port pair is in form "1.1.1.1:1"(9 bytes) at least
if (data->element[3].pos.str_len < 9
|| pos_to_str(&data->element[3].pos, value) != CORVUS_OK) {
cmd_mark_fail(cmd, rep_config_parse_err);
} else if (config_add("node", value) != CORVUS_OK) {
cmd_mark_fail(cmd, rep_config_addr_err);
} else {
slot_create_job(SLOT_UPDATE);
conn_add_data(cmd->client, (uint8_t*) rep_ok, strlen(rep_ok),
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);
}
}
} else {
cmd_mark_fail(cmd, rep_addr_err);
}
pthread_mutex_unlock(&lock_config);
} else if (strcasecmp(type, "GET") == 0) {
if (strcasecmp(option, "NODE") == 0) {
struct node_conf *node = conf_node_inc_ref();
int n = 1024, pos = 0;
char content[n + ADDRESS_LEN];
char data[n + ADDRESS_LEN + 100]; //100 bytes for control data
for (int i = 0; i < node->len; i++) {
if (i > 0) {
content[pos++] = ',';
}
pos += snprintf(content + pos, ADDRESS_LEN, "%s:%d",
node->addr[i].ip, node->addr[i].port);
if (pos >= n) {
break;
}
}
conf_node_dec_ref(node);
int data_len = snprintf(data, sizeof(data), "$%d\r\n%s\r\n", pos, content);
conn_add_data(cmd->client, (uint8_t*) data, data_len,
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);
} else {
cmd_mark_fail(cmd, rep_config_parse_err);
}
} else {
cmd_mark_fail(cmd, rep_config_err);
}
Expand Down Expand Up @@ -1027,7 +1051,6 @@ void cmd_map_init()
void cmd_map_destroy()
{
dict_free(&command_map);
pthread_mutex_destroy(&lock_config);
}

struct command *cmd_create(struct context *ctx)
Expand Down
27 changes: 13 additions & 14 deletions src/corvus.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ void config_init()

config.bind = 12345;
config.node = cv_calloc(1, sizeof(struct node_conf));
memset(config.node, 0, sizeof(struct node_conf));
config.node->refcount = 1;
config.thread = 4;
config.loglevel = INFO;
Expand Down Expand Up @@ -142,24 +141,26 @@ int config_add(char *name, char *value)
int addr_cnt = 0;
char *p = strtok(value, ",");
while (p) {
addr = cv_realloc(addr, sizeof(struct address) * (addr_cnt + 1));
addr = cv_realloc(addr, sizeof(struct address) * (addr_cnt + 1));
if (socket_parse_ip(p, &addr[addr_cnt]) == -1) {
cv_free(addr);
return CORVUS_ERR;
}
addr_cnt++;
p = strtok(NULL, ",");
}
{
struct node_conf *newnode = cv_calloc(1, sizeof(struct node_conf));
memset(newnode, 0, sizeof(struct node_conf));
newnode->addr = addr;
newnode->len = addr_cnt;
newnode->refcount = 1;
struct node_conf *oldnode = config.node;
config.node = newnode;
conf_node_dec_ref(oldnode);
}
{
struct node_conf *newnode = cv_calloc(1, sizeof(struct node_conf));
memset(newnode, 0, sizeof(struct node_conf));
newnode->addr = addr;
newnode->len = addr_cnt;
newnode->refcount = 1;
pthread_mutex_lock(&lock_conf_node);
struct node_conf *oldnode = config.node;
config.node = newnode;
pthread_mutex_unlock(&lock_conf_node);
conf_node_dec_ref(oldnode);
}
} else if (strcmp(name, "slowlog-log-slower-than") == 0) {
config.slowlog_log_slower_than = atoi(value);
} else if (strcmp(name, "slowlog-max-len") == 0) {
Expand All @@ -182,14 +183,12 @@ struct node_conf *conf_node_inc_ref()

void conf_node_dec_ref(struct node_conf *node)
{
pthread_mutex_lock(&lock_conf_node);
int refcount = ATOMIC_DEC(node->refcount, 1);
assert(refcount >= 0);
if (refcount == 0) {
cv_free(node->addr);
cv_free(node);
}
pthread_mutex_unlock(&lock_conf_node);
}

int read_line(char **line, size_t *bytes, FILE *fp)
Expand Down
4 changes: 2 additions & 2 deletions src/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ static struct {

static inline void node_list_init()
{
pthread_rwlock_wrlock(&node_list.lock);
struct node_conf *node = conf_node_inc_ref();
pthread_rwlock_wrlock(&node_list.lock);
node_list.len = 0;
for (int i = 0; i < MIN(node->len, MAX_NODE_LIST); i++) {
memcpy(&node_list.nodes[i], &node->addr[i], sizeof(struct address));
node_list.len++;
}
conf_node_dec_ref(node);
pthread_rwlock_unlock(&node_list.lock);
conf_node_dec_ref(node);
}

static inline void node_list_add(struct node_info *node)
Expand Down

0 comments on commit a1642dd

Please sign in to comment.