Skip to content

Commit

Permalink
Create nodeInfo struct for separating startCmds per node and state pe…
Browse files Browse the repository at this point in the history
…r node
  • Loading branch information
RichardJCai committed Jun 21, 2022
1 parent fa1f424 commit 885cac3
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 96 deletions.
35 changes: 20 additions & 15 deletions testserver/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
tenantID, err := func() (int, error) {
ts.mu.Lock()
defer ts.mu.Unlock()
if ts.nodeStates[0] != stateRunning {
if ts.nodes[0].state != stateRunning {
return 0, errors.New("TestServer must be running before NewTenantServer may be called")
}
if ts.isTenant() {
Expand Down Expand Up @@ -192,7 +192,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
return nil, err
}

args := [][]string{{
args := []string{
cockroachBinary,
"mt",
"start-sql",
Expand All @@ -202,21 +202,26 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
"--kv-addrs=" + pgURL.Host,
"--sql-addr=" + sqlAddr,
"--http-addr=:0",
}}
}

nodes := []nodeInfo{
{
state: stateNew,
startCmdArgs: args,
// TODO(asubiotto): Specify listeningURLFile once we support dynamic
// ports.
listeningURLFile: "",
},
}

tenant := &testServerImpl{
serverArgs: ts.serverArgs,
version: ts.version,
state: stateNew,
nodeStates: []int{stateNew},
baseDir: ts.baseDir,
cmdArgs: args,
cmd: make([]*exec.Cmd, ts.serverArgs.numNodes),
stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)),
stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)),
// TODO(asubiotto): Specify listeningURLFile once we support dynamic
// ports.
listeningURLFile: []string{""},
serverArgs: ts.serverArgs,
version: ts.version,
serverState: stateNew,
baseDir: ts.baseDir,
stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)),
stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)),
nodes: nodes,
}

// Start the tenant.
Expand Down
144 changes: 72 additions & 72 deletions testserver/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,29 @@ type pgURLChan struct {
orig url.URL
}

// nodeInfo contains the info to start a node and the state of the node.
type nodeInfo struct {
startCmd *exec.Cmd
startCmdArgs []string
listeningURLFile string
state int
}

// testServerImpl is a TestServer implementation.
type testServerImpl struct {
mu sync.RWMutex
version *version.Version
serverArgs testServerArgs
state int
nodeStates []int
baseDir string
pgURL []pgURLChan
cmd []*exec.Cmd
cmdArgs [][]string
initCmd *exec.Cmd
initCmdArgs []string
stdout string
stderr string
stdoutBuf logWriter
stderrBuf logWriter
listeningURLFile []string
mu sync.RWMutex
version *version.Version
serverArgs testServerArgs
serverState int
baseDir string
pgURL []pgURLChan
initCmd *exec.Cmd
initCmdArgs []string
stdout string
stderr string
stdoutBuf logWriter
stderrBuf logWriter
nodes []nodeInfo

// curTenantID is used to allocate tenant IDs. Refer to NewTenantServer for
// more information.
Expand Down Expand Up @@ -376,11 +381,6 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
return nil, fmt.Errorf("%s: %w", testserverMessagePrefix, err)
}

listeningURLFile := make([]string, serverArgs.numNodes)
for i := 0; i < serverArgs.numNodes; i++ {
listeningURLFile[i] = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i))
}

secureOpt := "--insecure"
if serverArgs.secure {
// Create certificates.
Expand Down Expand Up @@ -436,40 +436,44 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
storeArg = fmt.Sprintf("--store=type=mem,size=%.2f", serverArgs.storeMemSize)
}

