Skip to content

Commit

Permalink
Merge pull request #15048 from serathius/linearizability-network
Browse files Browse the repository at this point in the history
tests: Implement network delay and blackholing in linearizability tests
  • Loading branch information
serathius authored Jan 9, 2023
2 parents 108cd9a + 064fad5 commit acf3782
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 32 deletions.
2 changes: 1 addition & 1 deletion tests/e2e/ctl_v3_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func testIssue6361(t *testing.T) {

newDataDir := filepath.Join(t.TempDir(), "test.data")
t.Log("etcdctl restoring the snapshot...")
err = e2e.SpawnWithExpect([]string{e2e.BinPath.Etcdutl, "snapshot", "restore", fpath, "--name", epc.Procs[0].Config().Name, "--initial-cluster", epc.Procs[0].Config().InitialCluster, "--initial-cluster-token", epc.Procs[0].Config().InitialToken, "--initial-advertise-peer-urls", epc.Procs[0].Config().Purl.String(), "--data-dir", newDataDir}, "added member")
err = e2e.SpawnWithExpect([]string{e2e.BinPath.Etcdutl, "snapshot", "restore", fpath, "--name", epc.Procs[0].Config().Name, "--initial-cluster", epc.Procs[0].Config().InitialCluster, "--initial-cluster-token", epc.Procs[0].Config().InitialToken, "--initial-advertise-peer-urls", epc.Procs[0].Config().PeerURL.String(), "--data-dir", newDataDir}, "added member")
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions tests/e2e/etcd_grpcproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestGrpcProxyAutoSync(t *testing.T) {
}()

var (
node1ClientURL = epc.Procs[0].Config().Acurl
node1ClientURL = epc.Procs[0].Config().ClientURL
proxyClientURL = "127.0.0.1:32379"
)

Expand All @@ -68,11 +68,11 @@ func TestGrpcProxyAutoSync(t *testing.T) {
require.NoError(t, err)

// Wait for auto sync of endpoints
err = waitForEndpointInLog(ctx, proxyProc, epc.Procs[1].Config().Acurl)
err = waitForEndpointInLog(ctx, proxyProc, epc.Procs[1].Config().ClientURL)
require.NoError(t, err)

err = epc.CloseProc(ctx, func(proc e2e.EtcdProcess) bool {
return proc.Config().Acurl == node1ClientURL
return proc.Config().ClientURL == node1ClientURL
})
require.NoError(t, err)

Expand Down
49 changes: 35 additions & 14 deletions tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/pkg/v3/proxy"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/config"
)
Expand Down Expand Up @@ -183,6 +184,7 @@ type EtcdProcessClusterConfig struct {

WarningUnaryRequestDuration time.Duration
ExperimentalWarningUnaryRequestDuration time.Duration
PeerProxy bool
}

func DefaultConfig() *EtcdProcessClusterConfig {
Expand Down Expand Up @@ -334,6 +336,10 @@ func WithCompactionBatchLimit(limit int) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.CompactionBatchLimit = limit }
}

func WithPeerProxy(enabled bool) EPClusterOption {
return func(c *EtcdProcessClusterConfig) { c.PeerProxy = enabled }
}

// NewEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new EtcdProcessCluster once all nodes are ready to accept client requests.
func NewEtcdProcessCluster(ctx context.Context, t testing.TB, opts ...EPClusterOption) (*EtcdProcessCluster, error) {
Expand Down Expand Up @@ -421,7 +427,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdAllServerProcessConfigs(tb testing.TB)

for i := 0; i < cfg.ClusterSize; i++ {
etcdCfgs[i] = cfg.EtcdServerProcessConfig(tb, i)
initialCluster[i] = fmt.Sprintf("%s=%s", etcdCfgs[i].Name, etcdCfgs[i].Purl.String())
initialCluster[i] = fmt.Sprintf("%s=%s", etcdCfgs[i].Name, etcdCfgs[i].PeerURL.String())
}

for i := range etcdCfgs {
Expand All @@ -448,8 +454,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
var curls []string
var curl, curltls string
port := cfg.BasePort + 5*i
curlHost := fmt.Sprintf("localhost:%d", port)
clientPort := port
peerPort := port + 1
metricsPort := port + 2
peer2Port := port + 3

curlHost := fmt.Sprintf("localhost:%d", clientPort)
switch cfg.Client.ConnectionType {
case ClientNonTLS, ClientTLS:
curl = (&url.URL{Scheme: cfg.ClientScheme(), Host: curlHost}).String()
Expand All @@ -460,7 +470,17 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
curls = []string{curl, curltls}
}

purl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)}
peerListenUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
peerAdvertiseUrl := url.URL{Scheme: cfg.PeerScheme(), Host: fmt.Sprintf("localhost:%d", peerPort)}
var proxyCfg *proxy.ServerConfig
if cfg.PeerProxy {
peerAdvertiseUrl.Host = fmt.Sprintf("localhost:%d", peer2Port)
proxyCfg = &proxy.ServerConfig{
Logger: zap.NewNop(),
To: peerListenUrl,
From: peerAdvertiseUrl,
}
}

