diff --git a/tests/e2e/ctl_v3_snapshot_test.go b/tests/e2e/ctl_v3_snapshot_test.go index 258bc5213c2..df0c2c33419 100644 --- a/tests/e2e/ctl_v3_snapshot_test.go +++ b/tests/e2e/ctl_v3_snapshot_test.go @@ -213,7 +213,7 @@ func testIssue6361(t *testing.T) { newDataDir := filepath.Join(t.TempDir(), "test.data") t.Log("etcdctl restoring the snapshot...") - err = e2e.SpawnWithExpect([]string{e2e.BinPath.Etcdutl, "snapshot", "restore", fpath, "--name", epc.Procs[0].Config().Name, "--initial-cluster", epc.Procs[0].Config().InitialCluster, "--initial-cluster-token", epc.Procs[0].Config().InitialToken, "--initial-advertise-peer-urls", epc.Procs[0].Config().Purl.String(), "--data-dir", newDataDir}, "added member") + err = e2e.SpawnWithExpect([]string{e2e.BinPath.Etcdutl, "snapshot", "restore", fpath, "--name", epc.Procs[0].Config().Name, "--initial-cluster", epc.Procs[0].Config().InitialCluster, "--initial-cluster-token", epc.Procs[0].Config().InitialToken, "--initial-advertise-peer-urls", epc.Procs[0].Config().PeerURL.String(), "--data-dir", newDataDir}, "added member") if err != nil { t.Fatal(err) } diff --git a/tests/e2e/etcd_grpcproxy_test.go b/tests/e2e/etcd_grpcproxy_test.go index 08134449d5c..db9ad7b4016 100644 --- a/tests/e2e/etcd_grpcproxy_test.go +++ b/tests/e2e/etcd_grpcproxy_test.go @@ -43,7 +43,7 @@ func TestGrpcProxyAutoSync(t *testing.T) { }() var ( - node1ClientURL = epc.Procs[0].Config().Acurl + node1ClientURL = epc.Procs[0].Config().ClientURL proxyClientURL = "127.0.0.1:32379" ) @@ -68,11 +68,11 @@ func TestGrpcProxyAutoSync(t *testing.T) { require.NoError(t, err) // Wait for auto sync of endpoints - err = waitForEndpointInLog(ctx, proxyProc, epc.Procs[1].Config().Acurl) + err = waitForEndpointInLog(ctx, proxyProc, epc.Procs[1].Config().ClientURL) require.NoError(t, err) err = epc.CloseProc(ctx, func(proc e2e.EtcdProcess) bool { - return proc.Config().Acurl == node1ClientURL + return proc.Config().ClientURL == node1ClientURL }) require.NoError(t, err) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 870dc86def9..cf28e8b56d9 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -30,6 +30,7 @@ import ( "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/pkg/v3/proxy" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/tests/v3/framework/config" ) @@ -183,6 +184,7 @@ type EtcdProcessClusterConfig struct { WarningUnaryRequestDuration time.Duration ExperimentalWarningUnaryRequestDuration time.Duration + PeerProxy bool } func DefaultConfig() *EtcdProcessClusterConfig { @@ -334,6 +336,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption { return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit } } +func WithPeerProxy(enabled bool) EPClusterOption { + return func(c *EtcdProcessClusterConfig) { c.PeerProxy = enabled } +} + // NewEtcdProcessCluster launches a new cluster from etcd processes, returning // a new EtcdProcessCluster once all nodes are ready to accept client requests. func NewEtcdProcessCluster(ctx context.Context, t testing.TB, opts ...EPClusterOption) (*EtcdProcessCluster, error) { @@ -421,7 +427,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdAllServerProcessConfigs(tb testing.TB) for i := 0; i < cfg.ClusterSize; i++ { etcdCfgs[i] = cfg.EtcdServerProcessConfig(tb, i) - initialCluster[i] = fmt.Sprintf("%s=%s", etcdCfgs[i].Name, etcdCfgs[i].Purl.String()) + initialCluster[i] = fmt.Sprintf("%s=%s", etcdCfgs[i].Name, etcdCfgs[i].PeerURL.String()) } for i := range etcdCfgs { @@ -448,8 +454,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in var curls []string var curl, curltls string port := cfg.BasePort + 5*i - curlHost := fmt.Sprintf("localhost:%d", port) + clientPort := port + peerPort := port + 1 + metricsPort := port + 2 + peer2Port := port + 3 + curlHost := fmt.Sprintf("localhost:%d", clientPort) switch cfg.Client.ConnectionType { case ClientNonTLS, ClientTLS: curl = (&url.URL{Scheme: cfg.ClientScheme(), Host: curlHost}).String() @@ -460,7 +470,17 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in curls = []string{curl, curltls} } - purl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} + peerListenUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} + peerAdvertiseUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)} + var proxyCfg *proxy.ServerConfig + if cfg.PeerProxy { + peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port) + proxyCfg = &proxy.ServerConfig{ + Logger: zap.NewNop(), + To: peerListenUrl, + From: peerAdvertiseUrl, + } + } name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i) @@ -480,8 +500,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in "--name", name, "--listen-client-urls", strings.Join(curls, ","), "--advertise-client-urls", strings.Join(curls, ","), - "--listen-peer-urls", purl.String(), - "--initial-advertise-peer-urls", purl.String(), + "--listen-peer-urls", peerListenUrl.String(), + "--initial-advertise-peer-urls", peerAdvertiseUrl.String(), "--initial-cluster-token", cfg.InitialToken, "--data-dir", dataDirPath, "--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount), @@ -508,7 +528,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in if cfg.MetricsURLScheme != "" { murl = (&url.URL{ Scheme: cfg.MetricsURLScheme, - Host: fmt.Sprintf("localhost:%d", port+2), + Host: fmt.Sprintf("localhost:%d", metricsPort), }).String() args = append(args, "--listen-metrics-urls", murl) } @@ -598,11 +618,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in DataDirPath: dataDirPath, KeepDataDir: cfg.KeepDataDir, Name: name, - Purl: purl, - Acurl: curl, - Murl: murl, + PeerURL: peerAdvertiseUrl, + ClientURL: curl, + MetricsURL: murl, InitialToken: cfg.InitialToken, GoFailPort: gofailPort, + Proxy: proxyCfg, } } @@ -695,7 +716,7 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr return fmt.Errorf("failed to get member list: %w", err) } - memberID, err := findMemberIDByEndpoint(memberList.Members, proc.Config().Acurl) + memberID, err := findMemberIDByEndpoint(memberList.Members, proc.Config().ClientURL) if err != nil { return fmt.Errorf("failed to find member ID: %w", err) } @@ -715,7 +736,7 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr return errors.New("failed to remove member after 10 tries") } - epc.lg.Info("successfully removed member", zap.String("acurl", proc.Config().Acurl)) + epc.lg.Info("successfully removed member", zap.String("acurl", proc.Config().ClientURL)) // Then stop process return proc.Close() @@ -732,17 +753,17 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces epc.nextSeq++ initialCluster := []string{ - fmt.Sprintf("%s=%s", serverCfg.Name, serverCfg.Purl.String()), + fmt.Sprintf("%s=%s", serverCfg.Name, serverCfg.PeerURL.String()), } for _, p := range epc.Procs { - initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().Purl.String())) + initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().PeerURL.String())) } epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "existing") // First add new member to cluster memberCtl := epc.Client(opts...) - _, err := memberCtl.MemberAdd(ctx, serverCfg.Name, []string{serverCfg.Purl.String()}) + _, err := memberCtl.MemberAdd(ctx, serverCfg.Name, []string{serverCfg.PeerURL.String()}) if err != nil { return fmt.Errorf("failed to add new member: %w", err) } diff --git a/tests/framework/e2e/cluster_proxy.go b/tests/framework/e2e/cluster_proxy.go index 53b710c5192..9adb4f91cf9 100644 --- a/tests/framework/e2e/cluster_proxy.go +++ b/tests/framework/e2e/cluster_proxy.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap" "go.etcd.io/etcd/pkg/v3/expect" + "go.etcd.io/etcd/pkg/v3/proxy" "go.etcd.io/etcd/tests/v3/framework/config" ) @@ -123,6 +124,10 @@ func (p *proxyEtcdProcess) Wait(ctx context.Context) error { return p.etcdProc.Wait(ctx) } +func (p *proxyEtcdProcess) PeerProxy() proxy.Server { + return nil +} + type proxyProc struct { lg *zap.Logger name string @@ -187,7 +192,7 @@ type proxyV2Proc struct { } func proxyListenURL(cfg *EtcdServerProcessConfig, portOffset int) string { - u, err := url.Parse(cfg.Acurl) + u, err := url.Parse(cfg.ClientURL) if err != nil { panic(err) } @@ -205,7 +210,7 @@ func newProxyV2Proc(cfg *EtcdServerProcessConfig) *proxyV2Proc { "--name", name, "--proxy", "on", "--listen-client-urls", listenAddr, - "--initial-cluster", cfg.Name + "=" + cfg.Purl.String(), + "--initial-cluster", cfg.Name + "=" + cfg.PeerURL.String(), "--data-dir", dataDir, } return &proxyV2Proc{ @@ -231,13 +236,13 @@ func newProxyV3Proc(cfg *EtcdServerProcessConfig) *proxyV3Proc { "grpc-proxy", "start", "--listen-addr", strings.Split(listenAddr, "/")[2], - "--endpoints", cfg.Acurl, + "--endpoints", cfg.ClientURL, // pass-through member RPCs "--advertise-client-url", "", "--data-dir", cfg.DataDirPath, } murl := "" - if cfg.Murl != "" { + if cfg.MetricsURL != "" { murl = proxyListenURL(cfg, 4) args = append(args, "--metrics-addr", murl) } diff --git a/tests/framework/e2e/curl.go b/tests/framework/e2e/curl.go index 9b12a54abf3..20bf111eebf 100644 --- a/tests/framework/e2e/curl.go +++ b/tests/framework/e2e/curl.go @@ -54,7 +54,7 @@ func (r CURLReq) timeoutDuration() time.Duration { func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method string, req CURLReq) []string { var ( cmdArgs = []string{"curl"} - acurl = member.Config().Acurl + acurl = member.Config().ClientURL ) if req.MetricsURLScheme != "https" { if req.IsTLS { @@ -62,7 +62,7 @@ func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method st panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS") } cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath) - acurl = ToTLS(member.Config().Acurl) + acurl = ToTLS(member.Config().ClientURL) } else if cfg.Client.ConnectionType == ClientTLS { if cfg.CN { cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath) diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index d5238b5a26c..5a7e062c3d1 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -28,6 +28,7 @@ import ( "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/expect" + "go.etcd.io/etcd/pkg/v3/proxy" "go.etcd.io/etcd/tests/v3/framework/config" ) @@ -49,6 +50,7 @@ type EtcdProcess interface { Stop() error Close() error Config() *EtcdServerProcessConfig + PeerProxy() proxy.Server Logs() LogsExpect Kill() error } @@ -62,6 +64,7 @@ type LogsExpect interface { type EtcdServerProcess struct { cfg *EtcdServerProcessConfig proc *expect.ExpectProcess + proxy proxy.Server donec chan struct{} // closed when Interact() terminates } @@ -78,14 +81,15 @@ type EtcdServerProcessConfig struct { Name string - Purl url.URL - - Acurl string - Murl string + PeerURL url.URL + ClientURL string + MetricsURL string InitialToken string InitialCluster string GoFailPort int + + Proxy *proxy.ServerConfig } func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) { @@ -100,9 +104,9 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err return &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil } -func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.Acurl} } +func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.ClientURL} } func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() } -func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} } +func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.MetricsURL} } func (epc *EtcdServerProcess) Client(opts ...config.ClientOption) *EtcdctlV3 { etcdctl, err := NewEtcdctl(epc.Config().Client, epc.EndpointsV3(), opts...) @@ -117,6 +121,15 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error { if ep.proc != nil { panic("already started") } + if ep.cfg.Proxy != nil && ep.proxy == nil { + ep.cfg.lg.Info("starting proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.Proxy.From.String()), zap.String("to", ep.cfg.Proxy.To.String())) + ep.proxy = proxy.NewServer(*ep.cfg.Proxy) + select { + case <-ep.proxy.Ready(): + case err := <-ep.proxy.Error(): + return err + } + } ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name)) proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name) if err != nil { @@ -161,8 +174,8 @@ func (ep *EtcdServerProcess) Stop() (err error) { } <-ep.donec ep.donec = make(chan struct{}) - if ep.cfg.Purl.Scheme == "unix" || ep.cfg.Purl.Scheme == "unixs" { - err = os.Remove(ep.cfg.Purl.Host + ep.cfg.Purl.Path) + if ep.cfg.PeerURL.Scheme == "unix" || ep.cfg.PeerURL.Scheme == "unixs" { + err = os.Remove(ep.cfg.PeerURL.Host + ep.cfg.PeerURL.Path) if err != nil && !os.IsNotExist(err) { return err } @@ -176,6 +189,15 @@ func (ep *EtcdServerProcess) Close() error { if err := ep.Stop(); err != nil { return err } + if ep.proxy != nil { + ep.cfg.lg.Info("closing proxy...", zap.String("name", ep.cfg.Name)) + err := ep.proxy.Close() + if err != nil { + return err + } + ep.proxy = nil + } + if !ep.cfg.KeepDataDir { ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath)) return os.RemoveAll(ep.cfg.DataDirPath) @@ -243,3 +265,7 @@ func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) { t.Fatal(err) } } + +func (ep *EtcdServerProcess) PeerProxy() proxy.Server { + return ep.proxy +} diff --git a/tests/linearizability/failpoints.go b/tests/linearizability/failpoints.go index e40e3df606e..ff30906704f 100644 --- a/tests/linearizability/failpoints.go +++ b/tests/linearizability/failpoints.go @@ -56,6 +56,8 @@ var ( CompactBeforeCommitBatchPanic Failpoint = goPanicFailpoint{"compactBeforeCommitBatch", triggerCompact, AnyMember} CompactAfterCommitBatchPanic Failpoint = goPanicFailpoint{"compactAfterCommitBatch", triggerCompact, AnyMember} RaftBeforeLeaderSendPanic Failpoint = goPanicFailpoint{"raftBeforeLeaderSend", nil, Leader} + BlackholePeerNetwork Failpoint = blackholePeerNetworkFailpoint{duration: time.Second} + DelayPeerNetwork Failpoint = delayPeerNetworkFailpoint{duration: time.Second, baseLatency: 75 * time.Millisecond, randomizedLatency: 50 * time.Millisecond} RandomFailpoint Failpoint = randomFailpoint{[]Failpoint{ KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic, DefragBeforeCopyPanic, DefragBeforeRenamePanic, @@ -66,6 +68,8 @@ var ( CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic, CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, + BlackholePeerNetwork, + DelayPeerNetwork, }} // TODO: Figure out how to reliably trigger below failpoints and add them to RandomFailpoint raftBeforeApplySnapPanic Failpoint = goPanicFailpoint{"raftBeforeApplySnap", nil, AnyMember} @@ -247,3 +251,49 @@ func (f randomFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.Et func (f randomFailpoint) Name() string { return "Random" } + +type blackholePeerNetworkFailpoint struct { + duration time.Duration +} + +func (f blackholePeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error { + member := clus.Procs[rand.Int()%len(clus.Procs)] + proxy := member.PeerProxy() + + proxy.BlackholeTx() + proxy.BlackholeRx() + t.Logf("Blackholing traffic from and to %s", member.Config().Name) + time.Sleep(f.duration) + t.Logf("Traffic restored for %s", member.Config().Name) + proxy.UnblackholeTx() + proxy.UnblackholeRx() + return nil +} + +func (f blackholePeerNetworkFailpoint) Name() string { + return "blackhole" +} + +type delayPeerNetworkFailpoint struct { + duration time.Duration + baseLatency time.Duration + randomizedLatency time.Duration +} + +func (f delayPeerNetworkFailpoint) Trigger(t *testing.T, ctx context.Context, clus *e2e.EtcdProcessCluster) error { + member := clus.Procs[rand.Int()%len(clus.Procs)] + proxy := member.PeerProxy() + + proxy.DelayRx(f.baseLatency, f.randomizedLatency) + proxy.DelayTx(f.baseLatency, f.randomizedLatency) + t.Logf("Delaying traffic from and to %s by %v +/- %v", member.Config().Name, f.baseLatency, f.randomizedLatency) + time.Sleep(f.duration) + t.Logf("Traffic delay removed for %s", member.Config().Name) + proxy.UndelayRx() + proxy.UndelayTx() + return nil +} + +func (f delayPeerNetworkFailpoint) Name() string { + return "delay" +} diff --git a/tests/linearizability/linearizability_test.go b/tests/linearizability/linearizability_test.go index 3a5e1a813d1..405ae21a1ad 100644 --- a/tests/linearizability/linearizability_test.go +++ b/tests/linearizability/linearizability_test.go @@ -51,6 +51,7 @@ func TestLinearizability(t *testing.T) { failpoint: RandomFailpoint, config: *e2e.NewConfig( e2e.WithClusterSize(1), + e2e.WithPeerProxy(true), e2e.WithGoFailEnabled(true), e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints ), @@ -59,6 +60,7 @@ func TestLinearizability(t *testing.T) { name: "ClusterOfSize3", failpoint: RandomFailpoint, config: *e2e.NewConfig( + e2e.WithPeerProxy(true), e2e.WithGoFailEnabled(true), e2e.WithCompactionBatchLimit(100), // required for compactBeforeCommitBatch and compactAfterCommitBatch failpoints ),