args := make([][]string, serverArgs.numNodes)
nodes := make([]nodeInfo, serverArgs.numNodes)
var initArgs []string
if serverArgs.numNodes <= 1 {
args[0] = []string{
serverArgs.cockroachBinary,
startCmd,
"--logtostderr",
secureOpt,
"--host=localhost",
"--port=0",
"--http-port=" + strconv.Itoa(serverArgs.httpPort),
storeArg,
"--listening-url-file=" + listeningURLFile[0],
}
} else {
for i := 0; i < serverArgs.numNodes; i++ {
args[i] = []string{
for i := 0; i < serverArgs.numNodes; i++ {
nodes[i].state = stateNew
nodes[i].listeningURLFile = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i))
if serverArgs.numNodes > 1 {
nodes[i].startCmdArgs = []string{
serverArgs.cockroachBinary,
startCmd,
secureOpt,
storeArg + strconv.Itoa(i),
fmt.Sprintf("--listen-addr=localhost:%d", 26257+i),
fmt.Sprintf("--http-addr=localhost:%d", 8080+i),
"--listening-url-file=" + listeningURLFile[i],
"--listening-url-file=" + nodes[i].listeningURLFile,
fmt.Sprintf("--join=localhost:%d,localhost:%d,localhost:%d", 26257, 26258, 26259),
}
} else {
nodes[0].startCmdArgs = []string{
serverArgs.cockroachBinary,
startCmd,
"--logtostderr",
secureOpt,
"--host=localhost",
"--port=0",
"--http-port=" + strconv.Itoa(serverArgs.httpPort),
storeArg,
"--listening-url-file=" + nodes[i].listeningURLFile,
}
}
}

initArgs = []string{
serverArgs.cockroachBinary,
"init",
secureOpt,
"--host=localhost:26259",
}
// We only need initArgs if we're creating a testserver
// with multiple nodes.
initArgs = []string{
serverArgs.cockroachBinary,
"init",
secureOpt,
"--host=localhost:26259",
}

states := make([]int, serverArgs.numNodes)
Expand All @@ -478,18 +482,15 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
}

ts := &testServerImpl{
serverArgs: *serverArgs,
version: v,
state: stateNew,
nodeStates: states,
baseDir: baseDir,
cmdArgs: args,
cmd: make([]*exec.Cmd, serverArgs.numNodes),
initCmdArgs: initArgs,
stdout: filepath.Join(logDir, "cockroach.stdout"),
stderr: filepath.Join(logDir, "cockroach.stderr"),
listeningURLFile: listeningURLFile,
curTenantID: firstTenantID,
serverArgs: *serverArgs,
version: v,
serverState: stateNew,
baseDir: baseDir,
initCmdArgs: initArgs,
stdout: filepath.Join(logDir, "cockroach.stdout"),
stderr: filepath.Join(logDir, "cockroach.stderr"),
curTenantID: firstTenantID,
nodes: nodes,
}
ts.pgURL = make([]pgURLChan, serverArgs.numNodes)

Expand Down Expand Up @@ -546,7 +547,7 @@ func (ts *testServerImpl) setPGURLForNode(nodeNum int, u *url.URL) {
close(ts.pgURL[nodeNum].set)
}

