diff --git a/tests/robustness/failpoint/cluster.go b/tests/robustness/failpoint/cluster.go index 1cc2116e9da..63aed056f2e 100644 --- a/tests/robustness/failpoint/cluster.go +++ b/tests/robustness/failpoint/cluster.go @@ -31,6 +31,7 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/report" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) var ( @@ -135,7 +136,7 @@ func (f memberReplace) Name() string { return "MemberReplace" } -func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool { +func (f memberReplace) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool { // a lower etcd version may not be able to join a cluster with higher cluster version. return config.ClusterSize > 1 && (config.Version == e2e.QuorumLastVersion || member.Config().ExecPath == e2e.BinPath.Etcd) } diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go index bfe63fcdeba..235f13debb2 100644 --- a/tests/robustness/failpoint/failpoint.go +++ b/tests/robustness/failpoint/failpoint.go @@ -28,6 +28,7 @@ import ( "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/report" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) const ( @@ -54,10 +55,10 @@ var ( } ) -func PickRandom(clus *e2e.EtcdProcessCluster) (Failpoint, error) { +func PickRandom(clus *e2e.EtcdProcessCluster, profile traffic.Profile) (Failpoint, error) { availableFailpoints := make([]Failpoint, 0, len(allFailpoints)) for _, failpoint := range allFailpoints { - err := Validate(clus, failpoint) + err := Validate(clus, failpoint, profile) if err != nil { continue } @@ -69,16 +70,16 @@ func PickRandom(clus *e2e.EtcdProcessCluster) (Failpoint, error) { return availableFailpoints[rand.Int()%len(availableFailpoints)], nil } -func Validate(clus 
*e2e.EtcdProcessCluster, failpoint Failpoint) error { +func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint, profile traffic.Profile) error { for _, proc := range clus.Procs { - if !failpoint.Available(*clus.Cfg, proc) { + if !failpoint.Available(*clus.Cfg, proc, profile) { return fmt.Errorf("failpoint %q not available on %s", failpoint.Name(), proc.Config().Name) } } return nil } -func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*FailpointReport, error) { +func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time, ids identity.Provider) (*report.FailpointReport, error) { ctx, cancel := context.WithTimeout(ctx, triggerTimeout) defer cancel() var err error @@ -99,8 +100,8 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name())) end := time.Since(baseTime) - return &FailpointReport{ - Injection: Injection{ + return &report.FailpointReport{ + FailpointInjection: report.FailpointInjection{ Start: start, End: end, Name: failpoint.Name(), @@ -109,16 +110,6 @@ func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdPro }, nil } -type FailpointReport struct { - Injection - Client []report.ClientReport -} - -type Injection struct { - Start, End time.Duration - Name string -} - func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProcessCluster) error { for i := 0; i < len(clus.Procs); i++ { clusterClient, err := clientv3.New(clientv3.Config{ @@ -154,5 +145,5 @@ type Failpoint interface { } type AvailabilityChecker interface { - Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool + Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess, traffic.Profile) bool } diff --git a/tests/robustness/failpoint/gofail.go 
b/tests/robustness/failpoint/gofail.go index be54faa01f2..b4f4d354842 100644 --- a/tests/robustness/failpoint/gofail.go +++ b/tests/robustness/failpoint/gofail.go @@ -27,6 +27,7 @@ import ( "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/report" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) var ( @@ -147,11 +148,11 @@ func (f goPanicFailpoint) pickMember(t *testing.T, clus *e2e.EtcdProcessCluster) } } -func (f goPanicFailpoint) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool { +func (f goPanicFailpoint) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool { if f.target == Follower && config.ClusterSize == 1 { return false } - if f.trigger != nil && !f.trigger.Available(config, member) { + if f.trigger != nil && !f.trigger.Available(config, member, profile) { return false } memberFailpoints := member.Failpoints() @@ -200,7 +201,7 @@ func (f killAndGofailSleep) Name() string { return fmt.Sprintf("%s=sleep(%s)", f.failpoint, f.time) } -func (f killAndGofailSleep) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool { +func (f killAndGofailSleep) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool { if config.ClusterSize == 1 { return false } @@ -238,7 +239,7 @@ func (f gofailSleepAndDeactivate) Name() string { return fmt.Sprintf("%s=sleep(%s)", f.failpoint, f.time) } -func (f gofailSleepAndDeactivate) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) bool { +func (f gofailSleepAndDeactivate) Available(config e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, profile traffic.Profile) bool { memberFailpoints := member.Failpoints() if memberFailpoints == nil { return false diff --git a/tests/robustness/failpoint/kill.go b/tests/robustness/failpoint/kill.go index b0b5ff4fb2c..d046ae580e7 100644 --- 
a/tests/robustness/failpoint/kill.go +++ b/tests/robustness/failpoint/kill.go @@ -27,6 +27,7 @@ import ( "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/report" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) var ( @@ -67,6 +68,6 @@ func (f killFailpoint) Name() string { return "Kill" } -func (f killFailpoint) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool { +func (f killFailpoint) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess, traffic.Profile) bool { return true } diff --git a/tests/robustness/failpoint/network.go b/tests/robustness/failpoint/network.go index b355b5182bc..27504c396b9 100644 --- a/tests/robustness/failpoint/network.go +++ b/tests/robustness/failpoint/network.go @@ -26,6 +26,7 @@ import ( "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/report" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) var ( @@ -56,7 +57,7 @@ func (tb triggerBlackhole) Trigger(ctx context.Context, t *testing.T, member e2e return nil, Blackhole(ctx, t, member, clus, tb.waitTillSnapshot) } -func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess) bool { +func (tb triggerBlackhole) Available(config e2e.EtcdProcessClusterConfig, process e2e.EtcdProcess, profile traffic.Profile) bool { // Avoid triggering failpoint if waiting for failpoint would take too long to fit into timeout. // Number of required entries for snapshot depends on etcd configuration. 
if tb.waitTillSnapshot && (entriesToGuaranteeSnapshot(config) > 200 || !e2e.CouldSetSnapshotCatchupEntries(process.Config().ExecPath)) { @@ -179,7 +180,7 @@ func (f delayPeerNetworkFailpoint) Name() string { return "delayPeerNetwork" } -func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool { +func (f delayPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess, profile traffic.Profile) bool { return config.ClusterSize > 1 && clus.PeerProxy() != nil } @@ -213,6 +214,6 @@ func (f dropPeerNetworkFailpoint) Name() string { return "dropPeerNetwork" } -func (f dropPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess) bool { +func (f dropPeerNetworkFailpoint) Available(config e2e.EtcdProcessClusterConfig, clus e2e.EtcdProcess, profile traffic.Profile) bool { return config.ClusterSize > 1 && clus.PeerProxy() != nil } diff --git a/tests/robustness/failpoint/trigger.go b/tests/robustness/failpoint/trigger.go index 578833f9ec5..55ef0614ea6 100644 --- a/tests/robustness/failpoint/trigger.go +++ b/tests/robustness/failpoint/trigger.go @@ -25,6 +25,7 @@ import ( "go.etcd.io/etcd/tests/v3/robustness/client" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/report" + "go.etcd.io/etcd/tests/v3/robustness/traffic" ) type trigger interface { @@ -47,7 +48,7 @@ func (t triggerDefrag) Trigger(ctx context.Context, _ *testing.T, member e2e.Etc return nil, nil } -func (t triggerDefrag) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess) bool { +func (t triggerDefrag) Available(e2e.EtcdProcessClusterConfig, e2e.EtcdProcess, traffic.Profile) bool { return true } @@ -83,7 +84,10 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et return []report.ClientReport{cc.Report()}, nil } -func (t triggerCompact) Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess) bool { +func (t triggerCompact) 
Available(config e2e.EtcdProcessClusterConfig, _ e2e.EtcdProcess, profile traffic.Profile) bool { + if profile.ForbidCompaction { + return false + } // Since introduction of compaction into traffic, injecting compaction failpoints started interfeering with peer proxy. // TODO: Re-enable the peer proxy for compact failpoints when we confirm the root cause. if config.PeerProxy { diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index a4b6d763a29..4f3f7be4b51 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -75,7 +75,7 @@ func testRobustness(ctx context.Context, t *testing.T, lg *zap.Logger, s testSce defer forcestopCluster(r.Cluster) if s.failpoint == nil { - s.failpoint, err = failpoint.PickRandom(r.Cluster) + s.failpoint, err = failpoint.PickRandom(r.Cluster, s.profile) if err != nil { t.Fatal(err) } @@ -107,7 +107,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu defer cancel() g := errgroup.Group{} var operationReport, watchReport, failpointClientReport []report.ClientReport - failpointInjected := make(chan failpoint.Injection, 1) + failpointInjected := make(chan report.FailpointInjection, 1) // using baseTime time-measuring operation to get monotonic clock reading // see https://github.com/golang/go/blob/master/src/time/time.go#L17 @@ -125,7 +125,7 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu // Give some time for traffic to reach qps target after injecting failpoint. 
time.Sleep(time.Second) if fr != nil { - failpointInjected <- fr.Injection + failpointInjected <- fr.FailpointInjection failpointClientReport = fr.Client } return nil diff --git a/tests/robustness/report/failpoint.go b/tests/robustness/report/failpoint.go new file mode 100644 index 00000000000..17b604e3890 --- /dev/null +++ b/tests/robustness/report/failpoint.go @@ -0,0 +1,29 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package report + +import ( + "time" +) + +type FailpointReport struct { + FailpointInjection + Client []ClientReport +} + +type FailpointInjection struct { + Start, End time.Duration + Name string +} diff --git a/tests/robustness/scenarios.go b/tests/robustness/scenarios.go index 9fcaa323b83..40b37f06332 100644 --- a/tests/robustness/scenarios.go +++ b/tests/robustness/scenarios.go @@ -29,24 +29,29 @@ import ( ) type TrafficProfile struct { + Name string Traffic traffic.Traffic Profile traffic.Profile } var trafficProfiles = []TrafficProfile{ { + Name: "EtcdHighTraffic", Traffic: traffic.EtcdPut, Profile: traffic.HighTrafficProfile, }, { + Name: "EtcdTrafficDeleteLeases", Traffic: traffic.EtcdPutDeleteLease, Profile: traffic.LowTraffic, }, { + Name: "KubernetesHighTraffic", Traffic: traffic.Kubernetes, Profile: traffic.HighTrafficProfile, }, { + Name: "KubernetesLowTraffic", Traffic: traffic.Kubernetes, Profile: traffic.LowTraffic, }, @@ -62,7 +67,6 @@ type testScenario struct { } func 
exploratoryScenarios(_ *testing.T) []testScenario { - enableLazyFS := e2e.BinPath.LazyFSAvailable() randomizableOptions := []e2e.EPClusterOption{ options.WithClusterOptionGroups( options.ClusterOptions{options.WithTickMs(29), options.WithElectionMs(271)}, @@ -98,23 +102,9 @@ func exploratoryScenarios(_ *testing.T) []testScenario { } scenarios := []testScenario{} for _, tp := range trafficProfiles { - name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize1") + name := filepath.Join(tp.Name, "ClusterOfSize1") clusterOfSize1Options := baseOptions clusterOfSize1Options = append(clusterOfSize1Options, e2e.WithClusterSize(1)) - // Add LazyFS only for traffic with lower QPS as it uses a lot of CPU lowering minimal QPS. - if enableLazyFS && tp.Profile.MinimalQPS <= 100 { - // Set CompactionBatchLimit to default when LazyFS is enabled, because frequent compaction uses a lot of CPU too. - lazyFSOptions := append(clusterOfSize1Options, e2e.WithLazyFSEnabled(true), e2e.WithCompactionBatchLimit(1000)) - scenarios = append(scenarios, testScenario{ - name: filepath.Join(name, "LazyFS"), - traffic: tp.Traffic, - profile: tp.Profile, - cluster: *e2e.NewConfig(lazyFSOptions...), - }) - // Smaller CompactionBatchLimit without LazyFS to test Compact. 
- clusterOfSize1Options = append(clusterOfSize1Options, options.WithCompactionBatchLimit(10, 100)) - name = filepath.Join(name, "Compact") - } scenarios = append(scenarios, testScenario{ name: name, traffic: tp.Traffic, @@ -124,7 +114,7 @@ func exploratoryScenarios(_ *testing.T) []testScenario { } for _, tp := range trafficProfiles { - name := filepath.Join(tp.Traffic.Name(), tp.Profile.Name, "ClusterOfSize3") + name := filepath.Join(tp.Name, "ClusterOfSize3") clusterOfSize3Options := baseOptions clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithIsPeerTLS(true)) clusterOfSize3Options = append(clusterOfSize3Options, e2e.WithPeerProxy(true)) @@ -138,6 +128,25 @@ func exploratoryScenarios(_ *testing.T) []testScenario { cluster: *e2e.NewConfig(clusterOfSize3Options...), }) } + if e2e.BinPath.LazyFSAvailable() { + newScenarios := scenarios + for _, s := range scenarios { + // LazyFS increases the load on CPU, so we run it with more lightweight case. + if s.profile.MinimalQPS <= 100 && s.cluster.ClusterSize == 1 { + lazyfsCluster := s.cluster + lazyfsCluster.LazyFSEnabled = true + newScenarios = append(newScenarios, testScenario{ + name: filepath.Join(s.name, "LazyFS"), + failpoint: s.failpoint, + cluster: lazyfsCluster, + traffic: s.traffic, + profile: s.profile.WithoutCompaction(), + watch: s.watch, + }) + } + } + scenarios = newScenarios + } return scenarios } diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index be942f2ca2d..d586baa1764 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -30,7 +30,7 @@ import ( ) var ( - EtcdPutDeleteLease = etcdTraffic{ + EtcdPutDeleteLease Traffic = etcdTraffic{ keyCount: 10, leaseTTL: DefaultLeaseTTL, largePutSize: 32769, @@ -49,7 +49,7 @@ var ( {Choice: Compact, Weight: 5}, }, } - EtcdPut = etcdTraffic{ + EtcdPut Traffic = etcdTraffic{ keyCount: 10, largePutSize: 32769, leaseTTL: DefaultLeaseTTL, @@ -73,6 +73,21 @@ type etcdTraffic struct { 
largePutSize int } +func (t etcdTraffic) WithoutCompact() Traffic { + requests := make([]random.ChoiceWeight[etcdRequestType], 0, len(t.requests)) + for _, request := range t.requests { + if request.Choice != Compact { + requests = append(requests, request) + } + } + return etcdTraffic{ + keyCount: t.keyCount, + requests: requests, + leaseTTL: t.leaseTTL, + largePutSize: t.largePutSize, + } +} + func (t etcdTraffic) ExpectUniqueRevision() bool { return false } diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 1754f03b478..6a845cf20fa 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -33,7 +33,7 @@ import ( ) var ( - Kubernetes = kubernetesTraffic{ + Kubernetes Traffic = kubernetesTraffic{ averageKeyCount: 10, resource: "pods", namespace: "default", @@ -53,12 +53,23 @@ type kubernetesTraffic struct { writeChoices []random.ChoiceWeight[KubernetesRequestType] } -func (t kubernetesTraffic) ExpectUniqueRevision() bool { - return true +func (t kubernetesTraffic) WithoutCompact() Traffic { + wcs := make([]random.ChoiceWeight[KubernetesRequestType], 0, len(t.writeChoices)) + for _, wc := range t.writeChoices { + if wc.Choice != KubernetesCompact { + wcs = append(wcs, wc) + } + } + return kubernetesTraffic{ + averageKeyCount: t.averageKeyCount, + resource: t.resource, + namespace: t.namespace, + writeChoices: wcs, + } } -func (t kubernetesTraffic) Name() string { - return "Kubernetes" +func (t kubernetesTraffic) ExpectUniqueRevision() bool { + return true } func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) { diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 31fec7fa760..92d7a5c6697 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -25,7
+25,6 @@ import ( "go.etcd.io/etcd/tests/v3/framework/e2e" "go.etcd.io/etcd/tests/v3/robustness/client" - "go.etcd.io/etcd/tests/v3/robustness/failpoint" "go.etcd.io/etcd/tests/v3/robustness/identity" "go.etcd.io/etcd/tests/v3/robustness/model" "go.etcd.io/etcd/tests/v3/robustness/report" @@ -38,14 +37,12 @@ var ( MultiOpTxnOpCount = 4 LowTraffic = Profile{ - Name: "LowTraffic", MinimalQPS: 100, MaximalQPS: 200, ClientCount: 8, MaxNonUniqueRequestConcurrency: 3, } HighTrafficProfile = Profile{ - Name: "HighTraffic", MinimalQPS: 200, MaximalQPS: 1000, ClientCount: 12, @@ -53,7 +50,7 @@ var ( } ) -func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan failpoint.Injection, baseTime time.Time, ids identity.Provider) []report.ClientReport { +func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan report.FailpointInjection, baseTime time.Time, ids identity.Provider) []report.ClientReport { mux := sync.Mutex{} endpoints := clus.EndpointsGRPC() @@ -61,6 +58,10 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 reports := []report.ClientReport{} limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200) + if profile.ForbidCompaction { + traffic = traffic.WithoutCompact() + } + cc, err := client.NewRecordingClient(endpoints, ids, baseTime) if err != nil { t.Fatal(err) @@ -92,7 +93,7 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 mux.Unlock() }(c) } - var fr *failpoint.Injection + var fr *report.FailpointInjection select { case frp, ok := <-failpointInjected: if !ok { @@ -165,15 +166,20 @@ func (ts *trafficStats) QPS() float64 { } type Profile struct { - Name string MinimalQPS float64 MaximalQPS float64 MaxNonUniqueRequestConcurrency int ClientCount int + ForbidCompaction bool +} + +func (p 
Profile) WithoutCompaction() Profile { + p.ForbidCompaction = true + return p } type Traffic interface { Run(ctx context.Context, c *client.RecordingClient, qpsLimiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) ExpectUniqueRevision() bool - Name() string + WithoutCompact() Traffic }