Skip to content

Commit

Permalink
Merge pull request #95 from jasonjoo2010/master
Browse files Browse the repository at this point in the history
Add `config set` command to modify corresponding redis nodes dynamically
  • Loading branch information
doyoubi authored Jan 10, 2017
2 parents 8e848a0 + 0f42521 commit 0318b8b
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 25 deletions.
85 changes: 84 additions & 1 deletion src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <sys/time.h>
#include <unistd.h>
#include <assert.h>
#include <pthread.h>
#include "corvus.h"
#include "socket.h"
#include "logging.h"
Expand Down Expand Up @@ -40,6 +41,11 @@ const char *rep_server_err = "-ERR Proxy fail to get server\r\n";
const char *rep_timeout_err = "-ERR Proxy timed out\r\n";
const char *rep_slowlog_not_enabled = "-ERR Slowlog not enabled\r\n";

const char *rep_config_err = "-ERR Config error\r\n";
const char *rep_config_unsupported_err = "-ERR Config cmd not supported\r\n";
const char *rep_config_parse_err = "-ERR Config fail to parse command\r\n";
const char *rep_config_addr_err = "-ERR Config fail to parse address\r\n";

const char *rep_get = "*2\r\n$3\r\nGET\r\n";
const char *rep_set = "*3\r\n$3\r\nSET\r\n";
const char *rep_del = "*2\r\n$3\r\nDEL\r\n";
Expand All @@ -55,7 +61,6 @@ struct cmd_item cmds[] = {CMD_DO(CMD_BUILD_MAP)};
const size_t CMD_NUM = sizeof(cmds) / sizeof(struct cmd_item);
static struct dict command_map;


const char *cmd_extract_prefix(const char *prefix)
{
const char *get = "$3\r\nGET\r\n";
Expand Down Expand Up @@ -465,6 +470,82 @@ int cmd_proxy(struct command *cmd, struct redis_data *data)
return CORVUS_OK;
}

int cmd_config(struct command *cmd, struct redis_data *data)
{
ASSERT_TYPE(data, REP_ARRAY);
ASSERT_ELEMENTS(data->elements >= 2, data);

struct redis_data *op = &data->element[1];
struct redis_data *opt = &data->element[2];
ASSERT_TYPE(op, REP_STRING);
ASSERT_TYPE(opt, REP_STRING);

char type[op->pos.str_len + 1];
char option[opt->pos.str_len + 1];
if (pos_to_str(&op->pos, type) == CORVUS_ERR
|| pos_to_str(&opt->pos, option) == CORVUS_ERR) {
LOG(ERROR, "cmd_config: parse error");
return CORVUS_ERR;
}
if (strcasecmp(type, "SET") == 0) {
//config set <item> <val>
ASSERT_ELEMENTS(data->elements >= 4, data);
if (strcasecmp(option, "NODE") == 0) {
// config set node host:port,host1:port1
if (data->elements != 4) {
cmd_mark_fail(cmd, rep_config_parse_err);
} else {
char value[data->element[3].pos.str_len + 1];
// the host-port pair is in form "1.1.1.1:1"(9 bytes) at least
if (data->element[3].pos.str_len < 9
|| pos_to_str(&data->element[3].pos, value) != CORVUS_OK) {
cmd_mark_fail(cmd, rep_config_parse_err);
} else if (config_add("node", value) != CORVUS_OK) {
cmd_mark_fail(cmd, rep_config_addr_err);
} else {
slot_create_job(SLOT_UPDATE);
conn_add_data(cmd->client, (uint8_t*) rep_ok, strlen(rep_ok),
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);
}
}
} else {
cmd_mark_fail(cmd, rep_config_unsupported_err);
}
} else if (strcasecmp(type, "GET") == 0) {
//config get <item>
ASSERT_ELEMENTS(data->elements >= 3, data);
if (strcasecmp(option, "NODE") == 0) {
struct node_conf *node = conf_node_inc_ref();
int n = 1024, pos = 0;
char content[n + ADDRESS_LEN];
char data[n + ADDRESS_LEN + 100]; //100 bytes for control data
for (int i = 0; i < node->len; i++) {
if (i > 0) {
content[pos++] = ',';
}
pos += snprintf(content + pos, ADDRESS_LEN, "%s:%d",
node->addr[i].ip, node->addr[i].port);
if (pos >= n) {
break;
}
}
conf_node_dec_ref(node);
int data_len = snprintf(data, sizeof(data), "$%d\r\n%s\r\n", pos, content);
conn_add_data(cmd->client, (uint8_t*) data, data_len,
&cmd->rep_buf[0], &cmd->rep_buf[1]);
CMD_INCREF(cmd);
cmd_mark_done(cmd);
} else {
cmd_mark_fail(cmd, rep_config_parse_err);
}
} else {
cmd_mark_fail(cmd, rep_config_err);
}
return CORVUS_OK;
}

