From bc0e5822d77d5b0f6b31309472aaca0947ef1048 Mon Sep 17 00:00:00 2001 From: richardjcai Date: Tue, 21 Jun 2022 15:57:20 -0400 Subject: [PATCH] Create nodeInfo struct for separating startCmds per node and state per node --- testserver/tenant.go | 35 ++++---- testserver/testserver.go | 159 ++++++++++++++++++---------------- testserver/testserver_test.go | 24 ++--- testserver/testservernode.go | 18 ++-- 4 files changed, 124 insertions(+), 112 deletions(-) diff --git a/testserver/tenant.go b/testserver/tenant.go index 247ec86..958f13d 100644 --- a/testserver/tenant.go +++ b/testserver/tenant.go @@ -58,7 +58,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { tenantID, err := func() (int, error) { ts.mu.Lock() defer ts.mu.Unlock() - if ts.nodeStates[0] != stateRunning { + if ts.nodes[0].state != stateRunning { return 0, errors.New("TestServer must be running before NewTenantServer may be called") } if ts.isTenant() { @@ -192,7 +192,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { return nil, err } - args := [][]string{{ + args := []string{ cockroachBinary, "mt", "start-sql", @@ -202,21 +202,26 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { "--kv-addrs=" + pgURL.Host, "--sql-addr=" + sqlAddr, "--http-addr=:0", - }} + } + + nodes := []nodeInfo{ + { + state: stateNew, + startCmdArgs: args, + // TODO(asubiotto): Specify listeningURLFile once we support dynamic + // ports. 
+ listeningURLFile: "", + }, + } tenant := &testServerImpl{ - serverArgs: ts.serverArgs, - version: ts.version, - state: stateNew, - nodeStates: []int{stateNew}, - baseDir: ts.baseDir, - cmdArgs: args, - cmd: make([]*exec.Cmd, ts.serverArgs.numNodes), - stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)), - stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)), - // TODO(asubiotto): Specify listeningURLFile once we support dynamic - // ports. - listeningURLFile: []string{""}, + serverArgs: ts.serverArgs, + version: ts.version, + serverState: stateNew, + baseDir: ts.baseDir, + stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)), + stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)), + nodes: nodes, } // Start the tenant. diff --git a/testserver/testserver.go b/testserver/testserver.go index bac6154..d44d0bb 100644 --- a/testserver/testserver.go +++ b/testserver/testserver.go @@ -101,10 +101,17 @@ type TestServer interface { // BaseDir returns directory StoreOnDiskOpt writes to if used. BaseDir() string - WaitForNode(numNode int) error + // WaitForInitFinishForNode waits until a node has completed + // initialization and is available to connect to and query on. + WaitForInitFinishForNode(numNode int) error + // StartNode runs the "cockroach start" command for the node. StartNode(i int) error + // StopNode kills the node's process. StopNode(i int) error + // UpgradeNode stops the node, then starts the node with the + // binary specified at "upgradeBinaryPath". UpgradeNode(i int) error + // PGURLForNode returns the PGURL for the node. PGURLForNode(nodeNum int) *url.URL } @@ -117,24 +124,29 @@ type pgURLChan struct { orig url.URL } +// nodeInfo contains the info to start a node and the state of the node. 
+type nodeInfo struct { + startCmd *exec.Cmd + startCmdArgs []string + listeningURLFile string + state int +} + // testServerImpl is a TestServer implementation. type testServerImpl struct { - mu sync.RWMutex - version *version.Version - serverArgs testServerArgs - state int - nodeStates []int - baseDir string - pgURL []pgURLChan - cmd []*exec.Cmd - cmdArgs [][]string - initCmd *exec.Cmd - initCmdArgs []string - stdout string - stderr string - stdoutBuf logWriter - stderrBuf logWriter - listeningURLFile []string + mu sync.RWMutex + version *version.Version + serverArgs testServerArgs + serverState int + baseDir string + pgURL []pgURLChan + initCmd *exec.Cmd + initCmdArgs []string + stdout string + stderr string + stdoutBuf logWriter + stderrBuf logWriter + nodes []nodeInfo // curTenantID is used to allocate tenant IDs. Refer to NewTenantServer for // more information. @@ -158,7 +170,7 @@ func NewDBForTest(t *testing.T, opts ...TestServerOpt) (*sql.DB, func()) { // it. Returns a sql *DB instance a shutdown function. The caller is // responsible for executing the returned shutdown function on exit. func NewDBForTestWithDatabase( - t *testing.T, database string, opts ...TestServerOpt, +t *testing.T, database string, opts ...TestServerOpt, ) (*sql.DB, func()) { t.Helper() ts, err := NewTestServer(opts...) @@ -376,11 +388,6 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { return nil, fmt.Errorf("%s: %w", testserverMessagePrefix, err) } - listeningURLFile := make([]string, serverArgs.numNodes) - for i := 0; i < serverArgs.numNodes; i++ { - listeningURLFile[i] = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i)) - } - secureOpt := "--insecure" if serverArgs.secure { // Create certificates. 
@@ -436,40 +443,44 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { storeArg = fmt.Sprintf("--store=type=mem,size=%.2f", serverArgs.storeMemSize) } - args := make([][]string, serverArgs.numNodes) + nodes := make([]nodeInfo, serverArgs.numNodes) var initArgs []string - if serverArgs.numNodes <= 1 { - args[0] = []string{ - serverArgs.cockroachBinary, - startCmd, - "--logtostderr", - secureOpt, - "--host=localhost", - "--port=0", - "--http-port=" + strconv.Itoa(serverArgs.httpPort), - storeArg, - "--listening-url-file=" + listeningURLFile[0], - } - } else { - for i := 0; i < serverArgs.numNodes; i++ { - args[i] = []string{ + for i := 0; i < serverArgs.numNodes; i++ { + nodes[i].state = stateNew + nodes[i].listeningURLFile = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i)) + if serverArgs.numNodes > 1 { + nodes[i].startCmdArgs = []string{ serverArgs.cockroachBinary, startCmd, secureOpt, storeArg + strconv.Itoa(i), fmt.Sprintf("--listen-addr=localhost:%d", 26257+i), fmt.Sprintf("--http-addr=localhost:%d", 8080+i), - "--listening-url-file=" + listeningURLFile[i], + "--listening-url-file=" + nodes[i].listeningURLFile, fmt.Sprintf("--join=localhost:%d,localhost:%d,localhost:%d", 26257, 26258, 26259), } + } else { + nodes[0].startCmdArgs = []string{ + serverArgs.cockroachBinary, + startCmd, + "--logtostderr", + secureOpt, + "--host=localhost", + "--port=0", + "--http-port=" + strconv.Itoa(serverArgs.httpPort), + storeArg, + "--listening-url-file=" + nodes[i].listeningURLFile, + } } + } - initArgs = []string{ - serverArgs.cockroachBinary, - "init", - secureOpt, - "--host=localhost:26259", - } + // We only need initArgs if we're creating a testserver + // with multiple nodes. 
+ initArgs = []string{ + serverArgs.cockroachBinary, + "init", + secureOpt, + "--host=localhost:26259", } states := make([]int, serverArgs.numNodes) @@ -478,18 +489,15 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { } ts := &testServerImpl{ - serverArgs: *serverArgs, - version: v, - state: stateNew, - nodeStates: states, - baseDir: baseDir, - cmdArgs: args, - cmd: make([]*exec.Cmd, serverArgs.numNodes), - initCmdArgs: initArgs, - stdout: filepath.Join(logDir, "cockroach.stdout"), - stderr: filepath.Join(logDir, "cockroach.stderr"), - listeningURLFile: listeningURLFile, - curTenantID: firstTenantID, + serverArgs: *serverArgs, + version: v, + serverState: stateNew, + baseDir: baseDir, + initCmdArgs: initArgs, + stdout: filepath.Join(logDir, "cockroach.stdout"), + stderr: filepath.Join(logDir, "cockroach.stderr"), + curTenantID: firstTenantID, + nodes: nodes, } ts.pgURL = make([]pgURLChan, serverArgs.numNodes) @@ -546,7 +554,7 @@ func (ts *testServerImpl) setPGURLForNode(nodeNum int, u *url.URL) { close(ts.pgURL[nodeNum].set) } -func (ts *testServerImpl) WaitForNode(nodeNum int) error { +func (ts *testServerImpl) WaitForInitFinishForNode(nodeNum int) error { db, err := sql.Open("postgres", ts.PGURLForNode(nodeNum).String()) defer func() { _ = db.Close() @@ -558,7 +566,7 @@ func (ts *testServerImpl) WaitForNode(nodeNum int) error { if _, err = db.Query("SHOW DATABASES"); err == nil { return err } - log.Printf("%s: WaitForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err) + log.Printf("%s: WaitForInitFinishForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err) time.Sleep(time.Millisecond * 100) } return nil @@ -566,21 +574,21 @@ func (ts *testServerImpl) WaitForNode(nodeNum int) error { // WaitForInit retries until a connection is successfully established. 
func (ts *testServerImpl) WaitForInit() error { - return ts.WaitForNode(0) + return ts.WaitForInitFinishForNode(0) } func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error { var data []byte for { ts.mu.RLock() - state := ts.nodeStates[nodeNum] + state := ts.nodes[nodeNum].state ts.mu.RUnlock() if state != stateRunning { return fmt.Errorf("server stopped or crashed before listening URL file was available") } var err error - data, err = ioutil.ReadFile(ts.listeningURLFile[nodeNum]) + data, err = ioutil.ReadFile(ts.nodes[nodeNum].listeningURLFile) if err == nil { break } else if !os.IsNotExist(err) { @@ -624,19 +632,19 @@ func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error { // to restart a testserver, but use NewTestServer(). func (ts *testServerImpl) Start() error { ts.mu.Lock() - if ts.state != stateNew { + if ts.serverState != stateNew { ts.mu.Unlock() - switch ts.state { + switch ts.serverState { case stateRunning: return nil // No-op if server is already running. case stateStopped, stateFailed: // Start() can only be called once. return errors.New( "`Start()` cannot be used to restart a stopped or failed server. " + - "Please use NewTestServer()") + "Please use NewTestServer()") } } - ts.state = stateRunning + ts.serverState = stateRunning ts.mu.Unlock() for i := 0; i < ts.serverArgs.numNodes; i++ { @@ -662,10 +670,10 @@ func (ts *testServerImpl) Stop() { ts.mu.RLock() defer ts.mu.RUnlock() - if ts.state == stateNew { + if ts.serverState == stateNew { log.Fatalf("%s: Stop() called, but Start() was never called", testserverMessagePrefix) } - if ts.state == stateFailed { + if ts.serverState == stateFailed { log.Fatalf("%s: Stop() called, but process exited unexpectedly. 
Stdout:\n%s\nStderr:\n%s\n", testserverMessagePrefix, ts.Stdout(), @@ -673,7 +681,7 @@ func (ts *testServerImpl) Stop() { return } - if ts.state != stateStopped { + if ts.serverState != stateStopped { if p := ts.proxyProcess; p != nil { _ = p.Kill() } @@ -686,16 +694,15 @@ func (ts *testServerImpl) Stop() { log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr) } - for _, cmd := range ts.cmd { + ts.serverState = stateStopped + for _, node := range ts.nodes { + cmd := node.startCmd if cmd.Process != nil { _ = cmd.Process.Kill() } - } - ts.state = stateStopped - for _, nodeState := range ts.nodeStates { - if nodeState != stateStopped { - ts.state = stateFailed + if node.state != stateStopped { + ts.serverState = stateFailed } } diff --git a/testserver/testserver_test.go b/testserver/testserver_test.go index 7ea26ad..8ace955 100644 --- a/testserver/testserver_test.go +++ b/testserver/testserver_test.go @@ -247,13 +247,13 @@ type tenantInterface interface { // process and a SQL tenant process pointed at this TestServer. A sql connection // to the tenant and a cleanup function are returned. 
func newTenantDBForTest( - t *testing.T, - secure bool, - proxy bool, - pw string, - diskStore bool, - storeMemSize float64, - nonStableDB bool, +t *testing.T, +secure bool, +proxy bool, +pw string, +diskStore bool, +storeMemSize float64, +nonStableDB bool, ) (*sql.DB, func()) { t.Helper() var opts []testserver.TestServerOpt @@ -351,7 +351,7 @@ func TestRestartNode(t *testing.T) { require.NoError(t, err) defer ts.Stop() for i := 0; i < 3; i++ { - require.NoError(t, ts.WaitForNode(i)) + require.NoError(t, ts.WaitForInitFinishForNode(i)) } log.Printf("Stopping Node 2") @@ -369,7 +369,7 @@ func TestRestartNode(t *testing.T) { } require.NoError(t, ts.StartNode(2)) - require.NoError(t, ts.WaitForNode(2)) + require.NoError(t, ts.WaitForInitFinishForNode(2)) for i := 0; i < 3; i++ { url := ts.PGURLForNode(i) @@ -438,7 +438,7 @@ func TestUpgradeNode(t *testing.T) { defer ts.Stop() for i := 0; i < 3; i++ { - require.NoError(t, ts.WaitForNode(i)) + require.NoError(t, ts.WaitForInitFinishForNode(i)) } url := ts.PGURL() @@ -456,7 +456,7 @@ func TestUpgradeNode(t *testing.T) { for i := 0; i < 3; i++ { require.NoError(t, ts.UpgradeNode(i)) - require.NoError(t, ts.WaitForNode(i)) + require.NoError(t, ts.WaitForInitFinishForNode(i)) } for i := 0; i < 3; i++ { @@ -503,7 +503,7 @@ var wg = sync.WaitGroup{} // two goroutines, the second goroutine waits for the first goroutine to // finish downloading the CRDB binary into a local file. 
func testFlockWithDownloadPassing( - t *testing.T, opts ...testserver.TestServerOpt, +t *testing.T, opts ...testserver.TestServerOpt, ) (*sql.DB, func()) { localFile, err := getLocalFile(false) diff --git a/testserver/testservernode.go b/testserver/testservernode.go index 492d525..e2e4370 100644 --- a/testserver/testservernode.go +++ b/testserver/testservernode.go @@ -23,10 +23,10 @@ import ( func (ts *testServerImpl) StopNode(nodeNum int) error { ts.mu.Lock() - ts.nodeStates[nodeNum] = stateStopped + ts.nodes[nodeNum].state = stateStopped ts.mu.Unlock() ts.pgURL[nodeNum].u = nil - cmd := ts.cmd[nodeNum] + cmd := ts.nodes[nodeNum].startCmd // Kill the process. if cmd.Process != nil { @@ -38,13 +38,13 @@ func (ts *testServerImpl) StopNode(nodeNum int) error { func (ts *testServerImpl) StartNode(i int) error { ts.mu.RLock() - if ts.nodeStates[i] == stateRunning { + if ts.nodes[i].state == stateRunning { return fmt.Errorf("node %d already running", i) } ts.mu.RUnlock() - ts.cmd[i] = exec.Command(ts.cmdArgs[i][0], ts.cmdArgs[i][1:]...) + ts.nodes[i].startCmd = exec.Command(ts.nodes[i].startCmdArgs[0], ts.nodes[i].startCmdArgs[1:]...) 
- currCmd := ts.cmd[i] + currCmd := ts.nodes[i].startCmd currCmd.Env = []string{ "COCKROACH_MAX_OFFSET=1ns", "COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true", @@ -80,19 +80,19 @@ func (ts *testServerImpl) StartNode(i int) error { log.Printf("executing: %s", currCmd) err := currCmd.Start() if currCmd.Process != nil { - log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.cmdArgs[i], " ")) + log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.nodes[i].startCmdArgs, " ")) } if err != nil { log.Print(err.Error()) ts.mu.Lock() - ts.nodeStates[i] = stateFailed + ts.nodes[i].state = stateFailed ts.mu.Unlock() return fmt.Errorf("command %s failed: %w", currCmd, err) } ts.mu.Lock() - ts.nodeStates[i] = stateRunning + ts.nodes[i].state = stateRunning ts.mu.Unlock() capturedI := i @@ -116,6 +116,6 @@ func (ts *testServerImpl) UpgradeNode(nodeNum int) error { if err != nil { return err } - ts.cmdArgs[nodeNum][0] = ts.serverArgs.upgradeCockroachBinary + ts.nodes[nodeNum].startCmdArgs[0] = ts.serverArgs.upgradeCockroachBinary return ts.StartNode(nodeNum) }