diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 4f4e826989..b1cae7ae91 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -69,7 +69,7 @@ int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); -void clusterSetPrimary(clusterNode *n, int closeSlots); +static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required); void clusterHandleReplicaFailover(void); void clusterHandleReplicaMigration(int max_replicas); int bitmapTestBit(unsigned char *bitmap, int pos); @@ -2370,7 +2370,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg * /* Check if this is our primary and we have to change the * replication target as well. */ if (nodeIsReplica(myself) && myself->replicaof == node) - replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node)); + replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 0); return 1; } @@ -2432,6 +2432,9 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc return; } + /* Sender and myself in the same shard? */ + int are_in_same_shard = areInSameShard(sender, myself); + for (j = 0; j < CLUSTER_SLOTS; j++) { if (bitmapTestBit(slots, j)) { sender_slots++; @@ -2474,7 +2477,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * the same shard and we should retain the migrating_slots_to state * for the slot in question */ if (server.cluster->migrating_slots_to[j] != NULL) { - if (!areInSameShard(sender, myself)) { + if (!are_in_same_shard) { serverLog(LL_NOTICE, "Slot %d is no longer being migrated to node %.40s (%s) in shard %.40s.", j, server.cluster->migrating_slots_to[j]->name, server.cluster->migrating_slots_to[j]->human_nodename, @@ -2595,7 +2598,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * the new primary if my current config epoch is lower than the * sender's. */ if (!new_primary && myself->replicaof != sender && sender_slots == 0 && myself->numslots == 0 && - nodeEpoch(myself) < senderConfigEpoch && areInSameShard(sender, myself)) { + nodeEpoch(myself) < senderConfigEpoch && are_in_same_shard) { new_primary = sender; } @@ -2619,16 +2622,18 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc * sender. In this case we don't reconfigure ourselves as a replica * of the sender. */ if (new_primary && cur_primary->numslots == 0) { - if (server.cluster_allow_replica_migration || areInSameShard(sender, myself)) { + if (server.cluster_allow_replica_migration || are_in_same_shard) { serverLog(LL_NOTICE, "Configuration change detected. Reconfiguring myself " "as a replica of node %.40s (%s) in shard %.40s", sender->name, sender->human_nodename, sender->shard_id); /* Don't clear the migrating/importing states if this is a replica that - * just gets promoted to the new primary in the shard. */ - clusterSetPrimary(sender, !areInSameShard(sender, myself)); + * just gets promoted to the new primary in the shard. + * + * If the sender and myself are in the same shard, try psync. 
+             */
+            clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard);
             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
-        } else if ((sender_slots >= migrated_our_slots) && !areInSameShard(sender, myself)) {
+        } else if ((sender_slots >= migrated_our_slots) && !are_in_same_shard) {
             /* When all our slots are lost to the sender and the sender belongs to
              * a different shard, this is likely due to a client triggered slot
              * migration. Don't reconfigure this node to migrate to the new shard
@@ -3383,12 +3388,19 @@ int clusterProcessPacket(clusterLink *link) {
     /* Explicitly check for a replication loop before attempting the replication
      * chain folding logic. */
     if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
-        /* Safeguard against sub-replicas. A replica's primary can turn itself
-         * into a replica if its last slot is removed. If no other node takes
-         * over the slot, there is nothing else to trigger replica migration. */
+        /* Safeguard against sub-replicas.
+         *
+         * A replica's primary can turn itself into a replica if its last slot
+         * is removed. If no other node takes over the slot, there is nothing
+         * else to trigger replica migration. In this case, they are not in the
+         * same shard, so a full sync is required.
+         *
+         * Or a replica's primary can turn itself into a replica of its other
+         * replica during a failover. In this case, they are in the same shard,
+         * so we can try a psync. */
         serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
                   myself->replicaof->replicaof->name, myself->replicaof->name);
-        clusterSetPrimary(myself->replicaof->replicaof, 1);
+        clusterSetPrimary(myself->replicaof->replicaof, 1, !areInSameShard(myself->replicaof->replicaof, myself));
         clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
     }
 
@@ -4692,7 +4704,9 @@ void clusterHandleReplicaMigration(int max_replicas) {
         !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) {
         serverLog(LL_NOTICE, "Migrating to orphaned primary %.40s (%s) in shard %.40s", target->name,
                   target->human_nodename, target->shard_id);
-        clusterSetPrimary(target, 1);
+        /* We are migrating to a different shard that has a completely different
+         * replication history, so a full sync is required. */
+        clusterSetPrimary(target, 1, 1);
     }
 }
 
@@ -5005,7 +5019,7 @@ void clusterCron(void) {
      * enable it if we know the address of our primary and it appears to
      * be up. */
     if (nodeIsReplica(myself) && server.primary_host == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) {
-        replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof));
+        replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 0);
     }
 
     /* Abort a manual failover if the timeout is reached. */
@@ -5398,7 +5412,7 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) {
 
 /* Set the specified node 'n' as primary for this node.
  * If this node is currently a primary, it is turned into a replica. */
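+/* 'full_sync_required' is forwarded to replicationSetPrimary(): when the new
+ * primary belongs to a different shard (an unrelated replication history) the
+ * cached primary is dropped and a full sync is forced; within the same shard
+ * the cached primary is kept so a psync can be attempted. */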
-void clusterSetPrimary(clusterNode *n, int closeSlots) {
+static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required) {
     serverAssert(n != myself);
     serverAssert(myself->numslots == 0);
 
@@ -5412,7 +5426,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) {
     myself->replicaof = n;
     updateShardId(myself, n->shard_id);
     clusterNodeAddReplica(n, myself);
-    replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n));
+    replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), full_sync_required);
     removeAllNotOwnedShardChannelSubscriptions();
     resetManualFailover();
 
@@ -6343,7 +6357,9 @@ void clusterCommandSetSlot(client *c) {
                       "Lost my last slot during slot migration. Reconfiguring myself "
                       "as a replica of %.40s (%s) in shard %.40s",
                       n->name, n->human_nodename, n->shard_id);
-            clusterSetPrimary(n, 1);
+            /* We are migrating to a different shard that has a completely different
+             * replication history, so a full sync is required. */
+            clusterSetPrimary(n, 1, 1);
             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
         }
 
@@ -6548,8 +6564,11 @@ int clusterCommandSpecial(client *c) {
             return 1;
         }
 
-        /* Set the primary. */
-        clusterSetPrimary(n, 1);
+        /* Set the primary.
+         * If the instance is a primary, it is an empty primary.
+         * If the instance is a replica, it has a totally different replication history.
+         * In both cases, myself as a replica has to do a full sync. */
+        clusterSetPrimary(n, 1, 1);
         clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
         clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
         addReply(c, shared.ok);
diff --git a/src/replication.c b/src/replication.c
index 174a4774dd..560153b8d2 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -3726,7 +3726,7 @@ int cancelReplicationHandshake(int reconnect) {
 }
 
 /* Set replication to the specified primary address and port. */
-void replicationSetPrimary(char *ip, int port) {
+void replicationSetPrimary(char *ip, int port, int full_sync_required) {
     int was_primary = server.primary_host == NULL;
 
     sdsfree(server.primary_host);
@@ -3752,13 +3752,22 @@ void replicationSetPrimary(char *ip, int port) {
      * sync with new primary. */
     cancelReplicationHandshake(0);
+
     /* Before destroying our primary state, create a cached primary using
      * our own parameters, to later PSYNC with the new primary. */
-    if (was_primary) {
+    if (was_primary && !full_sync_required) {
         replicationDiscardCachedPrimary();
         replicationCachePrimaryUsingMyself();
     }
 
+    /* If full sync is required, drop the cached primary. Doing so increases
+     * this replica node's election rank (delay) and reduces its chance of
+     * winning the election. If a replica requiring a full sync wins the
+     * election, it will flush valid data in the shard, causing data loss. */
+    if (full_sync_required) {
+        replicationDiscardCachedPrimary();
+    }
+
     /* Fire the role change modules event. */
     moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICATION_ROLE_CHANGED, VALKEYMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
                           NULL);
@@ -3896,7 +3905,7 @@ void replicaofCommand(client *c) {
     }
     /* There was no previous primary or the user specified a different one,
      * we can continue. */
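+    /* Passing 0 keeps the existing cached-primary behavior: the replica may
+     * try a psync first, and it still falls back to a full sync if the new
+     * primary cannot serve a partial resync. */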
-    replicationSetPrimary(c->argv[1]->ptr, port);
+    replicationSetPrimary(c->argv[1]->ptr, port, 0);
     sds client = catClientInfoString(sdsempty(), c);
     serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')", server.primary_host,
               server.primary_port, client);
@@ -4907,7 +4916,7 @@ void updateFailoverStatus(void) {
                       server.target_replica_port);
             server.failover_state = FAILOVER_IN_PROGRESS;
             /* If timeout has expired force a failover if requested. */
-            replicationSetPrimary(server.target_replica_host, server.target_replica_port);
+            replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
             return;
         } else {
             /* Force was not requested, so timeout. */
@@ -4950,6 +4959,6 @@ void updateFailoverStatus(void) {
         serverLog(LL_NOTICE, "Failover target %s:%d is synced, failing over.", server.target_replica_host,
                   server.target_replica_port);
         /* Designated replica is caught up, failover to it. */
-        replicationSetPrimary(server.target_replica_host, server.target_replica_port);
+        replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
     }
 }
diff --git a/src/server.h b/src/server.h
index 7fb7b2a4e5..c4c6e1fc41 100644
--- a/src/server.h
+++ b/src/server.h
@@ -3029,7 +3029,7 @@ void replicationStartPendingFork(void);
 void replicationHandlePrimaryDisconnection(void);
 void replicationCachePrimary(client *c);
 void resizeReplicationBacklog(void);
-void replicationSetPrimary(char *ip, int port);
+void replicationSetPrimary(char *ip, int port, int full_sync_required);
 void replicationUnsetPrimary(void);
 void refreshGoodReplicasCount(void);
 int checkGoodReplicasStatus(void);
diff --git a/tests/unit/cluster/replica_migration.tcl b/tests/unit/cluster/replica_migration.tcl
new file mode 100644
index 0000000000..4d7efa0b70
--- /dev/null
+++ b/tests/unit/cluster/replica_migration.tcl
@@ -0,0 +1,159 @@
+# Allocate slot 0 to the last primary and evenly distribute the remaining
+# slots to the remaining primaries.
+proc my_slot_allocation {masters replicas} {
+    set avg [expr double(16384) / [expr $masters-1]]
+    set slot_start 1
+    for {set j 0} {$j < $masters-1} {incr j} {
+        set slot_end [expr int(ceil(($j + 1) * $avg) - 1)]
+        R $j cluster addslotsrange $slot_start $slot_end
+        set slot_start [expr $slot_end + 1]
+    }
+    R [expr $masters-1] cluster addslots 0
+}
+
+start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
+    test "Migrated replica reports zero repl offset and rank, and fails to win election" {
+        # Write some data to primary 0 (slot 1) to create a small repl_offset.
+        for {set i 0} {$i < 1024} {incr i} {
+            R 0 incr key_991803
+        }
+        assert_equal {1024} [R 0 get key_991803]
+
+        # Write some data to primary 3 (slot 0) to create a big repl_offset.
+        for {set i 0} {$i < 10240} {incr i} {
+            R 3 incr key_977613
+        }
+        assert_equal {10240} [R 3 get key_977613]
+
+        # Set a huge rdb-key-save-delay to make sure primary 0 hangs during the save.
+        R 0 config set rdb-key-save-delay 100000000
+
+        # Move slot 0 from primary 3 to primary 0.
+        set addr "[srv 0 host]:[srv 0 port]"
+        set myid [R 3 CLUSTER MYID]
+        set code [catch {
+            exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0
+        } result]
+        if {$code != 0} {
+            fail "valkey-cli --cluster rebalance returned a non-zero exit code, output below:\n$result"
+        }
+
+        # Validate that shard 3's primary and replica can convert to replicas after
+        # they lose the last slot.
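+        # A replica validity factor of 0 keeps these replicas eligible for
+        # failover regardless of how long the link to their primary has been
+        # down, and allow-replica-migration lets the emptied shard 3 nodes
+        # turn into replicas of the shard that took over slot 0.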
+        R 3 config set cluster-replica-validity-factor 0
+        R 7 config set cluster-replica-validity-factor 0
+        R 3 config set cluster-allow-replica-migration yes
+        R 7 config set cluster-allow-replica-migration yes
+
+        # Shut down primary 0.
+        catch {R 0 shutdown nosave}
+
+        # Wait for the replica to become a primary, and make sure
+        # the other primary becomes a replica.
+        wait_for_condition 1000 50 {
+            [s -4 role] eq {master} &&
+            [s -3 role] eq {slave} &&
+            [s -7 role] eq {slave}
+        } else {
+            puts "s -4 role: [s -4 role]"
+            puts "s -3 role: [s -3 role]"
+            puts "s -7 role: [s -7 role]"
+            fail "Failover did not happen"
+        }
+
+        # Make sure the offsets of servers 3 and 7 are 0.
+        verify_log_message -3 "*Start of election*offset 0*" 0
+        verify_log_message -7 "*Start of election*offset 0*" 0
+
+        # Make sure the right replica (server 4) gets the best rank (#0).
+        verify_log_message -4 "*Start of election*rank #0*" 0
+
+        # Wait for the cluster to be ok.
+        wait_for_condition 1000 50 {
+            [CI 3 cluster_state] eq "ok" &&
+            [CI 4 cluster_state] eq "ok" &&
+            [CI 7 cluster_state] eq "ok"
+        } else {
+            puts "R 3: [R 3 cluster info]"
+            puts "R 4: [R 4 cluster info]"
+            puts "R 7: [R 7 cluster info]"
+            fail "Cluster is down"
+        }
+
+        # Make sure the key exists and is consistent.
+        R 3 readonly
+        R 7 readonly
+        wait_for_condition 1000 50 {
+            [R 3 get key_991803] == 1024 && [R 3 get key_977613] == 10240 &&
+            [R 4 get key_991803] == 1024 && [R 4 get key_977613] == 10240 &&
+            [R 7 get key_991803] == 1024 && [R 7 get key_977613] == 10240
+        } else {
+            puts "R 3: [R 3 keys *]"
+            puts "R 4: [R 4 keys *]"
+            puts "R 7: [R 7 keys *]"
+            fail "Key not consistent"
+        }
+    }
+} my_slot_allocation cluster_allocate_replicas ;# start_cluster
+
+start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
+    test "New non-empty replica reports zero repl offset and rank, and fails to win election" {
+        # Write some data to primary 0 (slot 1) to create a small repl_offset.
+        for {set i 0} {$i < 1024} {incr i} {
+            R 0 incr key_991803
+        }
+        assert_equal {1024} [R 0 get key_991803]
+
+        # Write some data to primary 3 (slot 0) to create a big repl_offset.
+        for {set i 0} {$i < 10240} {incr i} {
+            R 3 incr key_977613
+        }
+        assert_equal {10240} [R 3 get key_977613]
+
+        # Set a huge rdb-key-save-delay to make sure primary 0 hangs during the save.
+        R 0 config set rdb-key-save-delay 100000000
+
+        # Make server 7 a replica of server 0.
+        R 7 config set cluster-replica-validity-factor 0
+        R 7 config set cluster-allow-replica-migration yes
+        R 7 cluster replicate [R 0 cluster myid]
+
+        # Shut down primary 0.
+        catch {R 0 shutdown nosave}
+
+        # Wait for the replica to become a primary.
+        wait_for_condition 1000 50 {
+            [s -4 role] eq {master} &&
+            [s -7 role] eq {slave}
+        } else {
+            puts "s -4 role: [s -4 role]"
+            puts "s -7 role: [s -7 role]"
+            fail "Failover did not happen"
+        }
+
+        # Make sure server 4 gets the best rank (#0) and server 7 reports offset 0.
+        verify_log_message -4 "*Start of election*rank #0*" 0
+        verify_log_message -7 "*Start of election*offset 0*" 0
+
+        # Wait for the cluster to be ok.
+        wait_for_condition 1000 50 {
+            [CI 4 cluster_state] eq "ok" &&
+            [CI 7 cluster_state] eq "ok"
+        } else {
+            puts "R 4: [R 4 cluster info]"
+            puts "R 7: [R 7 cluster info]"
+            fail "Cluster is down"
+        }
+
+        # Make sure the key exists and is consistent.
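+        # READONLY lets the replica serve the reads in the consistency check below.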
+ R 7 readonly + wait_for_condition 1000 50 { + [R 4 get key_991803] == 1024 && + [R 7 get key_991803] == 1024 + } else { + puts "R 4: [R 4 get key_991803]" + puts "R 7: [R 7 get key_991803]" + fail "Key not consistent" + } + } +} my_slot_allocation cluster_allocate_replicas ;# start_cluster