Skip to content

Commit

Permalink
Create nodeInfo struct for separating startCmds per node and state pe…
Browse files Browse the repository at this point in the history
…r node
  • Loading branch information
RichardJCai committed Jun 21, 2022
1 parent 97a8658 commit bc0e582
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 112 deletions.
35 changes: 20 additions & 15 deletions testserver/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
tenantID, err := func() (int, error) {
ts.mu.Lock()
defer ts.mu.Unlock()
if ts.nodeStates[0] != stateRunning {
if ts.nodes[0].state != stateRunning {
return 0, errors.New("TestServer must be running before NewTenantServer may be called")
}
if ts.isTenant() {
Expand Down Expand Up @@ -192,7 +192,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
return nil, err
}

args := [][]string{{
args := []string{
cockroachBinary,
"mt",
"start-sql",
Expand All @@ -202,21 +202,26 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
"--kv-addrs=" + pgURL.Host,
"--sql-addr=" + sqlAddr,
"--http-addr=:0",
}}
}

nodes := []nodeInfo{
{
state: stateNew,
startCmdArgs: args,
// TODO(asubiotto): Specify listeningURLFile once we support dynamic
// ports.
listeningURLFile: "",
},
}

tenant := &testServerImpl{
serverArgs: ts.serverArgs,
version: ts.version,
state: stateNew,
nodeStates: []int{stateNew},
baseDir: ts.baseDir,
cmdArgs: args,
cmd: make([]*exec.Cmd, ts.serverArgs.numNodes),
stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)),
stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)),
// TODO(asubiotto): Specify listeningURLFile once we support dynamic
// ports.
listeningURLFile: []string{""},
serverArgs: ts.serverArgs,
version: ts.version,
serverState: stateNew,
baseDir: ts.baseDir,
stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)),
stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)),
nodes: nodes,
}

// Start the tenant.
Expand Down
159 changes: 83 additions & 76 deletions testserver/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,17 @@ type TestServer interface {
// BaseDir returns directory StoreOnDiskOpt writes to if used.
BaseDir() string

WaitForNode(numNode int) error
// WaitForInitFinishForNode waits until a node has completed
// initialization and is available to connect to and query on.
WaitForInitFinishForNode(numNode int) error
// StartNode runs the "cockroach start" command for the node.
StartNode(i int) error
// StopNode kills the node's process.
StopNode(i int) error
// UpgradeNode stops the node, then starts the node with the
// binary specified at "upgradeBinaryPath".
UpgradeNode(i int) error
// PGURLForNode returns the PGUrl for the node.
PGURLForNode(nodeNum int) *url.URL
}

Expand All @@ -117,24 +124,29 @@ type pgURLChan struct {
orig url.URL
}

// nodeInfo contains the info to start a node and the state of the node.
// testServerImpl holds one nodeInfo per node in its nodes slice.
type nodeInfo struct {
// startCmd is the command for this node's process. Stop() kills
// startCmd.Process when it is non-nil.
startCmd *exec.Cmd
// startCmdArgs is the full argument list for the start command; its
// first element is the cockroach binary path.
startCmdArgs []string
// listeningURLFile is the path handed to the node via
// --listening-url-file and read back by pollListeningURLFile. Tenant
// servers currently leave it empty.
listeningURLFile string
// state is the node's lifecycle state, one of the state* constants
// (e.g. stateNew, stateRunning, stateStopped).
state int
}