name := fmt.Sprintf("%s-test-%d", testNameCleanRegex.ReplaceAllString(tb.Name(), ""), i)

Expand All @@ -480,8 +500,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
"--name", name,
"--listen-client-urls", strings.Join(curls, ","),
"--advertise-client-urls", strings.Join(curls, ","),
"--listen-peer-urls", purl.String(),
"--initial-advertise-peer-urls", purl.String(),
"--listen-peer-urls", peerListenUrl.String(),
"--initial-advertise-peer-urls", peerAdvertiseUrl.String(),
"--initial-cluster-token", cfg.InitialToken,
"--data-dir", dataDirPath,
"--snapshot-count", fmt.Sprintf("%d", cfg.SnapshotCount),
Expand All @@ -508,7 +528,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
if cfg.MetricsURLScheme != "" {
murl = (&url.URL{
Scheme: cfg.MetricsURLScheme,
Host: fmt.Sprintf("localhost:%d", port+2),
Host: fmt.Sprintf("localhost:%d", metricsPort),
}).String()
args = append(args, "--listen-metrics-urls", murl)
}
Expand Down Expand Up @@ -598,11 +618,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
DataDirPath: dataDirPath,
KeepDataDir: cfg.KeepDataDir,
Name: name,
Purl: purl,
Acurl: curl,
Murl: murl,
PeerURL: peerAdvertiseUrl,
ClientURL: curl,
MetricsURL: murl,
InitialToken: cfg.InitialToken,
GoFailPort: gofailPort,
Proxy: proxyCfg,
}
}

Expand Down Expand Up @@ -695,7 +716,7 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr
return fmt.Errorf("failed to get member list: %w", err)
}

memberID, err := findMemberIDByEndpoint(memberList.Members, proc.Config().Acurl)
memberID, err := findMemberIDByEndpoint(memberList.Members, proc.Config().ClientURL)
if err != nil {
return fmt.Errorf("failed to find member ID: %w", err)
}
Expand All @@ -715,7 +736,7 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr
return errors.New("failed to remove member after 10 tries")
}

epc.lg.Info("successfully removed member", zap.String("acurl", proc.Config().Acurl))
epc.lg.Info("successfully removed member", zap.String("acurl", proc.Config().ClientURL))

// Then stop process
return proc.Close()
Expand All @@ -732,17 +753,17 @@ func (epc *EtcdProcessCluster) StartNewProc(ctx context.Context, cfg *EtcdProces
epc.nextSeq++

initialCluster := []string{
fmt.Sprintf("%s=%s", serverCfg.Name, serverCfg.Purl.String()),
fmt.Sprintf("%s=%s", serverCfg.Name, serverCfg.PeerURL.String()),
}
for _, p := range epc.Procs {
initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().Purl.String()))
initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", p.Config().Name, p.Config().PeerURL.String()))
}

epc.Cfg.SetInitialOrDiscovery(serverCfg, initialCluster, "existing")

// First add new member to cluster
memberCtl := epc.Client(opts...)
_, err := memberCtl.MemberAdd(ctx, serverCfg.Name, []string{serverCfg.Purl.String()})
_, err := memberCtl.MemberAdd(ctx, serverCfg.Name, []string{serverCfg.PeerURL.String()})
if err != nil {
return fmt.Errorf("failed to add new member: %w", err)
}
Expand Down
13 changes: 9 additions & 4 deletions tests/framework/e2e/cluster_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap"

"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/pkg/v3/proxy"
"go.etcd.io/etcd/tests/v3/framework/config"
)

Expand Down Expand Up @@ -123,6 +124,10 @@ func (p *proxyEtcdProcess) Wait(ctx context.Context) error {
return p.etcdProc.Wait(ctx)
}

func (p *proxyEtcdProcess) PeerProxy() proxy.Server {
return nil
}

