Skip to content

Commit

Permalink
Merge pull request #17398 from ivanvc/3.4-backport-e2e-waitleader
Browse files Browse the repository at this point in the history
[3.4] tests/e2e: backport WaitLeader
  • Loading branch information
ahrtr authored Feb 9, 2024
2 parents 1a6eaca + 9c067a3 commit 380175a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 0 deletions.
69 changes: 69 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package e2e

import (
"context"
"fmt"
"io/ioutil"
"net/url"
Expand Down Expand Up @@ -501,3 +502,71 @@ func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) {
}
return ret
}

// WaitLeader returns index of the member in c.Members() that is leader
// or fails the test (if not established in 30s).
func (epc *etcdProcessCluster) WaitLeader(t testing.TB) int {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return epc.WaitMembersForLeader(ctx, t, epc.procs)
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (epc *etcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []etcdProcess) int {
cc := NewEtcdctl(epc.EndpointsV3(), epc.cfg.clientTLS, epc.cfg.isClientAutoTLS, epc.cfg.enableV2)

// ensure leader is up via linearizable get
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
_, err := cc.Get("0")
if err == nil || strings.Contains(err.Error(), "Key not found") {
break
}
t.Logf("WaitMembersForLeader Get err: %v", err)
}

leaders := make(map[uint64]struct{})
members := make(map[uint64]int)
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
for i := range membs {
resp, err := membs[i].Etcdctl(epc.cfg.clientTLS, epc.cfg.isClientAutoTLS, epc.cfg.enableV2).Status()
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if member[i] has stopped
continue
} else {
t.Fatal(err)
}
}
members[resp[0].Header.MemberId] = i
leaders[resp[0].Leader] = struct{}{}
}
// members agree on the same leader
if len(leaders) == 1 {
break
}
leaders = make(map[uint64]struct{})
members = make(map[uint64]int)
// From main branch 10 * config.TickDuration (10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
}
for l := range leaders {
if index, ok := members[l]; ok {
t.Logf("members agree on a leader, members:%v , leader:%v", members, l)
return index
}
t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l)
}
t.Fatal("impossible path of execution")
return -1
}
6 changes: 6 additions & 0 deletions tests/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type etcdProcess interface {
PeerProxy() proxy.Server
Failpoints() *BinaryFailpoints
IsRunning() bool

Etcdctl(connType clientConnType, isAutoTLS bool, v2 bool) *Etcdctl
}

type logsExpect interface {
Expand Down Expand Up @@ -223,6 +225,10 @@ func (ep *etcdServerProcess) IsRunning() bool {
return false
}

func (ep *etcdServerProcess) Etcdctl(connType clientConnType, isAutoTLS, v2 bool) *Etcdctl {
return NewEtcdctl(ep.EndpointsV3(), connType, isAutoTLS, v2)
}

type BinaryFailpoints struct {
member etcdProcess
availableCache map[string]string
Expand Down
16 changes: 16 additions & 0 deletions tests/e2e/etcdctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,22 @@ func (ctl *Etcdctl) Compact(rev int64) (*clientv3.CompactResponse, error) {
return nil, spawnWithExpect(args, fmt.Sprintf("compacted revision %v", rev))
}

func (ctl *Etcdctl) Status() ([]*clientv3.StatusResponse, error) {
var epStatus []*struct {
Endpoint string
Status *clientv3.StatusResponse
}
err := ctl.spawnJsonCmd(&epStatus, "endpoint", "status")
if err != nil {
return nil, err
}
resp := make([]*clientv3.StatusResponse, len(epStatus))
for i, e := range epStatus {
resp[i] = e.Status
}
return resp, err
}

func (ctl *Etcdctl) spawnJsonCmd(output interface{}, expectedOutput string, args ...string) error {
args = append(args, "-w", "json")
cmd, err := spawnCmd(append(ctl.cmdArgs(), args...))
Expand Down

0 comments on commit 380175a

Please sign in to comment.