Skip to content

Commit

Permalink
Add cluster-port support to redis-cli --cluster
Browse files Browse the repository at this point in the history
In redis#9389, we add a new cluster-port config and make cluster bus port
configurable. Now add support for this parameter in redis-cli.

Mainly for `--cluster create` and `--cluster add-node`, we can use
`ip:port@bus_port` to pass the `cluster-port` parameter. The bus_port
is optional, if not specified it is the same as before.

Also add the `bus_port` field in `--cluster backup`, we also need
to backup the `cluster-port`.

Fixes redis#10342
  • Loading branch information
enjoy-binbin committed Feb 25, 2022
1 parent 1dc89e2 commit 65bb28f
Showing 1 changed file with 71 additions and 64 deletions.
135 changes: 71 additions & 64 deletions src/redis-cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
#define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000
#define CLUSTER_MANAGER_MIGRATE_PIPELINE 10
#define CLUSTER_MANAGER_REBALANCE_THRESHOLD 2
#define CLUSTER_PORT_INCR 10000

#define CLUSTER_MANAGER_INVALID_HOST_ARG \
"[ERR] Invalid arguments: you need to pass either a valid " \
Expand Down Expand Up @@ -2804,6 +2805,7 @@ typedef struct clusterManagerNode {
sds name;
char *ip;
int port;
int bus_port; /* cluster-port */
uint64_t current_epoch;
time_t ping_sent;
time_t ping_recv;
Expand Down Expand Up @@ -2874,7 +2876,7 @@ typedef int (*clusterManagerOnReplyError)(redisReply *reply,

/* Cluster Manager helper functions */

static clusterManagerNode *clusterManagerNewNode(char *ip, int port);
static clusterManagerNode *clusterManagerNewNode(char *ip, int port, int bus_port);
static clusterManagerNode *clusterManagerNodeByName(const char *name);
static clusterManagerNode *clusterManagerNodeByAbbreviatedName(const char *n);
static void clusterManagerNodeResetSlots(clusterManagerNode *node);
Expand Down Expand Up @@ -2932,7 +2934,7 @@ typedef struct clusterManagerCommandDef {
} clusterManagerCommandDef;

clusterManagerCommandDef clusterManagerCommands[] = {
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
{"create", clusterManagerCommandCreate, -2, "host1:port1[@bus_port1] ... hostN:portN[@bus_portN]",
"replicas <arg>"},
{"check", clusterManagerCommandCheck, -1, "host:port",
"search-multiple-owners"},
Expand All @@ -2946,7 +2948,7 @@ clusterManagerCommandDef clusterManagerCommands[] = {
"weight <node1=w1...nodeN=wN>,use-empty-masters,"
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
{"add-node", clusterManagerCommandAddNode, 2,
"new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
"new_host:new_port[@new_bus_port] existing_host:existing_port", "slave,master-id <arg>"},
{"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL},
{"call", clusterManagerCommandCall, -2,
"host:port command arg arg .. arg", "only-masters,only-replicas"},
Expand Down Expand Up @@ -3031,6 +3033,7 @@ static clusterManagerCommandProc *validateClusterManagerCommand(void) {
static int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr,
int *bus_port_ptr)
{
/* ip:port[@bus_port] */
char *c = strrchr(addr, '@');
if (c != NULL) {
*c = '\0';
Expand All @@ -3053,12 +3056,13 @@ static int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr,
* port into variables referenced by 'ip_ptr' and 'port_ptr' pointers,
* elsewhere it returns 0. */
static int getClusterHostFromCmdArgs(int argc, char **argv,
char **ip_ptr, int *port_ptr) {
int port = 0;
char **ip_ptr, int *port_ptr,
int *bus_port_ptr) {
int port = 0, bus_port = 0;
char *ip = NULL;
if (argc == 1) {
char *addr = argv[0];
if (!parseClusterNodeAddress(addr, &ip, &port, NULL)) return 0;
if (!parseClusterNodeAddress(addr, &ip, &port, &bus_port)) return 0;
} else {
ip = argv[0];
port = atoi(argv[1]);
Expand All @@ -3067,6 +3071,7 @@ static int getClusterHostFromCmdArgs(int argc, char **argv,
else {
*ip_ptr = ip;
*port_ptr = port;
if (bus_port_ptr) *bus_port_ptr = bus_port;
}
return 1;
}
Expand Down Expand Up @@ -3140,12 +3145,19 @@ static void freeClusterManager(void) {
dictRelease(clusterManagerUncoveredSlots);
}

static clusterManagerNode *clusterManagerNewNode(char *ip, int port) {
/* We can specify the cluster-node of the node through the bus_port parameter.
* Note the value can be 0, it means:
* 1. It is not specified in --cluster create, use port + 10000 as its value.
* 2. We don't need to know it, at this point this value may be wrong.
* If it is used, it will be corrected in clusterManagerLoadInfoFromNode.
*/
static clusterManagerNode *clusterManagerNewNode(char *ip, int port, int bus_port) {
clusterManagerNode *node = zmalloc(sizeof(*node));
node->context = NULL;
node->name = NULL;
node->ip = ip;
node->port = port;
node->bus_port = bus_port ? bus_port : port + CLUSTER_PORT_INCR;
node->current_epoch = 0;
node->ping_sent = 0;
node->ping_recv = 0;
Expand Down Expand Up @@ -3632,6 +3644,7 @@ static sds clusterManagerNodeGetJSON(clusterManagerNode *node,
" \"name\": \"%s\",\n"
" \"host\": \"%s\",\n"
" \"port\": %d,\n"
" \"bus_port\": %d,\n"
" \"replicate\": %s,\n"
" \"slots\": [%s],\n"
" \"slots_count\": %d,\n"
Expand All @@ -3640,6 +3653,7 @@ static sds clusterManagerNodeGetJSON(clusterManagerNode *node,
node->name,
node->ip,
node->port,
node->bus_port,
replicate,
slots,
node->slots_count,
Expand Down Expand Up @@ -3730,14 +3744,14 @@ static sds clusterManagerNodeInfo(clusterManagerNode *node, int indent) {
char *role = (is_master ? "M" : "S");
sds slots = NULL;
if (node->dirty && node->replicate != NULL)
info = sdscatfmt(info, "S: %S %s:%u", node->name, node->ip, node->port);
info = sdscatfmt(info, "S: %S %s:%u@%u", node->name, node->ip, node->port, node->bus_port);
else {
slots = clusterManagerNodeSlotsString(node);
sds flags = clusterManagerNodeFlagString(node);
info = sdscatfmt(info, "%s: %S %s:%u\n"
info = sdscatfmt(info, "%s: %S %s:%u@%u\n"
"%s slots:%S (%u slots) "
"%S",
role, node->name, node->ip, node->port, spaces,
role, node->name, node->ip, node->port, node->bus_port, spaces,
slots, node->slots_count, flags);
sdsfree(slots);
sdsfree(flags);
Expand Down Expand Up @@ -3800,8 +3814,8 @@ static void clusterManagerShowClusterInfo(void) {
return;
};
if (reply != NULL) freeReplyObject(reply);
printf("%s:%d (%s...) -> %d keys | %d slots | %d slaves.\n",
node->ip, node->port, name, dbsize,
printf("%s:%d@%d (%s...) -> %d keys | %d slots | %d slaves.\n",
node->ip, node->port, node->bus_port, name, dbsize,
node->slots_count, replicas);
masters++;
keys += dbsize;
Expand Down Expand Up @@ -4537,9 +4551,20 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
success = 0;
goto cleanup;
}

char *ip = NULL;
int port = 0, bus_port = 0;
if (addr == NULL || !parseClusterNodeAddress(addr, &ip, &port, &bus_port)) {
fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
success = 0;
goto cleanup;
}

int myself = (strstr(flags, "myself") != NULL);
clusterManagerNode *currentNode = NULL;
if (myself) {
/* bus-port could be wrong, correct it here, see clusterManagerNewNode. */
node->bus_port = bus_port;
node->flags |= CLUSTER_MANAGER_FLAG_MYSELF;
currentNode = node;
clusterManagerNodeResetSlots(node);
Expand Down Expand Up @@ -4607,22 +4632,7 @@ static int clusterManagerNodeLoadInfo(clusterManagerNode *node, int opts,
if (!(node->flags & CLUSTER_MANAGER_FLAG_MYSELF)) continue;
else break;
} else {
if (addr == NULL) {
fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
success = 0;
goto cleanup;
}
char *c = strrchr(addr, '@');
if (c != NULL) *c = '\0';
c = strrchr(addr, ':');
if (c == NULL) {
fprintf(stderr, "Error: invalid CLUSTER NODES reply\n");
success = 0;
goto cleanup;
}
*c = '\0';
int port = atoi(++c);
currentNode = clusterManagerNewNode(sdsnew(addr), port);
currentNode = clusterManagerNewNode(sdsnew(ip), port, bus_port);
currentNode->flags |= CLUSTER_MANAGER_FLAG_FRIEND;
if (node->friends == NULL) node->friends = listCreate();
listAddNodeTail(node->friends, currentNode);
Expand Down Expand Up @@ -6036,17 +6046,14 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
cluster_manager.nodes = listCreate();
for (i = 0; i < argc; i++) {
char *addr = argv[i];
char *c = strrchr(addr, '@');
if (c != NULL) *c = '\0';
c = strrchr(addr, ':');
if (c == NULL) {
char *ip = NULL;
int port = 0, bus_port = 0;
if (!parseClusterNodeAddress(addr, &ip, &port, &bus_port)) {
fprintf(stderr, "Invalid address format: %s\n", addr);
return 0;
}
*c = '\0';
char *ip = addr;
int port = atoi(++c);
clusterManagerNode *node = clusterManagerNewNode(ip, port);

clusterManagerNode *node = clusterManagerNewNode(ip, port, bus_port);
if (!clusterManagerNodeConnect(node)) {
freeClusterManagerNode(node);
return 0;
Expand Down Expand Up @@ -6243,8 +6250,8 @@ static int clusterManagerCommandCreate(int argc, char **argv) {
continue;
}
redisReply *reply = NULL;
reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d",
first->ip, first->port);
reply = CLUSTER_MANAGER_COMMAND(node, "cluster meet %s %d %d",
first->ip, first->port, first->bus_port);
int is_err = 0;
if (reply != NULL) {
if ((is_err = reply->type == REDIS_REPLY_ERROR))
Expand Down Expand Up @@ -6313,15 +6320,15 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
redisReply *function_restore_reply = NULL;
redisReply *function_list_reply = NULL;
char *ref_ip = NULL, *ip = NULL;
int ref_port = 0, port = 0;
if (!getClusterHostFromCmdArgs(argc - 1, argv + 1, &ref_ip, &ref_port))
int ref_port = 0, ref_bus_port = 0, port = 0;
if (!getClusterHostFromCmdArgs(argc - 1, argv + 1, &ref_ip, &ref_port, &ref_bus_port))
goto invalid_args;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port))
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port, NULL))
goto invalid_args;
clusterManagerLogInfo(">>> Adding node %s:%d to cluster %s:%d\n", ip, port,
ref_ip, ref_port);
// Check the existing cluster
clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port);
clusterManagerNode *refnode = clusterManagerNewNode(ref_ip, ref_port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
if (!clusterManagerCheckCluster(0)) return 0;

Expand All @@ -6345,7 +6352,7 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
}

// Add the new node
clusterManagerNode *new_node = clusterManagerNewNode(ip, port);
clusterManagerNode *new_node = clusterManagerNewNode(ip, port, ref_bus_port);
int added = 0;
if (!clusterManagerNodeConnect(new_node)) {
clusterManagerLogErr("[ERR] Sorry, can't connect to node %s:%d\n",
Expand Down Expand Up @@ -6416,8 +6423,8 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
// Send CLUSTER MEET command to the new node
clusterManagerLogInfo(">>> Send CLUSTER MEET to node %s:%d to make it "
"join the cluster.\n", ip, port);
reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d",
first->ip, first->port);
reply = CLUSTER_MANAGER_COMMAND(new_node, "CLUSTER MEET %s %d %d",
first->ip, first->port, first->bus_port);
if (!(success = clusterManagerCheckRedisReply(new_node, reply, NULL)))
goto cleanup;

Expand Down Expand Up @@ -6450,11 +6457,11 @@ static int clusterManagerCommandDeleteNode(int argc, char **argv) {
int success = 1;
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port, NULL)) goto invalid_args;
char *node_id = argv[1];
clusterManagerLogInfo(">>> Removing node %s from cluster %s:%d\n",
node_id, ip, port);
clusterManagerNode *ref_node = clusterManagerNewNode(ip, port);
clusterManagerNode *ref_node = clusterManagerNewNode(ip, port, 0);
clusterManagerNode *node = NULL;

// Load cluster information
Expand Down Expand Up @@ -6515,8 +6522,8 @@ static int clusterManagerCommandDeleteNode(int argc, char **argv) {
static int clusterManagerCommandInfo(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port, NULL)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
clusterManagerShowClusterInfo();
return 1;
Expand All @@ -6528,8 +6535,8 @@ static int clusterManagerCommandInfo(int argc, char **argv) {
static int clusterManagerCommandCheck(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port, NULL)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
clusterManagerShowClusterInfo();
return clusterManagerCheckCluster(0);
Expand All @@ -6546,8 +6553,8 @@ static int clusterManagerCommandFix(int argc, char **argv) {
static int clusterManagerCommandReshard(int argc, char **argv) {
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port, NULL)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
clusterManagerCheckCluster(0);
if (cluster_manager.errors && listLength(cluster_manager.errors) > 0) {
Expand Down Expand Up @@ -6735,8 +6742,8 @@ static int clusterManagerCommandRebalance(int argc, char **argv) {
char *ip = NULL;
clusterManagerNode **weightedNodes = NULL;
list *involved = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port);
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port, NULL)) goto invalid_args;
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
int result = 1, i;
if (config.cluster_manager_command.weight != NULL) {
Expand Down Expand Up @@ -6929,15 +6936,15 @@ static int clusterManagerCommandSetTimeout(int argc, char **argv) {
UNUSED(argc);
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port, NULL)) goto invalid_args;
int timeout = atoi(argv[1]);
if (timeout < 100) {
fprintf(stderr, "Setting a node timeout of less than 100 "
"milliseconds is a bad idea.\n");
return 0;
}
// Load cluster information
clusterManagerNode *node = clusterManagerNewNode(ip, port);
clusterManagerNode *node = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(node)) return 0;
int ok_count = 0, err_count = 0;

Expand Down Expand Up @@ -6989,7 +6996,7 @@ static int clusterManagerCommandImport(int argc, char **argv) {
char *ip = NULL, *src_ip = NULL;
char *invalid_args_msg = NULL;
sds cmdfmt = NULL;
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port)) {
if (!getClusterHostFromCmdArgs(argc, argv, &ip, &port, NULL)) {
invalid_args_msg = CLUSTER_MANAGER_INVALID_HOST_ARG;
goto invalid_args;
}
Expand All @@ -6999,15 +7006,15 @@ static int clusterManagerCommandImport(int argc, char **argv) {
goto invalid_args;
}
char *src_host[] = {config.cluster_manager_command.from};
if (!getClusterHostFromCmdArgs(1, src_host, &src_ip, &src_port)) {
if (!getClusterHostFromCmdArgs(1, src_host, &src_ip, &src_port, NULL)) {
invalid_args_msg = "[ERR] Invalid --cluster-from host. You need to "
"pass a valid address (ie. 120.0.0.1:7000).\n";
goto invalid_args;
}
clusterManagerLogInfo(">>> Importing data from %s:%d to cluster %s:%d\n",
src_ip, src_port, ip, port);

clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
if (!clusterManagerCheckCluster(0)) return 0;
char *reply_err = NULL;
Expand Down Expand Up @@ -7141,8 +7148,8 @@ static int clusterManagerCommandImport(int argc, char **argv) {
static int clusterManagerCommandCall(int argc, char **argv) {
int port = 0, i;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port, NULL)) goto invalid_args;
clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
argc--;
argv++;
Expand Down Expand Up @@ -7186,8 +7193,8 @@ static int clusterManagerCommandBackup(int argc, char **argv) {
UNUSED(argc);
int success = 1, port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
clusterManagerNode *refnode = clusterManagerNewNode(ip, port);
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port, NULL)) goto invalid_args;
clusterManagerNode *refnode = clusterManagerNewNode(ip, port, 0);
if (!clusterManagerLoadInfoFromNode(refnode)) return 0;
int no_issues = clusterManagerCheckCluster(0);
int cluster_errors_count = (no_issues ? 0 :
Expand Down

0 comments on commit 65bb28f

Please sign in to comment.