Skip to content

Commit

Permalink
Merge pull request #18181 from serathius/robustness-compact-lazyfs
Browse files Browse the repository at this point in the history
Avoid sending Compact request when LazyFS is enabled
  • Loading branch information
serathius authored Jun 18, 2024
2 parents 2deefb0 + 2e04ee7 commit c70e0e4
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 63 deletions.
3 changes: 2 additions & 1 deletion tests/robustness/failpoint/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/client"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)

var (
Expand Down Expand Up @@ -135,7 +136,7 @@ func (f memberReplace) Name() string {
return "MemberReplace"
}

func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
// a lower etcd version may not be able to join a cluster with higher cluster version.
return config.ClusterSize > 1 && (config.Version == e2e.QuorumLastVersion || member.Config().ExecPath == e2e.BinPath.Etcd)
}
Expand Down
27 changes: 9 additions & 18 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)

const (
Expand All @@ -54,10 +55,10 @@ var (
}
)

func PickRandom(clus *e2e.EtcdProcessCluster) (Failpoint, error) {
func PickRandom(clus *e2e.EtcdProcessCluster, profile traffic.Profile) (Failpoint, error) {
availableFailpoints := make([]Failpoint, 0, len(allFailpoints))
for _, failpoint := range allFailpoints {
err := Validate(clus, failpoint)
err := Validate(clus, failpoint, profile)
if err != nil {
continue
}
Expand All @@ -69,16 +70,16 @@ func PickRandom(clus *e2e.EtcdProcessCluster) (Failpoint, error) {
return availableFailpoints[rand.Int()%len(availableFailpoints)], nil
}

func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint, profile traffic.Profile) error {
for _, proc := range clus.Procs {
if !failpoint.Available(*clus.Cfg, proc) {
if !failpoint.Available(*clus.Cfg, proc, profile) {
return fmt.Errorf("failpoint %q not available on %s", failpoint.Name(), proc.Config().Name)
}
}
return nil
}

func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*FailpointReport, error) {
func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*report.FailpointReport, error) {
ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
var err error
Expand All @@ -99,8 +100,8 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro
lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name()))
end := time.Since(baseTime)

return &FailpointReport{
Injection: Injection{
return &report.FailpointReport{
FailpointInjection: report.FailpointInjection{
Start: start,
End: end,
Name: failpoint.Name(),
Expand All @@ -109,16 +110,6 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro
}, nil
}

type FailpointReport struct {
Injection
Client []report.ClientReport
}

type Injection struct {
Start, End time.Duration
Name string
}

func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProcessCluster) error {
for i := 0; i < len(clus.Procs); i++ {
clusterClient, err := clientv3.New(clientv3.Config{
Expand Down Expand Up @@ -154,5 +145,5 @@ type Failpoint interface {
}

type AvailabilityChecker interface {
Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool
Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess, traffic.Profile) bool
}
9 changes: 5 additions & 4 deletions tests/robustness/failpoint/gofail.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)

var (
Expand Down Expand Up @@ -147,11 +148,11 @@ func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster)
}
}

func (f goPanicFailpoint) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
func (f goPanicFailpoint) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
if f.target == Follower && config.ClusterSize == 1 {
return false
}
if f.trigger != nil && !f.trigger.Available(config, member) {
if f.trigger != nil && !f.trigger.Available(config, member, profile) {
return false
}
memberFailpoints := member.Failpoints()
Expand Down Expand Up @@ -200,7 +201,7 @@ func (f killAndGofailSleep) Name() string {
return fmt.Sprintf("%s=sleep(%s)", f.failpoint, f.time)
}

func (f killAndGofailSleep) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
func (f killAndGofailSleep) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
if config.ClusterSize == 1 {
return false
}
Expand Down Expand Up @@ -238,7 +239,7 @@ func (f gofailSleepAndDeactivate) Name() string {
return fmt.Sprintf("%s=sleep(%s)", f.failpoint, f.time)
}

func (f gofailSleepAndDeactivate) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool {
func (f gofailSleepAndDeactivate) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool {
memberFailpoints := member.Failpoints()
if memberFailpoints == nil {
return false
Expand Down
3 changes: 2 additions & 1 deletion tests/robustness/failpoint/kill.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)

var (
Expand Down Expand Up @@ -67,6 +68,6 @@ func (f killFailpoint) Name() string {
return "Kill"
}

func (f killFailpoint) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool {
func (f killFailpoint) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess, traffic.Profile) bool {
return true
}
7 changes: 4 additions & 3 deletions tests/robustness/failpoint/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)

var (
Expand Down Expand Up @@ -56,7 +57,7 @@ func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e
return nil, Blackhole(ctx, t, member, clus, tb.waitTillSnapshot)
}

func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool {
func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess, profile traffic.Profile) bool {
// Avoid triggering failpoint if waiting for failpoint would take too long to fit into timeout.
// Number of required entries for snapshot depends on etcd configuration.
if tb.waitTillSnapshot && (entriesToGuaranteeSnapshot(config) > 200 || !e2e.CouldSetSnapshotCatchupEntries(process.Config().ExecPath)) {
Expand Down Expand Up @@ -179,7 +180,7 @@ func (f delayPeerNetworkFailpoint) Name() string {
return "delayPeerNetwork"
}

func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool {
func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess, profile traffic.Profile) bool {
return config.ClusterSize > 1 && clus.PeerProxy() != nil
}

Expand Down Expand Up @@ -213,6 +214,6 @@ func (f dropPeerNetworkFailpoint) Name() string {
return "dropPeerNetwork"
}

func (f dropPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool {
func (f dropPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess, profile traffic.Profile) bool {
return config.ClusterSize > 1 && clus.PeerProxy() != nil
}
8 changes: 6 additions & 2 deletions tests/robustness/failpoint/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.etcd.io/etcd/tests/v3/robustness/client"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/report"
"go.etcd.io/etcd/tests/v3/robustness/traffic"
)

type trigger interface {
Expand All @@ -47,7 +48,7 @@ func (t triggerDefrag) Trigger(ctx context.Context, _ *testing.T, member e2e.Etc
return nil, nil
}

func (t triggerDefrag) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool {
func (t triggerDefrag) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess, traffic.Profile) bool {
return true
}

Expand Down Expand Up @@ -83,7 +84,10 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et
return []report.ClientReport{cc.Report()}, nil
}

func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess) bool {
func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess, profile traffic.Profile) bool {
if profile.ForbidCompaction {
return false
}
// Since introduction of compaction into traffic, injecting compaction failpoints started interfeering with peer proxy.
// TODO: Re-enable the peer proxy for compact failpoints when we confirm the root cause.
if config.PeerProxy {
Expand Down
6 changes: 3 additions & 3 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce
defer forcestopCluster(r.Cluster)

if s.failpoint == nil {
s.failpoint, err = failpoint.PickRandom(r.Cluster)
s.failpoint, err = failpoint.PickRandom(r.Cluster, s.profile)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
defer cancel()
g := errgroup.Group{}
var operationReport, watchReport, failpointClientReport []report.ClientReport
failpointInjected := make(chan failpoint.Injection, 1)
failpointInjected := make(chan report.FailpointInjection, 1)

// using baseTime time-measuring operation to get monotonic clock reading
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
Expand All @@ -125,7 +125,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
// Give some time for traffic to reach qps target after injecting failpoint.
time.Sleep(time.Second)
if fr != nil {
failpointInjected <- fr.Injection
failpointInjected <- fr.FailpointInjection
failpointClientReport = fr.Client
}
return nil
Expand Down
29 changes: 29 additions & 0 deletions tests/robustness/report/failpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package report

import (
"time"
)

type FailpointReport struct {
FailpointInjection
Client []ClientReport
}

type FailpointInjection struct {
Start, End time.Duration
Name string
}
43 changes: 26 additions & 17 deletions tests/robustness/scenarios.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,29 @@ import (
)

type TrafficProfile struct {
Name string
Traffic traffic.Traffic
Profile traffic.Profile
}

var trafficProfiles = []TrafficProfile{
{
Name: "EtcdHighTraffic",
Traffic: traffic.EtcdPut,
Profile: traffic.HighTrafficProfile,
},
{
Name: "EtcdTrafficDeleteLeases",
Traffic: traffic.EtcdPutDeleteLease,
Profile: traffic.LowTraffic,
},
{
Name: "KubernetesHighTraffic",
Traffic: traffic.Kubernetes,
Profile: traffic.HighTrafficProfile,
},
{
Name: "KubernetesLowTraffic",
Traffic: traffic.Kubernetes,
Profile: traffic.LowTraffic,
},
Expand All @@ -62,7 +67,6 @@ type testScenario struct {
}

func exploratoryScenarios(_ *testing.T) []testScenario {
enableLazyFS := e2e.BinPath.LazyFSAvailable()
randomizableOptions := []e2e.EPClusterOption{
options.WithClusterOptionGroups(
options.ClusterOptions{options.WithTickMs(29), options.WithElectionMs(271)},
Expand Down Expand Up @@ -98,23 +102,9 @@ func exploratoryScenarios(_ *testing.T) []testScenario {
}
scenarios := []testScenario{}
for _, tp := range trafficProfiles {
name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize1")
name := filepath.Join(tp.Name, "ClusterOfSize1")
clusterOfSize1Options := baseOptions
clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1))
// Add LazyFS only for traffic with lower QPS as it uses a lot of CPU lowering minimal QPS.
if enableLazyFS && tp.Profile.MinimalQPS <= 100 {
// Set CompactionBatchLimit to default when LazyFS is enabled, because frequent compaction uses a lot of CPU too.
lazyFSOptions := append(clusterOfSize1Options, e2e.WithLazyFSEnabled(true), e2e.WithCompactionBatchLimit(1000))
scenarios = append(scenarios, testScenario{
name: filepath.Join(name, "LazyFS"),
traffic: tp.Traffic,
profile: tp.Profile,
cluster: *e2e.NewConfig(lazyFSOptions...),
})
// Smaller CompactionBatchLimit without LazyFS to test Compact.
clusterOfSize1Options = append(clusterOfSize1Options, options.WithCompactionBatchLimit(10, 100))
name = filepath.Join(name, "Compact")
}
scenarios = append(scenarios, testScenario{
name: name,
traffic: tp.Traffic,
Expand All @@ -124,7 +114,7 @@ func exploratoryScenarios(_ *testing.T) []testScenario {
}

for _, tp := range trafficProfiles {
name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize3")
name := filepath.Join(tp.Name, "ClusterOfSize3")
clusterOfSize3Options := baseOptions
clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithIsPeerTLS(true))
clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithPeerProxy(true))
Expand All @@ -138,6 +128,25 @@ func exploratoryScenarios(_ *testing.T) []testScenario {
cluster: *e2e.NewConfig(clusterOfSize3Options...),
})
}
if e2e.BinPath.LazyFSAvailable() {
newScenarios := scenarios
for _, s := range scenarios {
// LazyFS increases the load on CPU, so we run it with more lightweight case.
if s.profile.MinimalQPS <= 100 && s.cluster.ClusterSize == 1 {
lazyfsCluster := s.cluster
lazyfsCluster.LazyFSEnabled = true
newScenarios = append(newScenarios, testScenario{
name: filepath.Join(s.name, "LazyFS"),
failpoint: s.failpoint,
cluster: lazyfsCluster,
traffic: s.traffic,
profile: s.profile.WithoutCompaction(),
watch: s.watch,
})
}
}
scenarios = newScenarios
}
return scenarios
}

Expand Down
Loading

0 comments on commit c70e0e4

Please sign in to comment.