Skip to content

Commit

Permalink
tests: Trigger raftBeforeLeaderSend
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Nov 17, 2022
1 parent bf3eea8 commit f002c49
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 102 deletions.
66 changes: 66 additions & 0 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,3 +822,69 @@ func findMemberIDByEndpoint(members []*etcdserverpb.Member, endpoint string) (ui

return 0, fmt.Errorf("member not found")
}

// WaitLeader returns index of the member in c.Members() that is leader
// or fails the test (if not established in 30s).
func (epc *EtcdProcessCluster) WaitLeader(t testing.TB) int {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return epc.WaitMembersForLeader(ctx, t, epc.Procs)
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (epc *EtcdProcessCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []EtcdProcess) int {
cc := epc.Client()

// ensure leader is up via linearizable get
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
_, err := cc.Get(ctx, "0", config.GetOptions{Timeout: 10*config.TickDuration + time.Second})
if err == nil || strings.Contains(err.Error(), "Key not found") {
break
}
}

leaders := make(map[uint64]struct{})
members := make(map[uint64]int)
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
for i := range membs {
resp, err := membs[i].Client().Status(ctx)
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if member[i] has stopped
continue
} else {
t.Fatal(err)
}
}
members[resp[0].Header.MemberId] = i
leaders[resp[0].Leader] = struct{}{}
}
// members agree on the same leader
if len(leaders) == 1 {
break
}
leaders = make(map[uint64]struct{})
members = make(map[uint64]int)
time.Sleep(10 * config.TickDuration)
}
for l := range leaders {
if index, ok := members[l]; ok {
t.Logf("members agree on a leader, members:%v , leader:%v", members, l)
return index
}
t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l)
}
t.Fatal("impossible path of execution")
return -1
}
69 changes: 0 additions & 69 deletions tests/framework/e2e/e2e.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ package e2e
import (
"context"
"os"
"strings"
"testing"
"time"

"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/framework/config"
intf "go.etcd.io/etcd/tests/v3/framework/interfaces"
"go.etcd.io/etcd/tests/v3/framework/testutils"
)

type e2eRunner struct{}
Expand Down Expand Up @@ -114,72 +111,6 @@ func (c *e2eCluster) Members() (ms []intf.Member) {
return ms
}

// WaitLeader returns index of the member in c.Members() that is leader
// or fails the test (if not established in 30s).
func (c *e2eCluster) WaitLeader(t testing.TB) int {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return c.WaitMembersForLeader(ctx, t, c.Members())
}

// WaitMembersForLeader waits until given members agree on the same leader,
// and returns its 'index' in the 'membs' list
func (c *e2eCluster) WaitMembersForLeader(ctx context.Context, t testing.TB, membs []intf.Member) int {
cc := testutils.MustClient(c.Client())

// ensure leader is up via linearizable get
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
_, err := cc.Get(ctx, "0", config.GetOptions{Timeout: 10*config.TickDuration + time.Second})
if err == nil || strings.Contains(err.Error(), "Key not found") {
break
}
}

leaders := make(map[uint64]struct{})
members := make(map[uint64]int)
for {
select {
case <-ctx.Done():
t.Fatal("WaitMembersForLeader timeout")
default:
}
for i := range membs {
resp, err := membs[i].Client().Status(ctx)
if err != nil {
if strings.Contains(err.Error(), "connection refused") {
// if member[i] has stopped
continue
} else {
t.Fatal(err)
}
}
members[resp[0].Header.MemberId] = i
leaders[resp[0].Leader] = struct{}{}
}
// members agree on the same leader
if len(leaders) == 1 {
break
}
leaders = make(map[uint64]struct{})
members = make(map[uint64]int)
time.Sleep(10 * config.TickDuration)
}
for l := range leaders {
if index, ok := members[l]; ok {
t.Logf("members agree on a leader, members:%v , leader:%v", members, l)
return index
}
t.Fatalf("members agree on a leader which is not one of members, members:%v , leader:%v", members, l)
}
t.Fatal("impossible path of execution")
return -1
}

type e2eClient struct {
*EtcdctlV3
}
Expand Down
82 changes: 50 additions & 32 deletions tests/linearizability/failpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/url"
"strings"
"testing"
"time"

