Skip to content

Commit

Permalink
tmp save
Browse files Browse the repository at this point in the history
fix for segfault

update when setremotes

config.node non-blocking modification
  • Loading branch information
jason-joo committed Oct 17, 2016
1 parent 218e8d7 commit 5f5fc17
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 17 deletions.
57 changes: 57 additions & 0 deletions src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <sys/time.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>
#include "corvus.h"
#include "socket.h"
#include "logging.h"
Expand Down Expand Up @@ -54,6 +55,8 @@ static const char *rep_auth_not_set = "-ERR Client sent AUTH, but no password is
struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)};
const size_t CMD_NUM = sizeof(cmds) / sizeof(struct cmd_item);
static struct dict command_map;
//need not destroy in linux
static pthread_mutex_t lock_setremotes = PTHREAD_MUTEX_INITIALIZER;


const char *cmd_extract_prefix(const char *prefix)
Expand Down Expand Up @@ -453,6 +456,60 @@ int cmd_proxy(struct command *cmd, struct redis_data *data)

if (strcasecmp(type, "INFO") == 0) {
return cmd_proxy_info(cmd);
} else if (strcasecmp(type, "SETREMOTES") == 0) {
//lock global
pthread_mutex_lock(&lock_setremotes);
if (data->elements % 2 != 0 || data->elements < 4) {
cmd_mark_fail(cmd, rep_addr_err);
} else {
int i = 0;
char ip[45], port_s[10];
struct address *addr = 0;
int addr_cnt = 0;
int addr_len = 0;
int port_len = 0;
for (i = 2; i < data->elements; i += 2) {
long port = 0;
addr_len = data->element[i].pos.str_len;
port_len = data->element[i + 1].pos.str_len;
if (addr_len > sizeof(ip) - 1 || port_len > sizeof(port) - 1) {
continue;
}
if (pos_to_str(&data->element[i].pos, ip) == CORVUS_ERR) {
continue;
}
if (pos_to_str(&data->element[i + 1].pos, port_s) == CORVUS_ERR) {
continue;
}
port = atoi(port_s);
if (port < 1 || port > 65535) {
continue;
}
//add to list
addr = cv_realloc(addr,
sizeof(struct address) * (addr_cnt + 1));
memcpy(addr[addr_cnt].ip, ip, addr_len);
addr[addr_cnt].ip[addr_len] = '\0';
addr[addr_cnt].port = port;
addr_cnt ++;
}
{
struct node_conf *newnode = cv_malloc(sizeof(struct node_conf));
memset(newnode, 0, sizeof(struct node_conf));
newnode->addr = addr;
newnode->len = addr_cnt;
conf_node_inc_ref(newnode);
struct node_conf *oldnode = config.node;
config.node = newnode;
conf_node_dec_ref(oldnode);
}
slot_create_job(SLOT_UPDATE);
conn_add_data(cmd->client, (uint8_t*) rep_ok, strlen(rep_ok),
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);
}
pthread_mutex_unlock(&lock_setremotes);
} else if (strcasecmp(type, "UPDATESLOTMAP") == 0) {
slot_create_job(SLOT_UPDATE);
conn_add_data(cmd->client, (uint8_t*)rep_ok, strlen(rep_ok),
Expand Down
7 changes: 5 additions & 2 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,14 @@ struct connection *conn_get_raw_server(struct context *ctx)
int i;
struct connection *server = NULL;

for (i = 0; i < config.node.len; i++) {
server = conn_get_server_from_pool(ctx, &config.node.addr[i], false);
struct node_conf *node = config.node;
conf_node_inc_ref(node);
for (i = 0; i < node->len; i++) {
server = conn_get_server_from_pool(ctx, &node->addr[i], false);
if (server == NULL) continue;
break;
}
conf_node_dec_ref(node);
if (server == NULL) {
LOG(ERROR, "conn_get_raw_server: cannot connect to redis server.");
return NULL;
Expand Down
41 changes: 30 additions & 11 deletions src/corvus.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <pthread.h>
#include <unistd.h>
#include <execinfo.h>
#include <assert.h>
#include "corvus.h"
#include "mbuf.h"
#include "slot.h"
Expand All @@ -28,7 +29,9 @@ void config_init()
strncpy(config.cluster, "default", CLUSTER_NAME_SIZE);

config.bind = 12345;
memset(&config.node, 0, sizeof(struct node_conf));
config.node = cv_malloc(sizeof(struct node_conf));
memset(config.node, 0, sizeof(struct node_conf));
conf_node_inc_ref(config.node);
config.thread = 4;
config.loglevel = INFO;
config.syslog = 0;
Expand Down Expand Up @@ -134,20 +137,21 @@ int config_add(char *name, char *value)
memcpy(config.requirepass, value, strlen(value));
}
} else if (strcmp(name, "node") == 0) {
cv_free(config.node.addr);
memset(&config.node, 0, sizeof(config.node));

//we init config before serving so it doesn't need to inc refcount
cv_free(config.node->addr);
config.node->addr = NULL;
config.node->len = 0;
char *p = strtok(value, ",");
while (p) {
config.node.addr = cv_realloc(config.node.addr,
sizeof(struct address) * (config.node.len + 1));
config.node->addr = cv_realloc(config.node->addr,
sizeof(struct address) * (config.node->len + 1));

if (socket_parse_ip(p, &config.node.addr[config.node.len]) == -1) {
cv_free(config.node.addr);
if (socket_parse_ip(p, &config.node->addr[config.node->len]) == -1) {
cv_free(config.node->addr);
return -1;
}

config.node.len++;
config.node->len++;
p = strtok(NULL, ",");
}
} else if (strcmp(name, "slowlog-log-slower-than") == 0) {
Expand All @@ -160,6 +164,21 @@ int config_add(char *name, char *value)
return 0;
}

void conf_node_inc_ref(struct node_conf *node)
{
int refcount = ATOMIC_INC(node->refcount, 1);
assert(refcount >= 0);
}

void conf_node_dec_ref(struct node_conf *node)
{
int refcount = ATOMIC_DEC(node->refcount, 1);
assert(refcount >= 0);
if (refcount > 0) return;
cv_free(node->addr);
cv_free(node);
}

int read_line(char **line, size_t *bytes, FILE *fp)
{
size_t len, index = 0;
Expand Down Expand Up @@ -472,7 +491,7 @@ int main(int argc, const char *argv[])
fprintf(stderr, "Error: invalid config.\n");
return EXIT_FAILURE;
}
if (config.node.len <= 0) {
if (config.node->len <= 0) {
fprintf(stderr, "Error: invalid config, `node` should be set.\n");
return EXIT_FAILURE;
}
Expand Down Expand Up @@ -545,7 +564,7 @@ int main(int argc, const char *argv[])
destroy_contexts();
cmd_map_destroy();
cv_free(config.requirepass);
cv_free(config.node.addr);
conf_node_dec_ref(config.node);
if (config.syslog) closelog();
return EXIT_SUCCESS;
}
Expand Down
5 changes: 4 additions & 1 deletion src/corvus.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum {
struct node_conf {
struct address *addr;
int len;
int refcount;
};

struct context {
Expand Down Expand Up @@ -95,7 +96,7 @@ struct context {
struct {
char cluster[CLUSTER_NAME_SIZE + 1];
uint16_t bind;
struct node_conf node;
struct node_conf *node;
int thread;
int loglevel;
bool syslog;
Expand All @@ -114,6 +115,8 @@ struct {
} config;

int64_t get_time();
void conf_node_inc_ref(struct node_conf *node);
void conf_node_dec_ref(struct node_conf *node);
struct context *get_contexts();
int thread_spawn(struct context *ctx, void *(*start_routine) (void *));

Expand Down
7 changes: 5 additions & 2 deletions src/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ static struct {
static inline void node_list_init()
{
pthread_rwlock_wrlock(&node_list.lock);
struct node_conf *node = config.node;
conf_node_inc_ref(node);
node_list.len = 0;
for (int i = 0; i < MIN(config.node.len, MAX_NODE_LIST); i++) {
memcpy(&node_list.nodes[i], &config.node.addr[i], sizeof(struct address));
for (int i = 0; i < MIN(node->len, MAX_NODE_LIST); i++) {
memcpy(&node_list.nodes[i], &node->addr[i], sizeof(struct address));
node_list.len++;
}
conf_node_dec_ref(node);
pthread_rwlock_unlock(&node_list.lock);
}

Expand Down
5 changes: 4 additions & 1 deletion tests/corvus_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "test.h"
#include "corvus.h"
#include "slot.h"
#include "alloc.h"

static void usage(const char *name)
{
Expand Down Expand Up @@ -83,7 +84,9 @@ int main(int argc, const char *argv[])
build_contexts();
struct context *contexts = get_contexts();

memcpy(&config.node, &conf, sizeof(config.node));
config.node = cv_malloc(sizeof(struct node_conf));
memset(config.node, 0, sizeof(struct node_conf));
conf_node_inc_ref(config.node);
slot_start_manager(&contexts[config.thread]);

RUN_CASE(test_slot);
Expand Down

0 comments on commit 5f5fc17

Please sign in to comment.