Skip to content

Commit

Permalink
fix: merge fix from etcd etcd-io/etcd#12288
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Jan 18, 2021
1 parent 54866dd commit ac931ef
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 3 deletions.
7 changes: 6 additions & 1 deletion node/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ func (rc *raftNode) restartAsStandaloneNode(cfg *raft.Config, snapshot *raftpb.S
// ID-related entry:
// - ConfChangeAddNode, in which case the contained ID will be added into the set.
// - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
// - ConfChangeAddLearnerNode, in which the contained ID will be added into the set.
func getIDsAndGroups(snap *raftpb.Snapshot, ents []raftpb.Entry) ([]uint64, map[uint64]raftpb.Group) {
ids := make(map[uint64]bool)
grps := make(map[uint64]raftpb.Group)
Expand All @@ -528,12 +529,16 @@ func getIDsAndGroups(snap *raftpb.Snapshot, ents []raftpb.Entry) ([]uint64, map[
var cc raftpb.ConfChange
cc.Unmarshal(e.Data)
switch cc.Type {
case raftpb.ConfChangeAddLearnerNode:
// https://github.com/etcd-io/etcd/pull/12288
ids[cc.ReplicaID] = true
grps[cc.NodeGroup.RaftReplicaId] = cc.NodeGroup
case raftpb.ConfChangeAddNode:
ids[cc.ReplicaID] = true
grps[cc.NodeGroup.RaftReplicaId] = cc.NodeGroup
case raftpb.ConfChangeRemoveNode:
delete(ids, cc.ReplicaID)
delete(grps, cc.ReplicaID)
delete(grps, cc.NodeGroup.RaftReplicaId)
case raftpb.ConfChangeUpdateNode:
// do nothing
default:
Expand Down
115 changes: 113 additions & 2 deletions pdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1814,8 +1814,119 @@ func TestClusterDecrReplicaOneByOne(t *testing.T) {
}
}

func TestRestartWithForceAlone(t *testing.T) {
// TODO: test force restart with alone
func TestRestartWithForceAloneWithLearnerAndRemovedNode(t *testing.T) {
// test force restart with alone
// test force start as alone for normal node and for learner node
// and test force restart as alone for have removed node before
node.EnableForTest()
ensureClusterReady(t, 3)

time.Sleep(time.Second)
ns := "test_force_restart_alone_after_add_learner_remove_node"
partNum := 1

pduri := "http://127.0.0.1:" + pdHttpPort
ensureDataNodesReady(t, pduri, len(gkvList))
enableAutoBalance(t, pduri, true)
ensureNamespace(t, pduri, ns, partNum, 3)
defer ensureDeleteNamespace(t, pduri, ns)

dnw, leaderNode := waitForLeader(t, ns, 0)
leader := dnw.s
assert.NotNil(t, leader)
leaderNode.Node.OptimizeDB("")

node.SetSyncerOnly(true)
defer node.SetSyncerOnly(false)
remotePD, remoteSrvs, remoteTmpDir := startRemoteSyncTestCluster(t, 1)
defer func() {
for _, kv := range remoteSrvs {
kv.s.Stop()
}
if remotePD != nil {
remotePD.Stop()
}
if strings.Contains(remoteTmpDir, "rocksdb-test") {
t.Logf("removing: %v", remoteTmpDir)
os.RemoveAll(remoteTmpDir)
}
}()
pduri = "http://127.0.0.1:" + pdRemoteHttpPort
for _, lrnSrv := range remoteSrvs {
lrnSrv.s.GetCoord().UpdateSyncerWriteOnly(true)
}
ensureDataNodesReady(t, pduri, len(remoteSrvs))
enableAutoBalance(t, pduri, true)
ensureNamespace(t, pduri, ns, partNum, 1)
defer ensureDeleteNamespace(t, pduri, ns)

learnerPD, learnerSrvs, tmpDir := startTestClusterForLearner(t, 1)
defer func() {
for _, kv := range learnerSrvs {
kv.s.Stop()
}
if learnerPD != nil {
learnerPD.Stop()
}
if strings.Contains(tmpDir, "learner-test") {
t.Logf("removing: %v", tmpDir)
os.RemoveAll(tmpDir)
}
}()
time.Sleep(time.Second * 3)

t.Logf("begin wait first before restart")
waitRemoteClusterSync(t, ns, leaderNode, learnerSrvs, remoteSrvs)
oldNs := getNsInfo(t, ns, 0)
t.Logf("new isr is: %v", oldNs)
assert.Equal(t, 1, len(oldNs.LearnerNodes))

err := gpdServer.pdCoord.ChangeNamespaceMetaParam(ns, 1, "", 0)
assert.Nil(t, err)

time.Sleep(time.Second * 10)
for {
time.Sleep(time.Second)
newNs := getNsInfo(t, ns, 0)
t.Logf("new isr is: %v", newNs)
waitForAllFullReady(t, ns, 0)
if len(newNs.GetISR()) < 2 {
break
}
}

waitEnoughReplica(t, ns, 0)
waitForAllFullReady(t, ns, 0)
waitBalancedLeader(t, ns, 0)

// restart leader as alone
leader.RestartAsStandalone(common.GetNsDesp(ns, 0))
time.Sleep(time.Second)

waitEnoughReplica(t, ns, 0)
waitForAllFullReady(t, ns, 0)

dnw, leaderNode = waitForLeader(t, ns, 0)
leader = dnw.s
assert.NotNil(t, leader)
leaderNode.Node.OptimizeDB("")
time.Sleep(time.Second * 3)

err = gpdServer.pdCoord.ChangeNamespaceMetaParam(ns, 2, "", 0)
assert.Nil(t, err)

for {
time.Sleep(time.Second)
newNs := getNsInfo(t, ns, 0)
t.Logf("new isr is: %v", newNs)
waitForAllFullReady(t, ns, 0)
if len(newNs.GetISR()) == 2 {
break
}
}
waitEnoughReplica(t, ns, 0)
waitForAllFullReady(t, ns, 0)
waitBalancedLeader(t, ns, 0)
}

func TestInstallSnapshotOnFollower(t *testing.T) {
Expand Down

0 comments on commit ac931ef

Please sign in to comment.