// testServerImpl is a TestServer implementation.
type testServerImpl struct {
mu sync.RWMutex
version *version.Version
serverArgs testServerArgs
state int
nodeStates []int
baseDir string
pgURL []pgURLChan
cmd []*exec.Cmd
cmdArgs [][]string
initCmd *exec.Cmd
initCmdArgs []string
stdout string
stderr string
stdoutBuf logWriter
stderrBuf logWriter
listeningURLFile []string
mu sync.RWMutex
version *version.Version
serverArgs testServerArgs
serverState int
baseDir string
pgURL []pgURLChan
initCmd *exec.Cmd
initCmdArgs []string
stdout string
stderr string
stdoutBuf logWriter
stderrBuf logWriter
nodes []nodeInfo

// curTenantID is used to allocate tenant IDs. Refer to NewTenantServer for
// more information.
Expand All @@ -158,7 +170,7 @@ func NewDBForTest(t *testing.T, opts ...TestServerOpt) (*sql.DB, func()) {
// it. Returns a sql *DB instance and a shutdown function. The caller is
// responsible for executing the returned shutdown function on exit.
func NewDBForTestWithDatabase(
t *testing.T, database string, opts ...TestServerOpt,
t *testing.T, database string, opts ...TestServerOpt,
) (*sql.DB, func()) {
t.Helper()
ts, err := NewTestServer(opts...)
Expand Down Expand Up @@ -376,11 +388,6 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
return nil, fmt.Errorf("%s: %w", testserverMessagePrefix, err)
}

listeningURLFile := make([]string, serverArgs.numNodes)
for i := 0; i < serverArgs.numNodes; i++ {
listeningURLFile[i] = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i))
}

secureOpt := "--insecure"
if serverArgs.secure {
// Create certificates.
Expand Down Expand Up @@ -436,40 +443,44 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
storeArg = fmt.Sprintf("--store=type=mem,size=%.2f", serverArgs.storeMemSize)
}