type proxyProc struct {
lg *zap.Logger
name string
Expand Down Expand Up @@ -187,7 +192,7 @@ type proxyV2Proc struct {
}

func proxyListenURL(cfg *EtcdServerProcessConfig, portOffset int) string {
u, err := url.Parse(cfg.Acurl)
u, err := url.Parse(cfg.ClientURL)
if err != nil {
panic(err)
}
Expand All @@ -205,7 +210,7 @@ func newProxyV2Proc(cfg *EtcdServerProcessConfig) *proxyV2Proc {
"--name", name,
"--proxy", "on",
"--listen-client-urls", listenAddr,
"--initial-cluster", cfg.Name + "=" + cfg.Purl.String(),
"--initial-cluster", cfg.Name + "=" + cfg.PeerURL.String(),
"--data-dir", dataDir,
}
return &proxyV2Proc{
Expand All @@ -231,13 +236,13 @@ func newProxyV3Proc(cfg *EtcdServerProcessConfig) *proxyV3Proc {
"grpc-proxy",
"start",
"--listen-addr", strings.Split(listenAddr, "/")[2],
"--endpoints", cfg.Acurl,
"--endpoints", cfg.ClientURL,
// pass-through member RPCs
"--advertise-client-url", "",
"--data-dir", cfg.DataDirPath,
}
murl := ""
if cfg.Murl != "" {
if cfg.MetricsURL != "" {
murl = proxyListenURL(cfg, 4)
args = append(args, "--metrics-addr", murl)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/framework/e2e/curl.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func (r CURLReq) timeoutDuration() time.Duration {
func CURLPrefixArgs(cfg *EtcdProcessClusterConfig, member EtcdProcess, method string, req CURLReq) []string {
var (
cmdArgs = []string{"curl"}
acurl = member.Config().Acurl
acurl = member.Config().ClientURL
)
if req.MetricsURLScheme != "https" {
if req.IsTLS {
if cfg.Client.ConnectionType != ClientTLSAndNonTLS {
panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS")
}
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
acurl = ToTLS(member.Config().Acurl)
acurl = ToTLS(member.Config().ClientURL)
} else if cfg.Client.ConnectionType == ClientTLS {
if cfg.CN {
cmdArgs = append(cmdArgs, "--cacert", CaPath, "--cert", CertPath, "--key", PrivateKeyPath)
Expand Down
42 changes: 34 additions & 8 deletions tests/framework/e2e/etcd_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/pkg/v3/proxy"
"go.etcd.io/etcd/tests/v3/framework/config"
)

Expand All @@ -49,6 +50,7 @@ type EtcdProcess interface {
Stop() error
Close() error
Config() *EtcdServerProcessConfig
PeerProxy() proxy.Server
Logs() LogsExpect
Kill() error
}
Expand All @@ -62,6 +64,7 @@ type LogsExpect interface {
type EtcdServerProcess struct {
cfg *EtcdServerProcessConfig
proc *expect.ExpectProcess
proxy proxy.Server
donec chan struct{} // closed when Interact() terminates
}

Expand All @@ -78,14 +81,15 @@ type EtcdServerProcessConfig struct {

Name string

Purl url.URL

Acurl string
Murl string
PeerURL url.URL
ClientURL string
MetricsURL string

InitialToken string
InitialCluster string
GoFailPort int

Proxy *proxy.ServerConfig
}

func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, error) {
Expand All @@ -100,9 +104,9 @@ func NewEtcdServerProcess(cfg *EtcdServerProcessConfig) (*EtcdServerProcess, err
return &EtcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil
}

func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.Acurl} }
func (ep *EtcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.ClientURL} }
func (ep *EtcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() }
func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.Murl} }
func (ep *EtcdServerProcess) EndpointsMetrics() []string { return []string{ep.cfg.MetricsURL} }

func (epc *EtcdServerProcess) Client(opts ...config.ClientOption) *EtcdctlV3 {
etcdctl, err := NewEtcdctl(epc.Config().Client, epc.EndpointsV3(), opts...)
Expand All @@ -117,6 +121,15 @@ func (ep *EtcdServerProcess) Start(ctx context.Context) error {
if ep.proc != nil {
panic("already started")
}
if ep.cfg.Proxy != nil && ep.proxy == nil {
ep.cfg.lg.Info("starting proxy...", zap.String("name", ep.cfg.Name), zap.String("from", ep.cfg.Proxy.From.String()), zap.String("to", ep.cfg.Proxy.To.String()))
ep.proxy = proxy.NewServer(*ep.cfg.Proxy)
select {
case <-ep.proxy.Ready():
case err := <-ep.proxy.Error():
return err
}
}
ep.cfg.lg.Info("starting server...", zap.String("name", ep.cfg.Name))
proc, err := SpawnCmdWithLogger(ep.cfg.lg, append([]string{ep.cfg.ExecPath}, ep.cfg.Args...), ep.cfg.EnvVars, ep.cfg.Name)
if err != nil {
Expand Down Expand Up @@ -161,8 +174,8 @@ func (ep *EtcdServerProcess) Stop() (err error) {
}
<-ep.donec
ep.donec = make(chan struct{})
if ep.cfg.Purl.Scheme == "unix" || ep.cfg.Purl.Scheme == "unixs" {
err = os.Remove(ep.cfg.Purl.Host + ep.cfg.Purl.Path)
if ep.cfg.PeerURL.Scheme == "unix" || ep.cfg.PeerURL.Scheme == "unixs" {
err = os.Remove(ep.cfg.PeerURL.Host + ep.cfg.PeerURL.Path)
if err != nil && !os.IsNotExist(err) {
return err
}
Expand All @@ -176,6 +189,15 @@ func (ep *EtcdServerProcess) Close() error {
if err := ep.Stop(); err != nil {
return err
}
if ep.proxy != nil {
ep.cfg.lg.Info("closing proxy...", zap.String("name", ep.cfg.Name))
err := ep.proxy.Close()
if err != nil {
return err
}
ep.proxy = nil
}

if !ep.cfg.KeepDataDir {
ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath))
return os.RemoveAll(ep.cfg.DataDirPath)
Expand Down Expand Up @@ -243,3 +265,7 @@ func AssertProcessLogs(t *testing.T, ep EtcdProcess, expectLog string) {
t.Fatal(err)
}
}

func (ep *EtcdServerProcess) PeerProxy() proxy.Server {
return ep.proxy
}
Loading

0 comments on commit acf3782

Please sign in to comment.