int cmd_auth(struct command *cmd, struct redis_data *data)
{
ASSERT_TYPE(data, REP_ARRAY);
Expand Down Expand Up @@ -720,6 +801,8 @@ int cmd_extra(struct command *cmd, struct redis_data *data)
return cmd_auth(cmd, data);
case CMD_TIME:
return cmd_time(cmd);
case CMD_CONFIG:
return cmd_config(cmd, data);
case CMD_QUIT:
return cmd_quit(cmd);
case CMD_SLOWLOG:
Expand Down
3 changes: 2 additions & 1 deletion src/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@
HANDLER(SLOWLOG, EXTRA, UNKNOWN) \
HANDLER(QUIT, EXTRA, UNKNOWN) \
HANDLER(SELECT, UNIMPL, UNKNOWN) \
HANDLER(TIME, EXTRA, UNKNOWN)
HANDLER(TIME, EXTRA, UNKNOWN) \
HANDLER(CONFIG, EXTRA, UNKNOWN)

#define CMD_DEFINE(cmd, type, access) CMD_##cmd,

Expand Down
6 changes: 4 additions & 2 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,11 +303,13 @@ struct connection *conn_get_raw_server(struct context *ctx)
int i;
struct connection *server = NULL;

for (i = 0; i < config.node.len; i++) {
server = conn_get_server_from_pool(ctx, &config.node.addr[i], false);
struct node_conf *node = conf_node_inc_ref();
for (i = 0; i < node->len; i++) {
server = conn_get_server_from_pool(ctx, &node->addr[i], false);
if (server == NULL) continue;
break;
}
conf_node_dec_ref(node);
if (server == NULL) {
LOG(ERROR, "conn_get_raw_server: cannot connect to redis server.");
return NULL;
Expand Down
62 changes: 47 additions & 15 deletions src/corvus.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <pthread.h>
#include <unistd.h>
#include <execinfo.h>
#include <assert.h>
#include <getopt.h>
#include "corvus.h"
#include "mbuf.h"
Expand All @@ -22,14 +23,16 @@
#define MIN_BUFSIZE 64

static struct context *contexts;
static pthread_mutex_t lock_conf_node = PTHREAD_MUTEX_INITIALIZER;

void config_init()
{
memset(config.cluster, 0, CLUSTER_NAME_SIZE + 1);
strncpy(config.cluster, "default", CLUSTER_NAME_SIZE);

config.bind = 12345;
memset(&config.node, 0, sizeof(struct node_conf));
config.node = cv_calloc(1, sizeof(struct node_conf));
config.node->refcount = 1;
config.thread = 4;
config.loglevel = INFO;
config.syslog = 0;
Expand Down Expand Up @@ -135,30 +138,58 @@ int config_add(char *name, char *value)
memcpy(config.requirepass, value, strlen(value));
}
} else if (strcmp(name, "node") == 0) {
cv_free(config.node.addr);
memset(&config.node, 0, sizeof(config.node));

struct address *addr = NULL;
int addr_cnt = 0;
char *p = strtok(value, ",");
while (p) {
config.node.addr = cv_realloc(config.node.addr,
sizeof(struct address) * (config.node.len + 1));

if (socket_parse_ip(p, &config.node.addr[config.node.len]) == -1) {
cv_free(config.node.addr);
return -1;
addr = cv_realloc(addr, sizeof(struct address) * (addr_cnt + 1));
if (socket_parse_ip(p, &addr[addr_cnt]) == -1) {
cv_free(addr);
return CORVUS_ERR;
}

config.node.len++;
addr_cnt++;
p = strtok(NULL, ",");
}
{
struct node_conf *newnode = cv_calloc(1, sizeof(struct node_conf));
memset(newnode, 0, sizeof(struct node_conf));
newnode->addr = addr;
newnode->len = addr_cnt;
newnode->refcount = 1;
pthread_mutex_lock(&lock_conf_node);
struct node_conf *oldnode = config.node;
config.node = newnode;
pthread_mutex_unlock(&lock_conf_node);
conf_node_dec_ref(oldnode);
}
} else if (strcmp(name, "slowlog-log-slower-than") == 0) {
config.slowlog_log_slower_than = atoi(value);
} else if (strcmp(name, "slowlog-max-len") == 0) {
config.slowlog_max_len = atoi(value);
} else if (strcmp(name, "slowlog-statsd-enabled") == 0) {
config.slowlog_statsd_enabled = atoi(value);
}
return 0;
return CORVUS_OK;
}

struct node_conf *conf_node_inc_ref()
{
pthread_mutex_lock(&lock_conf_node);
struct node_conf *node = config.node;
int refcount = ATOMIC_INC(node->refcount, 1);
assert(refcount >= 0);
pthread_mutex_unlock(&lock_conf_node);
return node;
}

void conf_node_dec_ref(struct node_conf *node)
{
int refcount = ATOMIC_DEC(node->refcount, 1);
assert(refcount >= 0);
if (refcount == 0) {
cv_free(node->addr);
cv_free(node);
}
}

int read_line(char **line, size_t *bytes, FILE *fp)
Expand Down Expand Up @@ -595,7 +626,7 @@ int main(int argc, const char *argv[])
usage(argv[0]);
return EXIT_FAILURE;
}
if (config.node.len <= 0) {
if (config.node->len <= 0) {
fprintf(stderr, "Error: invalid upstream list, `node` should be set to a valid nodes list.\n");
return EXIT_FAILURE;
}
Expand Down Expand Up @@ -668,7 +699,8 @@ int main(int argc, const char *argv[])
destroy_contexts();
cmd_map_destroy();
cv_free(config.requirepass);
cv_free(config.node.addr);
conf_node_dec_ref(config.node);
pthread_mutex_destroy(&lock_conf_node);
if (config.syslog) closelog();
return EXIT_SUCCESS;
}
Expand Down
6 changes: 5 additions & 1 deletion src/corvus.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ enum {
struct node_conf {
struct address *addr;
int len;
int refcount;
};

struct context {
Expand Down Expand Up @@ -95,7 +96,7 @@ struct context {
struct {
char cluster[CLUSTER_NAME_SIZE + 1];
uint16_t bind;
struct node_conf node;
struct node_conf *node;
int thread;
int loglevel;
bool syslog;
Expand All @@ -114,7 +115,10 @@ struct {
} config;

int64_t get_time();
struct node_conf *conf_node_inc_ref();
void conf_node_dec_ref(struct node_conf *node);
struct context *get_contexts();
int thread_spawn(struct context *ctx, void *(*start_routine) (void *));
int config_add(char *name, char *value);

#endif /* end of include guard: CORVUS_H */
6 changes: 4 additions & 2 deletions src/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ static struct {

static inline void node_list_init()
{
struct node_conf *node = conf_node_inc_ref();
pthread_rwlock_wrlock(&node_list.lock);
node_list.len = 0;
for (int i = 0; i < MIN(config.node.len, MAX_NODE_LIST); i++) {
memcpy(&node_list.nodes[i], &config.node.addr[i], sizeof(struct address));
for (int i = 0; i < MIN(node->len, MAX_NODE_LIST); i++) {
memcpy(&node_list.nodes[i], &node->addr[i], sizeof(struct address));
node_list.len++;
}
pthread_rwlock_unlock(&node_list.lock);
conf_node_dec_ref(node);
}

static inline void node_list_add(struct node_info *node)
Expand Down
5 changes: 4 additions & 1 deletion tests/corvus_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "test.h"
#include "corvus.h"
#include "slot.h"
#include "alloc.h"

static void usage(const char *name)
{
Expand Down Expand Up @@ -83,7 +84,9 @@ int main(int argc, const char *argv[])
build_contexts();
struct context *contexts = get_contexts();

memcpy(&config.node, &conf, sizeof(config.node));
config.node = cv_malloc(sizeof(struct node_conf));
memset(config.node, 0, sizeof(struct node_conf));
config.node->refcount = 1;
slot_start_manager(&contexts[config.thread]);

RUN_CASE(test_slot);
Expand Down
2 changes: 0 additions & 2 deletions tests/test_config.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#include "test.h"
#include "alloc.h"

extern int config_add(char *name, char *value);

TEST(test_config_bind) {
char n[] = "bind";

Expand Down

0 comments on commit 0318b8b

Please sign in to comment.