args := make([][]string, serverArgs.numNodes)
nodes := make([]nodeInfo, serverArgs.numNodes)
var initArgs []string
if serverArgs.numNodes <= 1 {
args[0] = []string{
serverArgs.cockroachBinary,
startCmd,
"--logtostderr",
secureOpt,
"--host=localhost",
"--port=0",
"--http-port=" + strconv.Itoa(serverArgs.httpPort),
storeArg,
"--listening-url-file=" + listeningURLFile[0],
}
} else {
for i := 0; i < serverArgs.numNodes; i++ {
args[i] = []string{
for i := 0; i < serverArgs.numNodes; i++ {
nodes[i].state = stateNew
nodes[i].listeningURLFile = filepath.Join(baseDir, fmt.Sprintf("listen-url%d", i))
if serverArgs.numNodes > 1 {
nodes[i].startCmdArgs = []string{
serverArgs.cockroachBinary,
startCmd,
secureOpt,
storeArg + strconv.Itoa(i),
fmt.Sprintf("--listen-addr=localhost:%d", 26257+i),
fmt.Sprintf("--http-addr=localhost:%d", 8080+i),
"--listening-url-file=" + listeningURLFile[i],
"--listening-url-file=" + nodes[i].listeningURLFile,
fmt.Sprintf("--join=localhost:%d,localhost:%d,localhost:%d", 26257, 26258, 26259),
}
} else {
nodes[0].startCmdArgs = []string{
serverArgs.cockroachBinary,
startCmd,
"--logtostderr",
secureOpt,
"--host=localhost",
"--port=0",
"--http-port=" + strconv.Itoa(serverArgs.httpPort),
storeArg,
"--listening-url-file=" + nodes[i].listeningURLFile,
}
}
}

initArgs = []string{
serverArgs.cockroachBinary,
"init",
secureOpt,
"--host=localhost:26259",
}
// We only need initArgs if we're creating a testserver
// with multiple nodes.
initArgs = []string{
serverArgs.cockroachBinary,
"init",
secureOpt,
"--host=localhost:26259",
}

states := make([]int, serverArgs.numNodes)
Expand All @@ -478,18 +489,15 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
}

ts := &testServerImpl{
serverArgs: *serverArgs,
version: v,
state: stateNew,
nodeStates: states,
baseDir: baseDir,
cmdArgs: args,
cmd: make([]*exec.Cmd, serverArgs.numNodes),
initCmdArgs: initArgs,
stdout: filepath.Join(logDir, "cockroach.stdout"),
stderr: filepath.Join(logDir, "cockroach.stderr"),
listeningURLFile: listeningURLFile,
curTenantID: firstTenantID,
serverArgs: *serverArgs,
version: v,
serverState: stateNew,
baseDir: baseDir,
initCmdArgs: initArgs,
stdout: filepath.Join(logDir, "cockroach.stdout"),
stderr: filepath.Join(logDir, "cockroach.stderr"),
curTenantID: firstTenantID,
nodes: nodes,
}
ts.pgURL = make([]pgURLChan, serverArgs.numNodes)

Expand Down Expand Up @@ -546,7 +554,7 @@ func (ts *testServerImpl) setPGURLForNode(nodeNum int, u *url.URL) {
close(ts.pgURL[nodeNum].set)
}

func (ts *testServerImpl) WaitForNode(nodeNum int) error {
func (ts *testServerImpl) WaitForInitFinishForNode(nodeNum int) error {
db, err := sql.Open("postgres", ts.PGURLForNode(nodeNum).String())
defer func() {
_ = db.Close()
Expand All @@ -558,29 +566,29 @@ func (ts *testServerImpl) WaitForNode(nodeNum int) error {
if _, err = db.Query("SHOW DATABASES"); err == nil {
return err
}
log.Printf("%s: WaitForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err)
log.Printf("%s: WaitForInitFinishForNode %d: Trying again after error: %v", testserverMessagePrefix, nodeNum, err)
time.Sleep(time.Millisecond * 100)
}
return nil
}

// WaitForInit retries until a connection is successfully established.
func (ts *testServerImpl) WaitForInit() error {
return ts.WaitForNode(0)
return ts.WaitForInitFinishForNode(0)
}

func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error {
var data []byte
for {
ts.mu.RLock()
state := ts.nodeStates[nodeNum]
state := ts.nodes[nodeNum].state
ts.mu.RUnlock()
if state != stateRunning {
return fmt.Errorf("server stopped or crashed before listening URL file was available")
}

var err error
data, err = ioutil.ReadFile(ts.listeningURLFile[nodeNum])
data, err = ioutil.ReadFile(ts.nodes[nodeNum].listeningURLFile)
if err == nil {
break
} else if !os.IsNotExist(err) {
Expand Down Expand Up @@ -624,19 +632,19 @@ func (ts *testServerImpl) pollListeningURLFile(nodeNum int) error {
// to restart a testserver, but use NewTestServer().
func (ts *testServerImpl) Start() error {
ts.mu.Lock()
if ts.state != stateNew {
if ts.serverState != stateNew {
ts.mu.Unlock()
switch ts.state {
switch ts.serverState {
case stateRunning:
return nil // No-op if server is already running.
case stateStopped, stateFailed:
// Start() can only be called once.
return errors.New(
"`Start()` cannot be used to restart a stopped or failed server. " +
"Please use NewTestServer()")
"Please use NewTestServer()")
}
}
ts.state = stateRunning
ts.serverState = stateRunning
ts.mu.Unlock()

for i := 0; i < ts.serverArgs.numNodes; i++ {
Expand All @@ -662,18 +670,18 @@ func (ts *testServerImpl) Stop() {
ts.mu.RLock()
defer ts.mu.RUnlock()

if ts.state == stateNew {
if ts.serverState == stateNew {
log.Fatalf("%s: Stop() called, but Start() was never called", testserverMessagePrefix)
}
if ts.state == stateFailed {
if ts.serverState == stateFailed {
log.Fatalf("%s: Stop() called, but process exited unexpectedly. Stdout:\n%s\nStderr:\n%s\n",
testserverMessagePrefix,
ts.Stdout(),
ts.Stderr())
return
}

if ts.state != stateStopped {
if ts.serverState != stateStopped {
if p := ts.proxyProcess; p != nil {
_ = p.Kill()
}
Expand All @@ -686,16 +694,15 @@ func (ts *testServerImpl) Stop() {
log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr)
}

for _, cmd := range ts.cmd {
ts.serverState = stateStopped
for _, node := range ts.nodes {
cmd := node.startCmd
if cmd.Process != nil {
_ = cmd.Process.Kill()
}
}

ts.state = stateStopped
for _, nodeState := range ts.nodeStates {
if nodeState != stateStopped {
ts.state = stateFailed
if node.state != stateStopped {
ts.serverState = stateFailed
}
}

Expand Down
Loading

0 comments on commit bc0e582

Please sign in to comment.