"go.uber.org/zap"
Expand All @@ -32,24 +33,25 @@ import (

var (
KillFailpoint Failpoint = killFailpoint{}
DefragBeforeCopyPanic Failpoint = goFailpoint{"backend/defragBeforeCopy", "panic", triggerDefrag}
DefragBeforeRenamePanic Failpoint = goFailpoint{"backend/defragBeforeRename", "panic", triggerDefrag}
BeforeCommitPanic Failpoint = goFailpoint{"backend/beforeCommit", "panic", nil}
AfterCommitPanic Failpoint = goFailpoint{"backend/afterCommit", "panic", nil}
RaftBeforeSavePanic Failpoint = goFailpoint{"etcdserver/raftBeforeSave", "panic", nil}
RaftAfterSavePanic Failpoint = goFailpoint{"etcdserver/raftAfterSave", "panic", nil}
BackendBeforePreCommitHookPanic Failpoint = goFailpoint{"backend/commitBeforePreCommitHook", "panic", nil}
BackendAfterPreCommitHookPanic Failpoint = goFailpoint{"backend/commitAfterPreCommitHook", "panic", nil}
BackendBeforeStartDBTxnPanic Failpoint = goFailpoint{"backend/beforeStartDBTxn", "panic", nil}
BackendAfterStartDBTxnPanic Failpoint = goFailpoint{"backend/afterStartDBTxn", "panic", nil}
BackendBeforeWritebackBufPanic Failpoint = goFailpoint{"backend/beforeWritebackBuf", "panic", nil}
BackendAfterWritebackBufPanic Failpoint = goFailpoint{"backend/afterWritebackBuf", "panic", nil}
CompactBeforeCommitScheduledCompactPanic Failpoint = goFailpoint{"mvcc/compactBeforeCommitScheduledCompact", "panic", triggerCompact}
CompactAfterCommitScheduledCompactPanic Failpoint = goFailpoint{"mvcc/compactAfterCommitScheduledCompact", "panic", triggerCompact}
CompactBeforeSetFinishedCompactPanic Failpoint = goFailpoint{"mvcc/compactBeforeSetFinishedCompact", "panic", triggerCompact}
CompactAfterSetFinishedCompactPanic Failpoint = goFailpoint{"mvcc/compactAfterSetFinishedCompact", "panic", triggerCompact}
CompactBeforeCommitBatchPanic Failpoint = goFailpoint{"mvcc/compactBeforeCommitBatch", "panic", triggerCompact}
CompactAfterCommitBatchPanic Failpoint = goFailpoint{"mvcc/compactAfterCommitBatch", "panic", triggerCompact}
DefragBeforeCopyPanic Failpoint = goFailpoint{"backend/defragBeforeCopy", "panic", triggerDefrag, AnyMember}
DefragBeforeRenamePanic Failpoint = goFailpoint{"backend/defragBeforeRename", "panic", triggerDefrag, AnyMember}
BeforeCommitPanic Failpoint = goFailpoint{"backend/beforeCommit", "panic", nil, AnyMember}
AfterCommitPanic Failpoint = goFailpoint{"backend/afterCommit", "panic", nil, AnyMember}
RaftBeforeSavePanic Failpoint = goFailpoint{"etcdserver/raftBeforeSave", "panic", nil, AnyMember}
RaftAfterSavePanic Failpoint = goFailpoint{"etcdserver/raftAfterSave", "panic", nil, AnyMember}
BackendBeforePreCommitHookPanic Failpoint = goFailpoint{"backend/commitBeforePreCommitHook", "panic", nil, AnyMember}
BackendAfterPreCommitHookPanic Failpoint = goFailpoint{"backend/commitAfterPreCommitHook", "panic", nil, AnyMember}
BackendBeforeStartDBTxnPanic Failpoint = goFailpoint{"backend/beforeStartDBTxn", "panic", nil, AnyMember}
BackendAfterStartDBTxnPanic Failpoint = goFailpoint{"backend/afterStartDBTxn", "panic", nil, AnyMember}
BackendBeforeWritebackBufPanic Failpoint = goFailpoint{"backend/beforeWritebackBuf", "panic", nil, AnyMember}
BackendAfterWritebackBufPanic Failpoint = goFailpoint{"backend/afterWritebackBuf", "panic", nil, AnyMember}
CompactBeforeCommitScheduledCompactPanic Failpoint = goFailpoint{"mvcc/compactBeforeCommitScheduledCompact", "panic", triggerCompact, AnyMember}
CompactAfterCommitScheduledCompactPanic Failpoint = goFailpoint{"mvcc/compactAfterCommitScheduledCompact", "panic", triggerCompact, AnyMember}
CompactBeforeSetFinishedCompactPanic Failpoint = goFailpoint{"mvcc/compactBeforeSetFinishedCompact", "panic", triggerCompact, AnyMember}
CompactAfterSetFinishedCompactPanic Failpoint = goFailpoint{"mvcc/compactAfterSetFinishedCompact", "panic", triggerCompact, AnyMember}
CompactBeforeCommitBatchPanic Failpoint = goFailpoint{"mvcc/compactBeforeCommitBatch", "panic", triggerCompact, AnyMember}
CompactAfterCommitBatchPanic Failpoint = goFailpoint{"mvcc/compactAfterCommitBatch", "panic", triggerCompact, AnyMember}
RaftBeforeLeaderSendPanic Failpoint = goFailpoint{"etcdserver/raftBeforeLeaderSend", "panic", nil, Leader}
RandomFailpoint Failpoint = randomFailpoint{[]Failpoint{
KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic,
RaftAfterSavePanic, DefragBeforeCopyPanic, DefragBeforeRenamePanic,
Expand All @@ -59,25 +61,25 @@ var (
CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic,
CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic,
CompactBeforeCommitBatchPanic, CompactAfterCommitBatchPanic,
RaftBeforeLeaderSendPanic,
}}
// TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint
raftBeforeLeaderSendPanic Failpoint = goFailpoint{"etcdserver/raftBeforeLeaderSend", "panic", nil}
raftBeforeApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftBeforeApplySnap", "panic", nil}
raftAfterApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftAfterApplySnap", "panic", nil}
raftAfterWALReleasePanic Failpoint = goFailpoint{"etcdserver/raftAfterWALRelease", "panic", nil}
raftBeforeFollowerSendPanic Failpoint = goFailpoint{"etcdserver/raftBeforeFollowerSend", "panic", nil}
raftBeforeSaveSnapPanic Failpoint = goFailpoint{"etcdserver/raftBeforeSaveSnap", "panic", nil}
raftAfterSaveSnapPanic Failpoint = goFailpoint{"etcdserver/raftAfterSaveSnap", "panic", nil}
raftBeforeApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftBeforeApplySnap", "panic", nil, AnyMember}
raftAfterApplySnapPanic Failpoint = goFailpoint{"etcdserver/raftAfterApplySnap", "panic", nil, AnyMember}
raftAfterWALReleasePanic Failpoint = goFailpoint{"etcdserver/raftAfterWALRelease", "panic", nil, AnyMember}
raftBeforeFollowerSendPanic Failpoint = goFailpoint{"etcdserver/raftBeforeFollowerSend", "panic", nil, AnyMember}
raftBeforeSaveSnapPanic Failpoint = goFailpoint{"etcdserver/raftBeforeSaveSnap", "panic", nil, AnyMember}
raftAfterSaveSnapPanic Failpoint = goFailpoint{"etcdserver/raftAfterSaveSnap", "panic", nil, AnyMember}
)

type Failpoint interface {
Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error
Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error
Name() string
}

type killFailpoint struct{}

func (f killFailpoint) Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error {
func (f killFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
err := member.Kill()
if err != nil {
Expand All @@ -102,10 +104,26 @@ type goFailpoint struct {
failpoint string
payload string
trigger func(ctx context.Context, member e2e.EtcdProcess) error
target failpointTarget
}

func (f goFailpoint) Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error {
member := clus.Procs[rand.Int()%len(clus.Procs)]
type failpointTarget string

const (
AnyMember failpointTarget = "AnyMember"
Leader failpointTarget = "Leader"
)

func (f goFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
var member e2e.EtcdProcess
switch f.target {
case AnyMember:
member = clus.Procs[rand.Int()%len(clus.Procs)]
case Leader:
member = clus.Procs[clus.WaitLeader(t)]
default:
panic("unknown target")
}
address := fmt.Sprintf("127.0.0.1:%d", member.Config().GoFailPort)
err := setupGoFailpoint(address, f.failpoint, f.payload)
if err != nil {
Expand Down Expand Up @@ -201,10 +219,10 @@ type randomFailpoint struct {
failpoints []Failpoint
}

func (f randomFailpoint) Trigger(ctx context.Context, clus *e2e.EtcdProcessCluster) error {
func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error {
failpoint := f.failpoints[rand.Int()%len(f.failpoints)]
fmt.Printf("Triggering %v failpoint\n", failpoint.Name())
return failpoint.Trigger(ctx, clus)
t.Logf("Triggering %v failpoint\n", failpoint.Name())
return failpoint.Trigger(t, ctx, clus)
}

func (f randomFailpoint) Name() string {
Expand Down
2 changes: 1 addition & 1 deletion tests/linearizability/linearizability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func triggerFailpoints(ctx context.Context, t *testing.T, clus *e2e.EtcdProcessC
failures := 0
time.Sleep(config.waitBetweenTriggers)
for successes < config.count && failures < config.count {
err = config.failpoint.Trigger(ctx, clus)
err = config.failpoint.Trigger(t, ctx, clus)
if err != nil {
t.Logf("Failed to trigger failpoint %q, err: %v\n", config.failpoint.Name(), err)
failures++
Expand Down

0 comments on commit f002c49

Please sign in to comment.