From db91fbbd6bbb75851d56f362b9210f064ddd8801 Mon Sep 17 00:00:00 2001 From: richardjcai Date: Fri, 17 Jun 2022 12:18:55 -0400 Subject: [PATCH 1/3] Support testserver with multiple nodes. Right now this is only supported in insecure mode. Start a three node cluster using the ThreeNode TestsServerOpt. --- testserver/tenant.go | 10 +- testserver/testserver.go | 238 ++++++++++++++++++++++------------ testserver/testserver_test.go | 11 ++ 3 files changed, 171 insertions(+), 88 deletions(-) diff --git a/testserver/tenant.go b/testserver/tenant.go index d0ce9f6..8bcbcc4 100644 --- a/testserver/tenant.go +++ b/testserver/tenant.go @@ -18,7 +18,6 @@ import ( "database/sql" "errors" "fmt" - "github.com/cockroachdb/cockroach-go/v2/testserver/version" "log" "net" "net/url" @@ -26,6 +25,8 @@ import ( "path/filepath" "strconv" "strings" + + "github.com/cockroachdb/cockroach-go/v2/testserver/version" ) func (ts *testServerImpl) isTenant() bool { @@ -53,7 +54,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { if proxy && !ts.serverArgs.secure { return nil, fmt.Errorf("%s: proxy cannot be used with insecure mode", tenantserverMessagePrefix) } - cockroachBinary := ts.cmdArgs[0] + cockroachBinary := ts.serverArgs.cockroachBinary tenantID, err := func() (int, error) { ts.mu.Lock() defer ts.mu.Unlock() @@ -191,7 +192,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { return nil, err } - args := []string{ + args := [][]string{{ cockroachBinary, "mt", "start-sql", @@ -201,7 +202,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { "--kv-addrs=" + pgURL.Host, "--sql-addr=" + sqlAddr, "--http-addr=:0", - } + }} tenant := &testServerImpl{ serverArgs: ts.serverArgs, @@ -209,6 +210,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { state: stateNew, baseDir: ts.baseDir, cmdArgs: args, + cmd: make([]*exec.Cmd, ts.serverArgs.numNodes), stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)), stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)), // TODO(asubiotto): Specify listeningURLFile once we support dynamic diff --git a/testserver/testserver.go b/testserver/testserver.go index 487c540..cf7c206 100644 --- a/testserver/testserver.go +++ b/testserver/testserver.go @@ -118,8 +118,10 @@ type testServerImpl struct { // no password otherwise). orig url.URL } - cmd *exec.Cmd - cmdArgs []string + cmd []*exec.Cmd + cmdArgs [][]string + initCmd *exec.Cmd + initCmdArgs []string stdout string stderr string stdoutBuf logWriter @@ -197,6 +199,7 @@ type testServerArgs struct { testConfig TestConfig nonStableDB bool cockroachBinary string // path to cockroach executable file + numNodes int } // CockroachBinaryPathOpt is a TestServer option that can be passed to @@ -273,6 +276,12 @@ func StopDownloadInMiddleOpt() TestServerOpt { } } +func ThreeNode() TestServerOpt { + return func(args *testServerArgs) { + args.numNodes = 3 + } +} + const ( logsDirName = "logs" certsDirName = "certs" @@ -287,7 +296,7 @@ var errStoppedInMiddle = errors.New("download stopped in middle") // If the download fails, we attempt just call "cockroach", hoping it is // found in your path. func NewTestServer(opts ...TestServerOpt) (TestServer, error) { - serverArgs := &testServerArgs{} + serverArgs := &testServerArgs{numNodes: 1} serverArgs.storeMemSize = defaultStoreMemSize for _, applyOptToArgs := range opts { applyOptToArgs(serverArgs) @@ -396,8 +405,9 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { if err != nil { return nil, fmt.Errorf("%s failed to parse version: %w", testserverMessagePrefix, err) } + startCmd := "start-single-node" - if !v.AtLeast(version.MustParse("v19.2.0-alpha")) { + if !v.AtLeast(version.MustParse("v19.2.0-alpha")) || serverArgs.numNodes > 1 { startCmd = "start" } @@ -408,16 +418,40 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { storeArg = fmt.Sprintf("--store=type=mem,size=%.2f", serverArgs.storeMemSize) } - args := []string{ - serverArgs.cockroachBinary, - startCmd, - "--logtostderr", - secureOpt, - "--host=localhost", - "--port=0", - "--http-port=" + strconv.Itoa(serverArgs.httpPort), - storeArg, - "--listening-url-file=" + listeningURLFile, + args := make([][]string, serverArgs.numNodes) + var initArgs []string + if serverArgs.numNodes <= 1 { + args[0] = []string{ + serverArgs.cockroachBinary, + startCmd, + "--logtostderr", + secureOpt, + "--host=localhost", + "--port=0", + "--http-port=" + strconv.Itoa(serverArgs.httpPort), + storeArg, + "--listening-url-file=" + listeningURLFile, + } + } else { + for i := 0; i < serverArgs.numNodes; i++ { + args[i] = []string{ + serverArgs.cockroachBinary, + startCmd, + secureOpt, + storeArg + strconv.Itoa(i), + fmt.Sprintf("--listen-addr=localhost:%d", 26257+i), + fmt.Sprintf("--http-addr=localhost:%d", 8080+i), + fmt.Sprintf("--join=localhost:26257,localhost:26258,localhost:26259"), + "--listening-url-file=" + listeningURLFile, + } + + } + initArgs = []string{ + serverArgs.cockroachBinary, + "init", + secureOpt, + "--host=localhost:26257", + } } ts := &testServerImpl{ @@ -426,6 +460,8 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { state: stateNew, baseDir: baseDir, cmdArgs: args, + cmd: make([]*exec.Cmd, serverArgs.numNodes), + initCmdArgs: initArgs, stdout: filepath.Join(logDir, "cockroach.stdout"), stderr: filepath.Join(logDir, "cockroach.stderr"), listeningURLFile: listeningURLFile, @@ -565,88 +601,99 @@ func (ts *testServerImpl) Start() error { ts.state = stateRunning ts.mu.Unlock() - ts.cmd = exec.Command(ts.cmdArgs[0], ts.cmdArgs[1:]...) - ts.cmd.Env = []string{ - "COCKROACH_MAX_OFFSET=1ns", - "COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true", - } - - // Set the working directory of the cockroach process to our temp folder. - // This stops cockroach from polluting the project directory with _dump - // folders. - ts.cmd.Dir = ts.baseDir - - if len(ts.stdout) > 0 { - wr, err := newFileLogWriter(ts.stdout) - if err != nil { - return fmt.Errorf("unable to open file %s: %w", ts.stdout, err) - } - ts.stdoutBuf = wr - } - ts.cmd.Stdout = ts.stdoutBuf + for i := 0; i < ts.serverArgs.numNodes; i++ { + ts.cmd[i] = exec.Command(ts.cmdArgs[i][0], ts.cmdArgs[i][1:]...) - if len(ts.stderr) > 0 { - wr, err := newFileLogWriter(ts.stderr) - if err != nil { - return fmt.Errorf("unable to open file %s: %w", ts.stderr, err) + currCmd := ts.cmd[i] + currCmd.Env = []string{ + "COCKROACH_MAX_OFFSET=1ns", + "COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true", } - ts.stderrBuf = wr - } - ts.cmd.Stderr = ts.stderrBuf - for k, v := range defaultEnv() { - ts.cmd.Env = append(ts.cmd.Env, k+"="+v) - } + // Set the working directory of the cockroach process to our temp folder. + // This stops cockroach from polluting the project directory with _dump + // folders. + currCmd.Dir = ts.baseDir - log.Printf("executing: %s", ts.cmd) - err := ts.cmd.Start() - if ts.cmd.Process != nil { - log.Printf("process %d started: %s", ts.cmd.Process.Pid, strings.Join(ts.cmdArgs, " ")) - } - if err != nil { - log.Print(err.Error()) - if err := ts.stdoutBuf.Close(); err != nil { - log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, err) + if len(ts.stdout) > 0 { + wr, err := newFileLogWriter(ts.stdout) + if err != nil { + return fmt.Errorf("unable to open file %s: %w", ts.stdout, err) + } + ts.stdoutBuf = wr } - if err := ts.stderrBuf.Close(); err != nil { - log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, err) + currCmd.Stdout = ts.stdoutBuf + + if len(ts.stderr) > 0 { + wr, err := newFileLogWriter(ts.stderr) + if err != nil { + return fmt.Errorf("unable to open file %s: %w", ts.stderr, err) + } + ts.stderrBuf = wr } + currCmd.Stderr = ts.stderrBuf - ts.mu.Lock() - ts.state = stateFailed - ts.mu.Unlock() + for k, v := range defaultEnv() { + currCmd.Env = append(currCmd.Env, k+"="+v) + } - return fmt.Errorf("command %s failed: %w", ts.cmd, err) - } + log.Printf("executing: %s", currCmd) + err := currCmd.Start() + if currCmd.Process != nil { + log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.cmdArgs[i], " ")) + } + if err != nil { + log.Print(err.Error()) + if err := ts.stdoutBuf.Close(); err != nil { + log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, err) + } + if err := ts.stderrBuf.Close(); err != nil { + log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, err) + } - go func() { - err := ts.cmd.Wait() + ts.mu.Lock() + ts.state = stateFailed + ts.mu.Unlock() - if closeErr := ts.stdoutBuf.Close(); closeErr != nil { - log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, closeErr) - } - if closeErr := ts.stderrBuf.Close(); closeErr != nil { - log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr) + return fmt.Errorf("command %s failed: %w", currCmd, err) } - ps := ts.cmd.ProcessState - sy := ps.Sys().(syscall.WaitStatus) + go func() { + err := currCmd.Wait() - log.Printf("%s: command %s exited with status %d: %v", - testserverMessagePrefix, - ts.cmd, - sy.ExitStatus(), - err) - log.Printf("%s process state: %s", testserverMessagePrefix, ps.String()) + if closeErr := ts.stdoutBuf.Close(); closeErr != nil { + log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, closeErr) + } + if closeErr := ts.stderrBuf.Close(); closeErr != nil { + log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr) + } - ts.mu.Lock() - if sy.ExitStatus() == 0 { - ts.state = stateStopped - } else { - ts.state = stateFailed + ps := currCmd.ProcessState + sy := ps.Sys().(syscall.WaitStatus) + + log.Printf("%s: command %s exited with status %d: %v", + testserverMessagePrefix, + currCmd, + sy.ExitStatus(), + err) + log.Printf("%s process state: %s", testserverMessagePrefix, ps.String()) + + ts.mu.Lock() + if sy.ExitStatus() == 0 { + ts.state = stateStopped + } else { + ts.state = stateFailed + } + ts.mu.Unlock() + }() + } + + if ts.serverArgs.numNodes > 1 { + err := ts.CockroachInit() + if err != nil { + return err } - ts.mu.Unlock() - }() + } if ts.pgURL.u == nil { go func() { @@ -680,9 +727,10 @@ func (ts *testServerImpl) Stop() { } if ts.state != stateStopped { - // Only call kill if not running. It could have exited properly. - _ = ts.cmd.Process.Kill() - + for _, cmd := range ts.cmd { + // Only call kill if not running. It could have exited properly. + _ = cmd.Process.Kill() + } if p := ts.proxyProcess; p != nil { _ = p.Kill() } @@ -692,6 +740,28 @@ func (ts *testServerImpl) Stop() { _ = os.RemoveAll(ts.baseDir) } +func (ts *testServerImpl) CockroachInit() error { + ts.initCmd = exec.Command(ts.initCmdArgs[0], ts.initCmdArgs[1:]...) + ts.initCmd.Env = []string{ + "COCKROACH_MAX_OFFSET=1ns", + "COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true", + } + + // Set the working directory of the cockroach process to our temp folder. + // This stops cockroach from polluting the project directory with _dump + // folders. + ts.initCmd.Dir = ts.baseDir + + err := ts.initCmd.Start() + if ts.initCmd.Process != nil { + log.Printf("process %d started: %s", ts.initCmd.Process.Pid, strings.Join(ts.initCmdArgs, " ")) + } + if err != nil { + return err + } + return nil +} + type logWriter interface { Write(p []byte) (n int, err error) String() string diff --git a/testserver/testserver_test.go b/testserver/testserver_test.go index 5f0a212..2383eb8 100644 --- a/testserver/testserver_test.go +++ b/testserver/testserver_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/cockroachdb/cockroach-go/v2/testserver" + "github.com/stretchr/testify/require" ) const noPW = "" @@ -177,10 +178,20 @@ func TestRunServer(t *testing.T) { ) }, }, + { + name: "Insecure 3 Node", + instantiation: func(t *testing.T) (*sql.DB, func()) { + return testserver.NewDBForTest(t, testserver.ThreeNode()) + }, + }, } { t.Run(tc.name, func(t *testing.T) { db, stop := tc.instantiation(t) defer stop() + var out int + row := db.QueryRow("SELECT 1") + row.Scan(&out) + require.Equal(t, out, 1) if _, err := db.Exec("SELECT 1"); err != nil { t.Fatal(err) } From 97a8658ea72fce74019ef6cd865fe084a0e1ec9d Mon Sep 17 00:00:00 2001 From: richardjcai Date: Fri, 17 Jun 2022 14:13:41 -0400 Subject: [PATCH 2/3] Support Start/Stop single node --- testserver/tenant.go | 12 +- testserver/testserver.go | 255 +++++++++++++++------------------- testserver/testserver_test.go | 168 +++++++++++++++++++++- testserver/testservernode.go | 121 ++++++++++++++++ 4 files changed, 401 insertions(+), 155 deletions(-) create mode 100644 testserver/testservernode.go diff --git a/testserver/tenant.go b/testserver/tenant.go index 8bcbcc4..247ec86 100644 --- a/testserver/tenant.go +++ b/testserver/tenant.go @@ -58,7 +58,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { tenantID, err := func() (int, error) { ts.mu.Lock() defer ts.mu.Unlock() - if ts.state != stateRunning { + if ts.nodeStates[0] != stateRunning { return 0, errors.New("TestServer must be running before NewTenantServer may be called") } if ts.isTenant() { @@ -148,7 +148,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { } proxyAddr, err := func() (string, error) { - <-ts.pgURL.set + <-ts.pgURL[0].set ts.mu.Lock() defer ts.mu.Unlock() @@ -208,6 +208,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { serverArgs: ts.serverArgs, version: ts.version, state: stateNew, + nodeStates: []int{stateNew}, baseDir: ts.baseDir, cmdArgs: args, cmd: make([]*exec.Cmd, ts.serverArgs.numNodes), @@ -215,15 +216,16 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)), // TODO(asubiotto): Specify listeningURLFile once we support dynamic // ports. - listeningURLFile: "", + listeningURLFile: []string{""}, } // Start the tenant. // Initialize direct connection to the tenant. We need to use `orig` instead of `pgurl` because if the test server // is using a root password, this password does not carry over to the tenant; client certs will, though. - tenantURL := ts.pgURL.orig + tenantURL := ts.pgURL[0].orig tenantURL.Host = sqlAddr - tenant.pgURL.set = make(chan struct{}) + tenant.pgURL = make([]pgURLChan, 1) + tenant.pgURL[0].set = make(chan struct{}) tenant.setPGURL(&tenantURL) if err := tenant.Start(); err != nil { diff --git a/testserver/testserver.go b/testserver/testserver.go index cf7c206..bac6154 100644 --- a/testserver/testserver.go +++ b/testserver/testserver.go @@ -53,7 +53,6 @@ import ( "strconv" "strings" "sync" - "syscall" "testing" "time" @@ -101,23 +100,32 @@ type TestServer interface { WaitForInit() error // BaseDir returns directory StoreOnDiskOpt writes to if used. BaseDir() string + + WaitForNode(numNode int) error + StartNode(i int) error + StopNode(i int) error + UpgradeNode(i int) error + PGURLForNode(nodeNum int) *url.URL +} + +type pgURLChan struct { + set chan struct{} + u *url.URL + // The original URL is preserved here if we are using a custom password. + // In that case, the one below uses client certificates, if secure (and + // no password otherwise). + orig url.URL } // testServerImpl is a TestServer implementation. type testServerImpl struct { - mu sync.RWMutex - version *version.Version - serverArgs testServerArgs - state int - baseDir string - pgURL struct { - set chan struct{} - u *url.URL - // The original URL is preserved here if we are using a custom password. - // In that case, the one below uses client certificates, if secure (and - // no password otherwise). - orig url.URL - } + mu sync.RWMutex + version *version.Version + serverArgs testServerArgs + state int + nodeStates []int + baseDir string + pgURL []pgURLChan cmd []*exec.Cmd cmdArgs [][]string initCmd *exec.Cmd @@ -126,7 +134,7 @@ type testServerImpl struct { stderr string stdoutBuf logWriter stderrBuf logWriter - listeningURLFile string + listeningURLFile []string // curTenantID is used to allocate tenant IDs. Refer to NewTenantServer for // more information. @@ -191,15 +199,16 @@ type TestConfig struct { } type testServerArgs struct { - secure bool - rootPW string // if nonempty, set as pw for root - storeOnDisk bool // to save database in disk - storeMemSize float64 // the proportion of available memory allocated to test server - httpPort int - testConfig TestConfig - nonStableDB bool - cockroachBinary string // path to cockroach executable file - numNodes int + secure bool + rootPW string // if nonempty, set as pw for root + storeOnDisk bool // to save database in disk + storeMemSize float64 // the proportion of available memory allocated to test server + httpPort int + testConfig TestConfig + nonStableDB bool + cockroachBinary string // path to cockroach executable file + upgradeCockroachBinary string // path to cockroach binary for upgrade + numNodes int } // CockroachBinaryPathOpt is a TestServer option that can be passed to @@ -212,6 +221,12 @@ func CockroachBinaryPathOpt(executablePath string) TestServerOpt { } } +func UpgradeCockroachBinaryPathOpt(executablePath string) TestServerOpt { + return func(args *testServerArgs) { + args.upgradeCockroachBinary = executablePath + } +} + // SecureOpt is a TestServer option that can be passed to NewTestServer to // enable secure mode. func SecureOpt() TestServerOpt { @@ -361,7 +376,10 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { return nil, fmt.Errorf("%s: %w", testserverMessagePrefix, err) } - listeningURLFile := filepath.Join(baseDir, "listen-url") + listeningURLFile := make([]string, serverArgs.numNodes) + for i := 0; i < serverArgs.numNodes; i++ { + listeningURLFile[i] = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i)) + } secureOpt := "--insecure" if serverArgs.secure { @@ -430,7 +448,7 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { "--port=0", "--http-port=" + strconv.Itoa(serverArgs.httpPort), storeArg, - "--listening-url-file=" + listeningURLFile, + "--listening-url-file=" + listeningURLFile[0], } } else { for i := 0; i < serverArgs.numNodes; i++ { @@ -441,23 +459,29 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { storeArg + strconv.Itoa(i), fmt.Sprintf("--listen-addr=localhost:%d", 26257+i), fmt.Sprintf("--http-addr=localhost:%d", 8080+i), - fmt.Sprintf("--join=localhost:26257,localhost:26258,localhost:26259"), - "--listening-url-file=" + listeningURLFile, + "--listening-url-file=" + listeningURLFile[i], + fmt.Sprintf("--join=localhost:%d,localhost:%d,localhost:%d", 26257, 26258, 26259), } - } + initArgs = []string{ serverArgs.cockroachBinary, "init", secureOpt, - "--host=localhost:26257", + "--host=localhost:26259", } } + states := make([]int, serverArgs.numNodes) + for i := 0; i < serverArgs.numNodes; i++ { + states[i] = stateNew + } + ts := &testServerImpl{ serverArgs: *serverArgs, version: v, state: stateNew, + nodeStates: states, baseDir: baseDir, cmdArgs: args, cmd: make([]*exec.Cmd, serverArgs.numNodes), @@ -467,7 +491,7 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { listeningURLFile: listeningURLFile, curTenantID: firstTenantID, } - ts.pgURL.set = make(chan struct{}) + ts.pgURL = make([]pgURLChan, serverArgs.numNodes) if err := ts.Start(); err != nil { return nil, fmt.Errorf("%s Start failed: %w", testserverMessagePrefix, err) @@ -505,45 +529,58 @@ func (ts *testServerImpl) BaseDir() string { // It blocks until the network URL is determined and does not timeout, // relying instead on test timeouts. func (ts *testServerImpl) PGURL() *url.URL { - <-ts.pgURL.set - return ts.pgURL.u + return ts.PGURLForNode(0) } func (ts *testServerImpl) setPGURL(u *url.URL) { - ts.pgURL.u = u - close(ts.pgURL.set) + ts.setPGURLForNode(0, u) } -// WaitForInit retries until a connection is successfully established. -func (ts *testServerImpl) WaitForInit() error { - var err error - db, err := sql.Open("postgres", ts.PGURL().String()) +func (ts *testServerImpl) PGURLForNode(nodeNum int) *url.URL { + <-ts.pgURL[nodeNum].set + return ts.pgURL[nodeNum].u +} + +func (ts *testServerImpl) setPGURLForNode(nodeNum int, u *url.URL) { + ts.pgURL[nodeNum].u = u + close(ts.pgURL[nodeNum].set) +} + +func (ts *testServerImpl) WaitForNode(nodeNum int) error { + db, err := sql.Open("postgres", ts.PGURLForNode(nodeNum).String()) + defer func() { + _ = db.Close() + }() if err != nil { return err } - defer db.Close() for i := 0; i < 50; i++ { if _, err = db.Query("SHOW DATABASES"); err == nil { return err } - log.Printf("%s: WaitForInit: Trying again after error: %v", testserverMessagePrefix, err) + log.Printf("%s: WaitForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err) time.Sleep(time.Millisecond * 100) } - return err + return nil } -func (ts *testServerImpl) pollListeningURLFile() error { +// WaitForInit retries until a connection is successfully established. +func (ts *testServerImpl) WaitForInit() error { + return ts.WaitForNode(0) +} + +func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error { var data []byte for { - ts.mu.Lock() - state := ts.state - ts.mu.Unlock() + ts.mu.RLock() + state := ts.nodeStates[nodeNum] + ts.mu.RUnlock() if state != stateRunning { return fmt.Errorf("server stopped or crashed before listening URL file was available") } var err error - data, err = ioutil.ReadFile(ts.listeningURLFile) + data, err = ioutil.ReadFile(ts.listeningURLFile[nodeNum]) if err == nil { break } else if !os.IsNotExist(err) { @@ -556,7 +593,7 @@ func (ts *testServerImpl) pollListeningURLFile() error { if err != nil { return fmt.Errorf("failed to parse SQL URL: %w", err) } - ts.pgURL.orig = *u + ts.pgURL[nodeNum].orig = *u if pw := ts.serverArgs.rootPW; pw != "" { db, err := sql.Open("postgres", u.String()) if err != nil { @@ -573,7 +610,8 @@ func (ts *testServerImpl) pollListeningURLFile() error { u.RawQuery = v.Encode() u.User = url.UserPassword("root", pw) } - ts.setPGURL(u) + + ts.setPGURLForNode(nodeNum, u) return nil } @@ -602,90 +640,9 @@ func (ts *testServerImpl) Start() error { ts.mu.Unlock() for i := 0; i < ts.serverArgs.numNodes; i++ { - ts.cmd[i] = exec.Command(ts.cmdArgs[i][0], ts.cmdArgs[i][1:]...) - - currCmd := ts.cmd[i] - currCmd.Env = []string{ - "COCKROACH_MAX_OFFSET=1ns", - "COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true", - } - - // Set the working directory of the cockroach process to our temp folder. - // This stops cockroach from polluting the project directory with _dump - // folders. - currCmd.Dir = ts.baseDir - - if len(ts.stdout) > 0 { - wr, err := newFileLogWriter(ts.stdout) - if err != nil { - return fmt.Errorf("unable to open file %s: %w", ts.stdout, err) - } - ts.stdoutBuf = wr - } - currCmd.Stdout = ts.stdoutBuf - - if len(ts.stderr) > 0 { - wr, err := newFileLogWriter(ts.stderr) - if err != nil { - return fmt.Errorf("unable to open file %s: %w", ts.stderr, err) - } - ts.stderrBuf = wr - } - currCmd.Stderr = ts.stderrBuf - - for k, v := range defaultEnv() { - currCmd.Env = append(currCmd.Env, k+"="+v) - } - - log.Printf("executing: %s", currCmd) - err := currCmd.Start() - if currCmd.Process != nil { - log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.cmdArgs[i], " ")) - } - if err != nil { - log.Print(err.Error()) - if err := ts.stdoutBuf.Close(); err != nil { - log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, err) - } - if err := ts.stderrBuf.Close(); err != nil { - log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, err) - } - - ts.mu.Lock() - ts.state = stateFailed - ts.mu.Unlock() - - return fmt.Errorf("command %s failed: %w", currCmd, err) + if err := ts.StartNode(i); err != nil { + return err } - - go func() { - err := currCmd.Wait() - - if closeErr := ts.stdoutBuf.Close(); closeErr != nil { - log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, closeErr) - } - if closeErr := ts.stderrBuf.Close(); closeErr != nil { - log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr) - } - - ps := currCmd.ProcessState - sy := ps.Sys().(syscall.WaitStatus) - - log.Printf("%s: command %s exited with status %d: %v", - testserverMessagePrefix, - currCmd, - sy.ExitStatus(), - err) - log.Printf("%s process state: %s", testserverMessagePrefix, ps.String()) - - ts.mu.Lock() - if sy.ExitStatus() == 0 { - ts.state = stateStopped - } else { - ts.state = stateFailed - } - ts.mu.Unlock() - }() } if ts.serverArgs.numNodes > 1 { @@ -695,16 +652,6 @@ func (ts *testServerImpl) Start() error { } } - if ts.pgURL.u == nil { - go func() { - if err := ts.pollListeningURLFile(); err != nil { - log.Printf("%s failed to poll listening URL file: %v", testserverMessagePrefix, err) - close(ts.pgURL.set) - ts.Stop() - } - }() - } - return nil } @@ -727,15 +674,31 @@ func (ts *testServerImpl) Stop() { } if ts.state != stateStopped { - for _, cmd := range ts.cmd { - // Only call kill if not running. It could have exited properly. - _ = cmd.Process.Kill() - } if p := ts.proxyProcess; p != nil { _ = p.Kill() } } + if closeErr := ts.stdoutBuf.Close(); closeErr != nil { + log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, closeErr) + } + if closeErr := ts.stderrBuf.Close(); closeErr != nil { + log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr) + } + + for _, cmd := range ts.cmd { + if cmd.Process != nil { + _ = cmd.Process.Kill() + } + } + + ts.state = stateStopped + for _, nodeState := range ts.nodeStates { + if nodeState != stateStopped { + ts.state = stateFailed + } + } + // Only cleanup on intentional stops. _ = os.RemoveAll(ts.baseDir) } diff --git a/testserver/testserver_test.go b/testserver/testserver_test.go index 2383eb8..7ea26ad 100644 --- a/testserver/testserver_test.go +++ b/testserver/testserver_test.go @@ -17,7 +17,11 @@ package testserver_test import ( "database/sql" "fmt" + "io" + "log" + http "net/http" "os" + "os/exec" "path/filepath" "strings" "sync" @@ -184,17 +188,22 @@ func TestRunServer(t *testing.T) { return testserver.NewDBForTest(t, testserver.ThreeNode()) }, }, + { + name: "Insecure 3 Node On Disk", + instantiation: func(t *testing.T) (*sql.DB, func()) { + return testserver.NewDBForTest(t, testserver.ThreeNode(), testserver.StoreOnDiskOpt()) + }, + }, } { t.Run(tc.name, func(t *testing.T) { db, stop := tc.instantiation(t) defer stop() var out int row := db.QueryRow("SELECT 1") - row.Scan(&out) + require.NoError(t, row.Scan(&out)) require.Equal(t, out, 1) - if _, err := db.Exec("SELECT 1"); err != nil { - t.Fatal(err) - } + _, err := db.Exec("SELECT 1") + require.NoError(t, err) }) } } @@ -337,6 +346,157 @@ func TestFlockOnDownloadedCRDB(t *testing.T) { } } +func TestRestartNode(t *testing.T) { + ts, err := testserver.NewTestServer(testserver.ThreeNode(), testserver.StoreOnDiskOpt()) + require.NoError(t, err) + defer ts.Stop() + for i := 0; i < 3; i++ { + require.NoError(t, ts.WaitForNode(i)) + } + + log.Printf("Stopping Node 2") + require.NoError(t, ts.StopNode(2)) + for i := 0; i < 2; i++ { + url := ts.PGURLForNode(i) + + db, err := sql.Open("postgres", url.String()) + require.NoError(t, err) + var out int + row := db.QueryRow("SELECT 1") + err = row.Scan(&out) + require.NoError(t, err) + require.NoError(t, db.Close()) + } + + require.NoError(t, ts.StartNode(2)) + require.NoError(t, ts.WaitForNode(2)) + + for i := 0; i < 3; i++ { + url := ts.PGURLForNode(i) + db, err := sql.Open("postgres", url.String()) + require.NoError(t, err) + + var out int + row := db.QueryRow("SELECT 1") + err = row.Scan(&out) + require.NoError(t, err) + require.NoError(t, db.Close()) + } +} + +func downloadBinaryTest(filepath string, url string) error { + // Get the data + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + + // Create the file + out, err := os.Create(filepath) + if err != nil { + return err + } + defer out.Close() + + // Write the body to file + _, err = io.Copy(out, resp.Body) + return err +} + +func TestUpgradeNode(t *testing.T) { + t.Skip("doesn't work on linux") + binary21_2 := "cockroach-v21.2.12.darwin-10.9-amd64" + binary22_1 := "cockroach-v22.1.0.darwin-10.9-amd64" + getMacBinary := func(fileName string) { + require.NoError(t, exec.Command("mkdir", "./temp_binaries").Start()) + require.NoError(t, downloadBinaryTest(fmt.Sprintf("./temp_binaries/%s.tgz", fileName), fmt.Sprintf("https://binaries.cockroachdb.com/%s.tgz", fileName))) + tarCmd := exec.Command("tar", "-zxvf", fmt.Sprintf("./temp_binaries/%s.tgz", fileName), "-C", "./temp_binaries") + require.NoError(t, tarCmd.Start()) + require.NoError(t, tarCmd.Wait()) + } + + defer func() { + require.NoError(t, exec.Command("rm", "-rf", "./temp_binaries").Start()) + }() + + getMacBinary(binary21_2) + getMacBinary(binary22_1) + + absFilePath21_2, err := filepath.Abs(fmt.Sprintf("./temp_binaries/%s/cockroach", binary21_2)) + require.NoError(t, err) + absFilePath22_1, err := filepath.Abs(fmt.Sprintf("./temp_binaries/%s/cockroach", binary22_1)) + require.NoError(t, err) + + ts, err := testserver.NewTestServer( + testserver.ThreeNode(), + testserver.CockroachBinaryPathOpt(absFilePath21_2), + testserver.UpgradeCockroachBinaryPathOpt(absFilePath22_1), + testserver.StoreOnDiskOpt(), + ) + require.NoError(t, err) + defer ts.Stop() + + for i := 0; i < 3; i++ { + require.NoError(t, ts.WaitForNode(i)) + } + + url := ts.PGURL() + db, err := sql.Open("postgres", url.String()) + require.NoError(t, err) + + var version string + row := db.QueryRow("SHOW CLUSTER SETTING version") + err = row.Scan(&version) + require.NoError(t, err) + + _, err = db.Exec("SET CLUSTER SETTING cluster.preserve_downgrade_option = '21.2';") + require.NoError(t, err) + require.NoError(t, db.Close()) + + for i := 0; i < 3; i++ { + require.NoError(t, ts.UpgradeNode(i)) + require.NoError(t, ts.WaitForNode(i)) + } + + for i := 0; i < 3; i++ { + url := ts.PGURLForNode(i) + + db, err = sql.Open("postgres", url.String()) + require.NoError(t, err) + + var out int + row = db.QueryRow("SELECT 1") + err = row.Scan(&out) + require.NoError(t, err) + require.NoError(t, db.Close()) + } + + db, err = sql.Open("postgres", ts.PGURL().String()) + require.NoError(t, err) + defer db.Close() + _, err = db.Exec("RESET CLUSTER SETTING cluster.preserve_downgrade_option;") + require.NoError(t, err) + + updated := false + for start := time.Now(); time.Since(start) < 300*time.Second; { + row = db.QueryRow("SHOW CLUSTER SETTING version") + err = row.Scan(&version) + if err != nil { + t.Fatal(err) + } + if version == "22.1" { + updated = true + break + } + time.Sleep(time.Second) + } + + if !updated { + t.Fatal("update to 22.1 did not complete") + } +} + var wg = sync.WaitGroup{} // testFlockWithDownloadPassing is to test the flock over downloaded CRDB binary with diff --git a/testserver/testservernode.go b/testserver/testservernode.go new file mode 100644 index 0000000..492d525 --- /dev/null +++ b/testserver/testservernode.go @@ -0,0 +1,121 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package testserver + +import ( + "fmt" + "log" + "os/exec" + "strings" +) + +func (ts *testServerImpl) StopNode(nodeNum int) error { + ts.mu.Lock() + ts.nodeStates[nodeNum] = stateStopped + ts.mu.Unlock() + ts.pgURL[nodeNum].u = nil + cmd := ts.cmd[nodeNum] + + // Kill the process. + if cmd.Process != nil { + return cmd.Process.Kill() + } + + return nil +} + +func (ts *testServerImpl) StartNode(i int) error { + ts.mu.RLock() + if ts.nodeStates[i] == stateRunning { + return fmt.Errorf("node %d already running", i) + } + ts.mu.RUnlock() + ts.cmd[i] = exec.Command(ts.cmdArgs[i][0], ts.cmdArgs[i][1:]...) + + currCmd := ts.cmd[i] + currCmd.Env = []string{ + "COCKROACH_MAX_OFFSET=1ns", + "COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true", + } + + // Set the working directory of the cockroach process to our temp folder. + // This stops cockroach from polluting the project directory with _dump + // folders. + currCmd.Dir = ts.baseDir + + if len(ts.stdout) > 0 { + wr, err := newFileLogWriter(ts.stdout) + if err != nil { + return fmt.Errorf("unable to open file %s: %w", ts.stdout, err) + } + ts.stdoutBuf = wr + } + currCmd.Stdout = ts.stdoutBuf + + if len(ts.stderr) > 0 { + wr, err := newFileLogWriter(ts.stderr) + if err != nil { + return fmt.Errorf("unable to open file %s: %w", ts.stderr, err) + } + ts.stderrBuf = wr + } + currCmd.Stderr = ts.stderrBuf + + for k, v := range defaultEnv() { + currCmd.Env = append(currCmd.Env, k+"="+v) + } + + log.Printf("executing: %s", currCmd) + err := currCmd.Start() + if currCmd.Process != nil { + log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.cmdArgs[i], " ")) + } + if err != nil { + log.Print(err.Error()) + ts.mu.Lock() + ts.nodeStates[i] = stateFailed + ts.mu.Unlock() + + return fmt.Errorf("command %s failed: %w", currCmd, err) + } + + ts.mu.Lock() + ts.nodeStates[i] = stateRunning + ts.mu.Unlock() + + capturedI := i + + if ts.pgURL[capturedI].u == nil { + ts.pgURL[i].set = make(chan struct{}) + go func() { + if err := ts.pollListeningURLFile(capturedI); err != nil { + log.Printf("%s failed to poll listening URL file: %v", testserverMessagePrefix, err) + close(ts.pgURL[capturedI].set) + ts.Stop() + } + }() + } + + return nil +} + +func (ts *testServerImpl) UpgradeNode(nodeNum int) error { + err := ts.StopNode(nodeNum) + if err != nil { + return err + } + ts.cmdArgs[nodeNum][0] = ts.serverArgs.upgradeCockroachBinary + return ts.StartNode(nodeNum) +} From a7906e67440541ad739d9062a168de2de190f7c8 Mon Sep 17 00:00:00 2001 From: richardjcai Date: Tue, 21 Jun 2022 15:57:20 -0400 Subject: [PATCH 3/3] Create nodeInfo struct for separating startCmds per node and state per node --- testserver/tenant.go | 35 ++++---- testserver/testserver.go | 157 ++++++++++++++++++---------------- testserver/testserver_test.go | 16 ++-- testserver/testservernode.go | 18 ++-- 4 files changed, 119 insertions(+), 107 deletions(-) diff --git a/testserver/tenant.go b/testserver/tenant.go index 247ec86..958f13d 100644 --- a/testserver/tenant.go +++ b/testserver/tenant.go @@ -58,7 +58,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { tenantID, err := func() (int, error) { ts.mu.Lock() defer ts.mu.Unlock() - if ts.nodeStates[0] != stateRunning { + if ts.nodes[0].state != stateRunning { return 0, errors.New("TestServer must be running before NewTenantServer may be called") } if ts.isTenant() { @@ -192,7 +192,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { return nil, err } - args := [][]string{{ + args := []string{ cockroachBinary, "mt", "start-sql", @@ -202,21 +202,26 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { "--kv-addrs=" + pgURL.Host, "--sql-addr=" + sqlAddr, "--http-addr=:0", - }} + } + + nodes := []nodeInfo{ + { + state: stateNew, + startCmdArgs: args, + // TODO(asubiotto): Specify listeningURLFile once we support dynamic + // ports. + listeningURLFile: "", + }, + } tenant := &testServerImpl{ - serverArgs: ts.serverArgs, - version: ts.version, - state: stateNew, - nodeStates: []int{stateNew}, - baseDir: ts.baseDir, - cmdArgs: args, - cmd: make([]*exec.Cmd, ts.serverArgs.numNodes), - stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)), - stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)), - // TODO(asubiotto): Specify listeningURLFile once we support dynamic - // ports. - listeningURLFile: []string{""}, + serverArgs: ts.serverArgs, + version: ts.version, + serverState: stateNew, + baseDir: ts.baseDir, + stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)), + stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)), + nodes: nodes, } // Start the tenant. diff --git a/testserver/testserver.go b/testserver/testserver.go index bac6154..a19df12 100644 --- a/testserver/testserver.go +++ b/testserver/testserver.go @@ -101,10 +101,17 @@ type TestServer interface { // BaseDir returns directory StoreOnDiskOpt writes to if used. BaseDir() string - WaitForNode(numNode int) error + // WaitForInitFinishForNode waits until a node has completed + // initialization and is available to connect to and query on. + WaitForInitFinishForNode(numNode int) error + // StartNode runs the "cockroach start" command for the node. StartNode(i int) error + // StopNode kills the node's process. StopNode(i int) error + // UpgradeNode stops the node, then starts the node on the with the + // binary specified at "upgradeBinaryPath". UpgradeNode(i int) error + // PGURLForNode returns the PGUrl for the node. PGURLForNode(nodeNum int) *url.URL } @@ -117,24 +124,29 @@ type pgURLChan struct { orig url.URL } +// nodeInfo contains the info to start a node and the state of the node. +type nodeInfo struct { + startCmd *exec.Cmd + startCmdArgs []string + listeningURLFile string + state int +} + // testServerImpl is a TestServer implementation. type testServerImpl struct { - mu sync.RWMutex - version *version.Version - serverArgs testServerArgs - state int - nodeStates []int - baseDir string - pgURL []pgURLChan - cmd []*exec.Cmd - cmdArgs [][]string - initCmd *exec.Cmd - initCmdArgs []string - stdout string - stderr string - stdoutBuf logWriter - stderrBuf logWriter - listeningURLFile []string + mu sync.RWMutex + version *version.Version + serverArgs testServerArgs + serverState int + baseDir string + pgURL []pgURLChan + initCmd *exec.Cmd + initCmdArgs []string + stdout string + stderr string + stdoutBuf logWriter + stderrBuf logWriter + nodes []nodeInfo // curTenantID is used to allocate tenant IDs. Refer to NewTenantServer for // more information. @@ -291,7 +303,7 @@ func StopDownloadInMiddleOpt() TestServerOpt { } } -func ThreeNode() TestServerOpt { +func ThreeNodeOpt() TestServerOpt { return func(args *testServerArgs) { args.numNodes = 3 } @@ -376,11 +388,6 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { return nil, fmt.Errorf("%s: %w", testserverMessagePrefix, err) } - listeningURLFile := make([]string, serverArgs.numNodes) - for i := 0; i < serverArgs.numNodes; i++ { - listeningURLFile[i] = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i)) - } - secureOpt := "--insecure" if serverArgs.secure { // Create certificates. @@ -436,40 +443,44 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { storeArg = fmt.Sprintf("--store=type=mem,size=%.2f", serverArgs.storeMemSize) } - args := make([][]string, serverArgs.numNodes) + nodes := make([]nodeInfo, serverArgs.numNodes) var initArgs []string - if serverArgs.numNodes <= 1 { - args[0] = []string{ - serverArgs.cockroachBinary, - startCmd, - "--logtostderr", - secureOpt, - "--host=localhost", - "--port=0", - "--http-port=" + strconv.Itoa(serverArgs.httpPort), - storeArg, - "--listening-url-file=" + listeningURLFile[0], - } - } else { - for i := 0; i < serverArgs.numNodes; i++ { - args[i] = []string{ + for i := 0; i < serverArgs.numNodes; i++ { + nodes[i].state = stateNew + nodes[i].listeningURLFile = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i)) + if serverArgs.numNodes > 1 { + nodes[i].startCmdArgs = []string{ serverArgs.cockroachBinary, startCmd, secureOpt, storeArg + strconv.Itoa(i), fmt.Sprintf("--listen-addr=localhost:%d", 26257+i), fmt.Sprintf("--http-addr=localhost:%d", 8080+i), - "--listening-url-file=" + listeningURLFile[i], + "--listening-url-file=" + nodes[i].listeningURLFile, fmt.Sprintf("--join=localhost:%d,localhost:%d,localhost:%d", 26257, 26258, 26259), } + } else { + nodes[0].startCmdArgs = []string{ + serverArgs.cockroachBinary, + startCmd, + "--logtostderr", + secureOpt, + "--host=localhost", + "--port=0", + "--http-port=" + strconv.Itoa(serverArgs.httpPort), + storeArg, + "--listening-url-file=" + nodes[i].listeningURLFile, + } } + } - initArgs = []string{ - serverArgs.cockroachBinary, - "init", - secureOpt, - "--host=localhost:26259", - } + // We only need initArgs if we're creating a testserver + // with multiple nodes. + initArgs = []string{ + serverArgs.cockroachBinary, + "init", + secureOpt, + "--host=localhost:26259", } states := make([]int, serverArgs.numNodes) @@ -478,18 +489,15 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { } ts := &testServerImpl{ - serverArgs: *serverArgs, - version: v, - state: stateNew, - nodeStates: states, - baseDir: baseDir, - cmdArgs: args, - cmd: make([]*exec.Cmd, serverArgs.numNodes), - initCmdArgs: initArgs, - stdout: filepath.Join(logDir, "cockroach.stdout"), - stderr: filepath.Join(logDir, "cockroach.stderr"), - listeningURLFile: listeningURLFile, - curTenantID: firstTenantID, + serverArgs: *serverArgs, + version: v, + serverState: stateNew, + baseDir: baseDir, + initCmdArgs: initArgs, + stdout: filepath.Join(logDir, "cockroach.stdout"), + stderr: filepath.Join(logDir, "cockroach.stderr"), + curTenantID: firstTenantID, + nodes: nodes, } ts.pgURL = make([]pgURLChan, serverArgs.numNodes) @@ -546,7 +554,7 @@ func (ts *testServerImpl) setPGURLForNode(nodeNum int, u *url.URL) { close(ts.pgURL[nodeNum].set) } -func (ts *testServerImpl) WaitForNode(nodeNum int) error { +func (ts *testServerImpl) WaitForInitFinishForNode(nodeNum int) error { db, err := sql.Open("postgres", ts.PGURLForNode(nodeNum).String()) defer func() { _ = db.Close() @@ -558,7 +566,7 @@ func (ts *testServerImpl) WaitForNode(nodeNum int) error { if _, err = db.Query("SHOW DATABASES"); err == nil { return err } - log.Printf("%s: WaitForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err) + log.Printf("%s: WaitForInitFinishForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err) time.Sleep(time.Millisecond * 100) } return nil @@ -566,21 +574,21 @@ func (ts *testServerImpl) WaitForNode(nodeNum int) error { // WaitForInit retries until a connection is successfully established. func (ts *testServerImpl) WaitForInit() error { - return ts.WaitForNode(0) + return ts.WaitForInitFinishForNode(0) } func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error { var data []byte for { ts.mu.RLock() - state := ts.nodeStates[nodeNum] + state := ts.nodes[nodeNum].state ts.mu.RUnlock() if state != stateRunning { return fmt.Errorf("server stopped or crashed before listening URL file was available") } var err error - data, err = ioutil.ReadFile(ts.listeningURLFile[nodeNum]) + data, err = ioutil.ReadFile(ts.nodes[nodeNum].listeningURLFile) if err == nil { break } else if !os.IsNotExist(err) { @@ -624,9 +632,9 @@ func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error { // to restart a testserver, but use NewTestServer(). func (ts *testServerImpl) Start() error { ts.mu.Lock() - if ts.state != stateNew { + if ts.serverState != stateNew { ts.mu.Unlock() - switch ts.state { + switch ts.serverState { case stateRunning: return nil // No-op if server is already running. case stateStopped, stateFailed: @@ -636,7 +644,7 @@ func (ts *testServerImpl) Start() error { "Please use NewTestServer()") } } - ts.state = stateRunning + ts.serverState = stateRunning ts.mu.Unlock() for i := 0; i < ts.serverArgs.numNodes; i++ { @@ -662,10 +670,10 @@ func (ts *testServerImpl) Stop() { ts.mu.RLock() defer ts.mu.RUnlock() - if ts.state == stateNew { + if ts.serverState == stateNew { log.Fatalf("%s: Stop() called, but Start() was never called", testserverMessagePrefix) } - if ts.state == stateFailed { + if ts.serverState == stateFailed { log.Fatalf("%s: Stop() called, but process exited unexpectedly. Stdout:\n%s\nStderr:\n%s\n", testserverMessagePrefix, ts.Stdout(), @@ -673,7 +681,7 @@ func (ts *testServerImpl) Stop() { return } - if ts.state != stateStopped { + if ts.serverState != stateStopped { if p := ts.proxyProcess; p != nil { _ = p.Kill() } @@ -686,16 +694,15 @@ func (ts *testServerImpl) Stop() { log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr) } - for _, cmd := range ts.cmd { + ts.serverState = stateStopped + for _, node := range ts.nodes { + cmd := node.startCmd if cmd.Process != nil { _ = cmd.Process.Kill() } - } - ts.state = stateStopped - for _, nodeState := range ts.nodeStates { - if nodeState != stateStopped { - ts.state = stateFailed + if node.state != stateStopped { + ts.serverState = stateFailed } } diff --git a/testserver/testserver_test.go b/testserver/testserver_test.go index 7ea26ad..b48d772 100644 --- a/testserver/testserver_test.go +++ b/testserver/testserver_test.go @@ -185,13 +185,13 @@ func TestRunServer(t *testing.T) { { name: "Insecure 3 Node", instantiation: func(t *testing.T) (*sql.DB, func()) { - return testserver.NewDBForTest(t, testserver.ThreeNode()) + return testserver.NewDBForTest(t, testserver.ThreeNodeOpt()) }, }, { name: "Insecure 3 Node On Disk", instantiation: func(t *testing.T) (*sql.DB, func()) { - return testserver.NewDBForTest(t, testserver.ThreeNode(), testserver.StoreOnDiskOpt()) + return testserver.NewDBForTest(t, testserver.ThreeNodeOpt(), testserver.StoreOnDiskOpt()) }, }, } { @@ -347,11 +347,11 @@ func TestFlockOnDownloadedCRDB(t *testing.T) { } func TestRestartNode(t *testing.T) { - ts, err := testserver.NewTestServer(testserver.ThreeNode(), testserver.StoreOnDiskOpt()) + ts, err := testserver.NewTestServer(testserver.ThreeNodeOpt(), testserver.StoreOnDiskOpt()) require.NoError(t, err) defer ts.Stop() for i := 0; i < 3; i++ { - require.NoError(t, ts.WaitForNode(i)) + require.NoError(t, ts.WaitForInitFinishForNode(i)) } log.Printf("Stopping Node 2") @@ -369,7 +369,7 @@ func TestRestartNode(t *testing.T) { } require.NoError(t, ts.StartNode(2)) - require.NoError(t, ts.WaitForNode(2)) + require.NoError(t, ts.WaitForInitFinishForNode(2)) for i := 0; i < 3; i++ { url := ts.PGURLForNode(i) @@ -429,7 +429,7 @@ func TestUpgradeNode(t *testing.T) { require.NoError(t, err) ts, err := testserver.NewTestServer( - testserver.ThreeNode(), + testserver.ThreeNodeOpt(), testserver.CockroachBinaryPathOpt(absFilePath21_2), testserver.UpgradeCockroachBinaryPathOpt(absFilePath22_1), testserver.StoreOnDiskOpt(), @@ -438,7 +438,7 @@ func TestUpgradeNode(t *testing.T) { defer ts.Stop() for i := 0; i < 3; i++ { - require.NoError(t, ts.WaitForNode(i)) + require.NoError(t, ts.WaitForInitFinishForNode(i)) } url := ts.PGURL() @@ -456,7 +456,7 @@ func TestUpgradeNode(t *testing.T) { for i := 0; i < 3; i++ { require.NoError(t, ts.UpgradeNode(i)) - require.NoError(t, ts.WaitForNode(i)) + require.NoError(t, ts.WaitForInitFinishForNode(i)) } for i := 0; i < 3; i++ { diff --git a/testserver/testservernode.go b/testserver/testservernode.go index 492d525..e2e4370 100644 --- a/testserver/testservernode.go +++ b/testserver/testservernode.go @@ -23,10 +23,10 @@ import ( func (ts *testServerImpl) StopNode(nodeNum int) error { ts.mu.Lock() - ts.nodeStates[nodeNum] = stateStopped + ts.nodes[nodeNum].state = stateStopped ts.mu.Unlock() ts.pgURL[nodeNum].u = nil - cmd := ts.cmd[nodeNum] + cmd := ts.nodes[nodeNum].startCmd // Kill the process. if cmd.Process != nil { @@ -38,13 +38,13 @@ func (ts *testServerImpl) StopNode(nodeNum int) error { func (ts *testServerImpl) StartNode(i int) error { ts.mu.RLock() - if ts.nodeStates[i] == stateRunning { + if ts.nodes[i].state == stateRunning { return fmt.Errorf("node %d already running", i) } ts.mu.RUnlock() - ts.cmd[i] = exec.Command(ts.cmdArgs[i][0], ts.cmdArgs[i][1:]...) + ts.nodes[i].startCmd = exec.Command(ts.nodes[i].startCmdArgs[0], ts.nodes[i].startCmdArgs[1:]...) - currCmd := ts.cmd[i] + currCmd := ts.nodes[i].startCmd currCmd.Env = []string{ "COCKROACH_MAX_OFFSET=1ns", "COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true", @@ -80,19 +80,19 @@ func (ts *testServerImpl) StartNode(i int) error { log.Printf("executing: %s", currCmd) err := currCmd.Start() if currCmd.Process != nil { - log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.cmdArgs[i], " ")) + log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.nodes[i].startCmdArgs, " ")) } if err != nil { log.Print(err.Error()) ts.mu.Lock() - ts.nodeStates[i] = stateFailed + ts.nodes[i].state = stateFailed ts.mu.Unlock() return fmt.Errorf("command %s failed: %w", currCmd, err) } ts.mu.Lock() - ts.nodeStates[i] = stateRunning + ts.nodes[i].state = stateRunning ts.mu.Unlock() capturedI := i @@ -116,6 +116,6 @@ func (ts *testServerImpl) UpgradeNode(nodeNum int) error { if err != nil { return err } - ts.cmdArgs[nodeNum][0] = ts.serverArgs.upgradeCockroachBinary + ts.nodes[nodeNum].startCmdArgs[0] = ts.serverArgs.upgradeCockroachBinary return ts.StartNode(nodeNum) }