func (ts *testServerImpl) WaitForNode(nodeNum int) error {
func (ts *testServerImpl) WaitForInitFinishForNode(nodeNum int) error {
db, err := sql.Open("postgres", ts.PGURLForNode(nodeNum).String())
defer func() {
_ = db.Close()
Expand All @@ -558,7 +559,7 @@ func (ts *testServerImpl) WaitForNode(nodeNum int) error {
if _, err = db.Query("SHOW DATABASES"); err == nil {
return err
}
log.Printf("%s: WaitForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err)
log.Printf("%s: WaitForInitFinishForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err)
time.Sleep(time.Millisecond * 100)
}
return nil
Expand All @@ -573,14 +574,14 @@ func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error {
var data []byte
for {
ts.mu.RLock()
state := ts.nodeStates[nodeNum]
state := ts.nodes[nodeNum].state
ts.mu.RUnlock()
if state != stateRunning {
return fmt.Errorf("server stopped or crashed before listening URL file was available")
}

var err error
data, err = ioutil.ReadFile(ts.listeningURLFile[nodeNum])
data, err = ioutil.ReadFile(ts.nodes[nodeNum].listeningURLFile)
if err == nil {
break
} else if !os.IsNotExist(err) {
Expand Down Expand Up @@ -624,9 +625,9 @@ func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error {
// to restart a testserver, but use NewTestServer().
func (ts *testServerImpl) Start() error {
ts.mu.Lock()
if ts.state != stateNew {
if ts.serverState != stateNew {
ts.mu.Unlock()
switch ts.state {
switch ts.serverState {
case stateRunning:
return nil // No-op if server is already running.
case stateStopped, stateFailed:
Expand All @@ -636,7 +637,7 @@ func (ts *testServerImpl) Start() error {
"Please use NewTestServer()")
}
}
ts.state = stateRunning
ts.serverState = stateRunning
ts.mu.Unlock()

for i := 0; i < ts.serverArgs.numNodes; i++ {
Expand All @@ -662,18 +663,18 @@ func (ts *testServerImpl) Stop() {
ts.mu.RLock()
defer ts.mu.RUnlock()

if ts.state == stateNew {
if ts.serverState == stateNew {
log.Fatalf("%s: Stop() called, but Start() was never called", testserverMessagePrefix)
}
if ts.state == stateFailed {
if ts.serverState == stateFailed {
log.Fatalf("%s: Stop() called, but process exited unexpectedly. Stdout:\n%s\nStderr:\n%s\n",
testserverMessagePrefix,
ts.Stdout(),
ts.Stderr())
return
}

if ts.state != stateStopped {
if ts.serverState != stateStopped {
if p := ts.proxyProcess; p != nil {
_ = p.Kill()
}
Expand All @@ -686,16 +687,15 @@ func (ts *testServerImpl) Stop() {
log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr)
}

for _, cmd := range ts.cmd {
ts.serverState = stateStopped
for _, node := range ts.nodes {
cmd := node.startCmd
if cmd.Process != nil {
_ = cmd.Process.Kill()
}
}

ts.state = stateStopped
for _, nodeState := range ts.nodeStates {
if nodeState != stateStopped {
ts.state = stateFailed
if node.state != stateStopped {
ts.serverState = stateFailed
}
}

Expand Down
18 changes: 9 additions & 9 deletions testserver/testservernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (

func (ts *testServerImpl) StopNode(nodeNum int) error {
ts.mu.Lock()
ts.nodeStates[nodeNum] = stateStopped
ts.nodes[nodeNum].state = stateStopped
ts.mu.Unlock()
ts.pgURL[nodeNum].u = nil
cmd := ts.cmd[nodeNum]
cmd := ts.nodes[nodeNum].startCmd

// Kill the process.
if cmd.Process != nil {
Expand All @@ -38,13 +38,13 @@ func (ts *testServerImpl) StopNode(nodeNum int) error {

func (ts *testServerImpl) StartNode(i int) error {
ts.mu.RLock()
if ts.nodeStates[i] == stateRunning {
if ts.nodes[i].state == stateRunning {
return fmt.Errorf("node %d already running", i)
}
ts.mu.RUnlock()
ts.cmd[i] = exec.Command(ts.cmdArgs[i][0], ts.cmdArgs[i][1:]...)
ts.nodes[i].startCmd = exec.Command(ts.nodes[i].startCmdArgs[0], ts.nodes[i].startCmdArgs[1:]...)

currCmd := ts.cmd[i]
currCmd := ts.nodes[i].startCmd
currCmd.Env = []string{
"COCKROACH_MAX_OFFSET=1ns",
"COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true",
Expand Down Expand Up @@ -80,19 +80,19 @@ func (ts *testServerImpl) StartNode(i int) error {
log.Printf("executing: %s", currCmd)
err := currCmd.Start()
if currCmd.Process != nil {
log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.cmdArgs[i], " "))
log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.nodes[i].startCmdArgs, " "))
}
if err != nil {
log.Print(err.Error())
ts.mu.Lock()
ts.nodeStates[i] = stateFailed
ts.nodes[i].state = stateFailed
ts.mu.Unlock()

return fmt.Errorf("command %s failed: %w", currCmd, err)
}

ts.mu.Lock()
ts.nodeStates[i] = stateRunning
ts.nodes[i].state = stateRunning
ts.mu.Unlock()

capturedI := i
Expand All @@ -116,6 +116,6 @@ func (ts *testServerImpl) UpgradeNode(nodeNum int) error {
if err != nil {
return err
}
ts.cmdArgs[nodeNum][0] = ts.serverArgs.upgradeCockroachBinary
ts.nodes[nodeNum].startCmdArgs[0] = ts.serverArgs.upgradeCockroachBinary
return ts.StartNode(nodeNum)
}

0 comments on commit 885cac3

Please sign in to comment.