diff --git a/testserver/tenant.go b/testserver/tenant.go index 247ec86..958f13d 100644 --- a/testserver/tenant.go +++ b/testserver/tenant.go @@ -58,7 +58,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { tenantID, err := func() (int, error) { ts.mu.Lock() defer ts.mu.Unlock() - if ts.nodeStates[0] != stateRunning { + if ts.nodes[0].state != stateRunning { return 0, errors.New("TestServer must be running before NewTenantServer may be called") } if ts.isTenant() { @@ -192,7 +192,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { return nil, err } - args := [][]string{{ + args := []string{ cockroachBinary, "mt", "start-sql", @@ -202,21 +202,26 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) { "--kv-addrs=" + pgURL.Host, "--sql-addr=" + sqlAddr, "--http-addr=:0", - }} + } + + nodes := []nodeInfo{ + { + state: stateNew, + startCmdArgs: args, + // TODO(asubiotto): Specify listeningURLFile once we support dynamic + // ports. + listeningURLFile: "", + }, + } tenant := &testServerImpl{ - serverArgs: ts.serverArgs, - version: ts.version, - state: stateNew, - nodeStates: []int{stateNew}, - baseDir: ts.baseDir, - cmdArgs: args, - cmd: make([]*exec.Cmd, ts.serverArgs.numNodes), - stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)), - stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)), - // TODO(asubiotto): Specify listeningURLFile once we support dynamic - // ports. - listeningURLFile: []string{""}, + serverArgs: ts.serverArgs, + version: ts.version, + serverState: stateNew, + baseDir: ts.baseDir, + stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)), + stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)), + nodes: nodes, } // Start the tenant. diff --git a/testserver/testserver.go b/testserver/testserver.go index bac6154..307481a 100644 --- a/testserver/testserver.go +++ b/testserver/testserver.go @@ -117,24 +117,29 @@ type pgURLChan struct { orig url.URL } +// nodeInfo contains the info to start a node and the state of the node. +type nodeInfo struct { + startCmd *exec.Cmd + startCmdArgs []string + listeningURLFile string + state int +} + // testServerImpl is a TestServer implementation. type testServerImpl struct { - mu sync.RWMutex - version *version.Version - serverArgs testServerArgs - state int - nodeStates []int - baseDir string - pgURL []pgURLChan - cmd []*exec.Cmd - cmdArgs [][]string - initCmd *exec.Cmd - initCmdArgs []string - stdout string - stderr string - stdoutBuf logWriter - stderrBuf logWriter - listeningURLFile []string + mu sync.RWMutex + version *version.Version + serverArgs testServerArgs + serverState int + baseDir string + pgURL []pgURLChan + initCmd *exec.Cmd + initCmdArgs []string + stdout string + stderr string + stdoutBuf logWriter + stderrBuf logWriter + nodes []nodeInfo // curTenantID is used to allocate tenant IDs. Refer to NewTenantServer for // more information. @@ -376,11 +381,6 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { return nil, fmt.Errorf("%s: %w", testserverMessagePrefix, err) } - listeningURLFile := make([]string, serverArgs.numNodes) - for i := 0; i < serverArgs.numNodes; i++ { - listeningURLFile[i] = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i)) - } - secureOpt := "--insecure" if serverArgs.secure { // Create certificates. @@ -436,40 +436,44 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { storeArg = fmt.Sprintf("--store=type=mem,size=%.2f", serverArgs.storeMemSize) } - args := make([][]string, serverArgs.numNodes) + nodes := make([]nodeInfo, serverArgs.numNodes) var initArgs []string - if serverArgs.numNodes <= 1 { - args[0] = []string{ - serverArgs.cockroachBinary, - startCmd, - "--logtostderr", - secureOpt, - "--host=localhost", - "--port=0", - "--http-port=" + strconv.Itoa(serverArgs.httpPort), - storeArg, - "--listening-url-file=" + listeningURLFile[0], - } - } else { - for i := 0; i < serverArgs.numNodes; i++ { - args[i] = []string{ + for i := 0; i < serverArgs.numNodes; i++ { + nodes[i].state = stateNew + nodes[i].listeningURLFile = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i)) + if serverArgs.numNodes > 1 { + nodes[i].startCmdArgs = []string{ serverArgs.cockroachBinary, startCmd, secureOpt, storeArg + strconv.Itoa(i), fmt.Sprintf("--listen-addr=localhost:%d", 26257+i), fmt.Sprintf("--http-addr=localhost:%d", 8080+i), - "--listening-url-file=" + listeningURLFile[i], + "--listening-url-file=" + nodes[i].listeningURLFile, fmt.Sprintf("--join=localhost:%d,localhost:%d,localhost:%d", 26257, 26258, 26259), } + } else { + nodes[0].startCmdArgs = []string{ + serverArgs.cockroachBinary, + startCmd, + "--logtostderr", + secureOpt, + "--host=localhost", + "--port=0", + "--http-port=" + strconv.Itoa(serverArgs.httpPort), + storeArg, + "--listening-url-file=" + nodes[i].listeningURLFile, + } } + } - initArgs = []string{ - serverArgs.cockroachBinary, - "init", - secureOpt, - "--host=localhost:26259", - } + // We only need initArgs if we're creating a testserver + // with multiple nodes. + initArgs = []string{ + serverArgs.cockroachBinary, + "init", + secureOpt, + "--host=localhost:26259", } states := make([]int, serverArgs.numNodes) @@ -478,18 +482,15 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) { } ts := &testServerImpl{ - serverArgs: *serverArgs, - version: v, - state: stateNew, - nodeStates: states, - baseDir: baseDir, - cmdArgs: args, - cmd: make([]*exec.Cmd, serverArgs.numNodes), - initCmdArgs: initArgs, - stdout: filepath.Join(logDir, "cockroach.stdout"), - stderr: filepath.Join(logDir, "cockroach.stderr"), - listeningURLFile: listeningURLFile, - curTenantID: firstTenantID, + serverArgs: *serverArgs, + version: v, + serverState: stateNew, + baseDir: baseDir, + initCmdArgs: initArgs, + stdout: filepath.Join(logDir, "cockroach.stdout"), + stderr: filepath.Join(logDir, "cockroach.stderr"), + curTenantID: firstTenantID, + nodes: nodes, } ts.pgURL = make([]pgURLChan, serverArgs.numNodes) @@ -546,7 +547,7 @@ func (ts *testServerImpl) setPGURLForNode(nodeNum int, u *url.URL) { close(ts.pgURL[nodeNum].set) } -func (ts *testServerImpl) WaitForNode(nodeNum int) error { +func (ts *testServerImpl) WaitForInitFinishForNode(nodeNum int) error { db, err := sql.Open("postgres", ts.PGURLForNode(nodeNum).String()) defer func() { _ = db.Close() @@ -558,7 +559,7 @@ func (ts *testServerImpl) WaitForNode(nodeNum int) error { if _, err = db.Query("SHOW DATABASES"); err == nil { return err } - log.Printf("%s: WaitForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err) + log.Printf("%s: WaitForInitFinishForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err) time.Sleep(time.Millisecond * 100) } return nil @@ -573,14 +574,14 @@ func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error { var data []byte for { ts.mu.RLock() - state := ts.nodeStates[nodeNum] + state := ts.nodes[nodeNum].state ts.mu.RUnlock() if state != stateRunning { return fmt.Errorf("server stopped or crashed before listening URL file was available") } var err error - data, err = ioutil.ReadFile(ts.listeningURLFile[nodeNum]) + data, err = ioutil.ReadFile(ts.nodes[nodeNum].listeningURLFile) if err == nil { break } else if !os.IsNotExist(err) { @@ -624,9 +625,9 @@ func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error { // to restart a testserver, but use NewTestServer(). func (ts *testServerImpl) Start() error { ts.mu.Lock() - if ts.state != stateNew { + if ts.serverState != stateNew { ts.mu.Unlock() - switch ts.state { + switch ts.serverState { case stateRunning: return nil // No-op if server is already running. case stateStopped, stateFailed: @@ -636,7 +637,7 @@ func (ts *testServerImpl) Start() error { "Please use NewTestServer()") } } - ts.state = stateRunning + ts.serverState = stateRunning ts.mu.Unlock() for i := 0; i < ts.serverArgs.numNodes; i++ { @@ -662,10 +663,10 @@ func (ts *testServerImpl) Stop() { ts.mu.RLock() defer ts.mu.RUnlock() - if ts.state == stateNew { + if ts.serverState == stateNew { log.Fatalf("%s: Stop() called, but Start() was never called", testserverMessagePrefix) } - if ts.state == stateFailed { + if ts.serverState == stateFailed { log.Fatalf("%s: Stop() called, but process exited unexpectedly. Stdout:\n%s\nStderr:\n%s\n", testserverMessagePrefix, ts.Stdout(), @@ -673,7 +674,7 @@ func (ts *testServerImpl) Stop() { return } - if ts.state != stateStopped { + if ts.serverState != stateStopped { if p := ts.proxyProcess; p != nil { _ = p.Kill() } @@ -686,16 +687,15 @@ func (ts *testServerImpl) Stop() { log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr) } - for _, cmd := range ts.cmd { + ts.serverState = stateStopped + for _, node := range ts.nodes { + cmd := node.startCmd if cmd.Process != nil { _ = cmd.Process.Kill() } - } - ts.state = stateStopped - for _, nodeState := range ts.nodeStates { - if nodeState != stateStopped { - ts.state = stateFailed + if node.state != stateStopped { + ts.serverState = stateFailed } } diff --git a/testserver/testservernode.go b/testserver/testservernode.go index 492d525..e2e4370 100644 --- a/testserver/testservernode.go +++ b/testserver/testservernode.go @@ -23,10 +23,10 @@ import ( func (ts *testServerImpl) StopNode(nodeNum int) error { ts.mu.Lock() - ts.nodeStates[nodeNum] = stateStopped + ts.nodes[nodeNum].state = stateStopped ts.mu.Unlock() ts.pgURL[nodeNum].u = nil - cmd := ts.cmd[nodeNum] + cmd := ts.nodes[nodeNum].startCmd // Kill the process. if cmd.Process != nil { @@ -38,13 +38,13 @@ func (ts *testServerImpl) StopNode(nodeNum int) error { func (ts *testServerImpl) StartNode(i int) error { ts.mu.RLock() - if ts.nodeStates[i] == stateRunning { + if ts.nodes[i].state == stateRunning { return fmt.Errorf("node %d already running", i) } ts.mu.RUnlock() - ts.cmd[i] = exec.Command(ts.cmdArgs[i][0], ts.cmdArgs[i][1:]...) + ts.nodes[i].startCmd = exec.Command(ts.nodes[i].startCmdArgs[0], ts.nodes[i].startCmdArgs[1:]...) - currCmd := ts.cmd[i] + currCmd := ts.nodes[i].startCmd currCmd.Env = []string{ "COCKROACH_MAX_OFFSET=1ns", "COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true", @@ -80,19 +80,19 @@ func (ts *testServerImpl) StartNode(i int) error { log.Printf("executing: %s", currCmd) err := currCmd.Start() if currCmd.Process != nil { - log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.cmdArgs[i], " ")) + log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.nodes[i].startCmdArgs, " ")) } if err != nil { log.Print(err.Error()) ts.mu.Lock() - ts.nodeStates[i] = stateFailed + ts.nodes[i].state = stateFailed ts.mu.Unlock() return fmt.Errorf("command %s failed: %w", currCmd, err) } ts.mu.Lock() - ts.nodeStates[i] = stateRunning + ts.nodes[i].state = stateRunning ts.mu.Unlock() capturedI := i @@ -116,6 +116,6 @@ func (ts *testServerImpl) UpgradeNode(nodeNum int) error { if err != nil { return err } - ts.cmdArgs[nodeNum][0] = ts.serverArgs.upgradeCockroachBinary + ts.nodes[nodeNum].startCmdArgs[0] = ts.serverArgs.upgradeCockroachBinary return ts.StartNode(nodeNum) }