Skip to content

Commit

Permalink
Support testserver with multiple nodes.
Browse files Browse the repository at this point in the history
Right now this is only supported in insecure mode.
Start a three node cluster using the ThreeNode TestsServerOpt.
  • Loading branch information
RichardJCai committed Jun 17, 2022
1 parent 1740cc9 commit debae55
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 88 deletions.
10 changes: 6 additions & 4 deletions testserver/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/cockroachdb/cockroach-go/v2/testserver/version"
"log"
"net"
"net/url"
"os/exec"
"path/filepath"
"strconv"
"strings"

"github.com/cockroachdb/cockroach-go/v2/testserver/version"
)

func (ts *testServerImpl) isTenant() bool {
Expand Down Expand Up @@ -53,7 +54,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
if proxy && !ts.serverArgs.secure {
return nil, fmt.Errorf("%s: proxy cannot be used with insecure mode", tenantserverMessagePrefix)
}
cockroachBinary := ts.cmdArgs[0]
cockroachBinary := ts.serverArgs.cockroachBinary
tenantID, err := func() (int, error) {
ts.mu.Lock()
defer ts.mu.Unlock()
Expand Down Expand Up @@ -191,7 +192,7 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
return nil, err
}

args := []string{
args := [][]string{{
cockroachBinary,
"mt",
"start-sql",
Expand All @@ -201,14 +202,15 @@ func (ts *testServerImpl) NewTenantServer(proxy bool) (TestServer, error) {
"--kv-addrs=" + pgURL.Host,
"--sql-addr=" + sqlAddr,
"--http-addr=:0",
}
}}

tenant := &testServerImpl{
serverArgs: ts.serverArgs,
version: ts.version,
state: stateNew,
baseDir: ts.baseDir,
cmdArgs: args,
cmd: make([]*exec.Cmd, ts.serverArgs.numNodes),
stdout: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stdout", tenantID)),
stderr: filepath.Join(ts.baseDir, logsDirName, fmt.Sprintf("cockroach.tenant.%d.stderr", tenantID)),
// TODO(asubiotto): Specify listeningURLFile once we support dynamic
Expand Down
238 changes: 154 additions & 84 deletions testserver/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,10 @@ type testServerImpl struct {
// no password otherwise).
orig url.URL
}
cmd *exec.Cmd
cmdArgs []string
cmd []*exec.Cmd
cmdArgs [][]string
initCmd *exec.Cmd
initCmdArgs []string
stdout string
stderr string
stdoutBuf logWriter
Expand Down Expand Up @@ -197,6 +199,7 @@ type testServerArgs struct {
testConfig TestConfig
nonStableDB bool
cockroachBinary string // path to cockroach executable file
numNodes int
}

// CockroachBinaryPathOpt is a TestServer option that can be passed to
Expand Down Expand Up @@ -273,6 +276,12 @@ func StopDownloadInMiddleOpt() TestServerOpt {
}
}

func ThreeNode() TestServerOpt {
return func(args *testServerArgs) {
args.numNodes = 3
}
}

const (
logsDirName = "logs"
certsDirName = "certs"
Expand All @@ -287,7 +296,7 @@ var errStoppedInMiddle = errors.New("download stopped in middle")
// If the download fails, we attempt just call "cockroach", hoping it is
// found in your path.
func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
serverArgs := &testServerArgs{}
serverArgs := &testServerArgs{numNodes: 1}
serverArgs.storeMemSize = defaultStoreMemSize
for _, applyOptToArgs := range opts {
applyOptToArgs(serverArgs)
Expand Down Expand Up @@ -396,8 +405,9 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
if err != nil {
return nil, fmt.Errorf("%s failed to parse version: %w", testserverMessagePrefix, err)
}

startCmd := "start-single-node"
if !v.AtLeast(version.MustParse("v19.2.0-alpha")) {
if !v.AtLeast(version.MustParse("v19.2.0-alpha")) || serverArgs.numNodes > 1 {
startCmd = "start"
}

Expand All @@ -408,16 +418,40 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
storeArg = fmt.Sprintf("--store=type=mem,size=%.2f", serverArgs.storeMemSize)
}

args := []string{
serverArgs.cockroachBinary,
startCmd,
"--logtostderr",
secureOpt,
"--host=localhost",
"--port=0",
"--http-port=" + strconv.Itoa(serverArgs.httpPort),
storeArg,
"--listening-url-file=" + listeningURLFile,
args := make([][]string, serverArgs.numNodes)
var initArgs []string
if serverArgs.numNodes <= 1 {
args[0] = []string{
serverArgs.cockroachBinary,
startCmd,
"--logtostderr",
secureOpt,
"--host=localhost",
"--port=0",
"--http-port=" + strconv.Itoa(serverArgs.httpPort),
storeArg,
"--listening-url-file=" + listeningURLFile,
}
} else {
for i := 0; i < serverArgs.numNodes; i++ {
args[i] = []string{
serverArgs.cockroachBinary,
startCmd,
secureOpt,
storeArg + strconv.Itoa(i),
fmt.Sprintf("--listen-addr=localhost:%d", 26257+i),
fmt.Sprintf("--http-addr=localhost:%d", 8080+i),
fmt.Sprintf("--join=localhost:26257,localhost:26258,localhost:26259"),
"--listening-url-file=" + listeningURLFile,
}

}
initArgs = []string{
serverArgs.cockroachBinary,
"init",
secureOpt,
"--host=localhost:26257",
}
}

ts := &testServerImpl{
Expand All @@ -426,6 +460,8 @@ func NewTestServer(opts ...TestServerOpt) (TestServer, error) {
state: stateNew,
baseDir: baseDir,
cmdArgs: args,
cmd: make([]*exec.Cmd, serverArgs.numNodes),
initCmdArgs: initArgs,
stdout: filepath.Join(logDir, "cockroach.stdout"),
stderr: filepath.Join(logDir, "cockroach.stderr"),
listeningURLFile: listeningURLFile,
Expand Down Expand Up @@ -565,88 +601,99 @@ func (ts *testServerImpl) Start() error {
ts.state = stateRunning
ts.mu.Unlock()

ts.cmd = exec.Command(ts.cmdArgs[0], ts.cmdArgs[1:]...)
ts.cmd.Env = []string{
"COCKROACH_MAX_OFFSET=1ns",
"COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true",
}

// Set the working directory of the cockroach process to our temp folder.
// This stops cockroach from polluting the project directory with _dump
// folders.
ts.cmd.Dir = ts.baseDir

if len(ts.stdout) > 0 {
wr, err := newFileLogWriter(ts.stdout)
if err != nil {
return fmt.Errorf("unable to open file %s: %w", ts.stdout, err)
}
ts.stdoutBuf = wr
}
ts.cmd.Stdout = ts.stdoutBuf
for i := 0; i < ts.serverArgs.numNodes; i++ {
ts.cmd[i] = exec.Command(ts.cmdArgs[i][0], ts.cmdArgs[i][1:]...)

if len(ts.stderr) > 0 {
wr, err := newFileLogWriter(ts.stderr)
if err != nil {
return fmt.Errorf("unable to open file %s: %w", ts.stderr, err)
currCmd := ts.cmd[i]
currCmd.Env = []string{
"COCKROACH_MAX_OFFSET=1ns",
"COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true",
}
ts.stderrBuf = wr
}
ts.cmd.Stderr = ts.stderrBuf

for k, v := range defaultEnv() {
ts.cmd.Env = append(ts.cmd.Env, k+"="+v)
}
// Set the working directory of the cockroach process to our temp folder.
// This stops cockroach from polluting the project directory with _dump
// folders.
currCmd.Dir = ts.baseDir

log.Printf("executing: %s", ts.cmd)
err := ts.cmd.Start()
if ts.cmd.Process != nil {
log.Printf("process %d started: %s", ts.cmd.Process.Pid, strings.Join(ts.cmdArgs, " "))
}
if err != nil {
log.Print(err.Error())
if err := ts.stdoutBuf.Close(); err != nil {
log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, err)
if len(ts.stdout) > 0 {
wr, err := newFileLogWriter(ts.stdout)
if err != nil {
return fmt.Errorf("unable to open file %s: %w", ts.stdout, err)
}
ts.stdoutBuf = wr
}
if err := ts.stderrBuf.Close(); err != nil {
log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, err)
currCmd.Stdout = ts.stdoutBuf

if len(ts.stderr) > 0 {
wr, err := newFileLogWriter(ts.stderr)
if err != nil {
return fmt.Errorf("unable to open file %s: %w", ts.stderr, err)
}
ts.stderrBuf = wr
}
currCmd.Stderr = ts.stderrBuf

ts.mu.Lock()
ts.state = stateFailed
ts.mu.Unlock()
for k, v := range defaultEnv() {
currCmd.Env = append(currCmd.Env, k+"="+v)
}

return fmt.Errorf("command %s failed: %w", ts.cmd, err)
}
log.Printf("executing: %s", currCmd)
err := currCmd.Start()
if currCmd.Process != nil {
log.Printf("process %d started: %s", currCmd.Process.Pid, strings.Join(ts.cmdArgs[i], " "))
}
if err != nil {
log.Print(err.Error())
if err := ts.stdoutBuf.Close(); err != nil {
log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, err)
}
if err := ts.stderrBuf.Close(); err != nil {
log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, err)
}

go func() {
err := ts.cmd.Wait()
ts.mu.Lock()
ts.state = stateFailed
ts.mu.Unlock()

if closeErr := ts.stdoutBuf.Close(); closeErr != nil {
log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, closeErr)
}
if closeErr := ts.stderrBuf.Close(); closeErr != nil {
log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr)
return fmt.Errorf("command %s failed: %w", currCmd, err)
}

ps := ts.cmd.ProcessState
sy := ps.Sys().(syscall.WaitStatus)
go func() {
err := currCmd.Wait()

log.Printf("%s: command %s exited with status %d: %v",
testserverMessagePrefix,
ts.cmd,
sy.ExitStatus(),
err)
log.Printf("%s process state: %s", testserverMessagePrefix, ps.String())
if closeErr := ts.stdoutBuf.Close(); closeErr != nil {
log.Printf("%s: failed to close stdout: %v", testserverMessagePrefix, closeErr)
}
if closeErr := ts.stderrBuf.Close(); closeErr != nil {
log.Printf("%s: failed to close stderr: %v", testserverMessagePrefix, closeErr)
}

ts.mu.Lock()
if sy.ExitStatus() == 0 {
ts.state = stateStopped
} else {
ts.state = stateFailed
ps := currCmd.ProcessState
sy := ps.Sys().(syscall.WaitStatus)

log.Printf("%s: command %s exited with status %d: %v",
testserverMessagePrefix,
currCmd,
sy.ExitStatus(),
err)
log.Printf("%s process state: %s", testserverMessagePrefix, ps.String())

ts.mu.Lock()
if sy.ExitStatus() == 0 {
ts.state = stateStopped
} else {
ts.state = stateFailed
}
ts.mu.Unlock()
}()
}

if ts.serverArgs.numNodes > 1 {
err := ts.CockroachInit()
if err != nil {
return err
}
ts.mu.Unlock()
}()
}

if ts.pgURL.u == nil {
go func() {
Expand Down Expand Up @@ -680,9 +727,10 @@ func (ts *testServerImpl) Stop() {
}

if ts.state != stateStopped {
// Only call kill if not running. It could have exited properly.
_ = ts.cmd.Process.Kill()

for _, cmd := range ts.cmd {
// Only call kill if not running. It could have exited properly.
_ = cmd.Process.Kill()
}
if p := ts.proxyProcess; p != nil {
_ = p.Kill()
}
Expand All @@ -692,6 +740,28 @@ func (ts *testServerImpl) Stop() {
_ = os.RemoveAll(ts.baseDir)
}

func (ts *testServerImpl) CockroachInit() error {
ts.initCmd = exec.Command(ts.initCmdArgs[0], ts.initCmdArgs[1:]...)
ts.initCmd.Env = []string{
"COCKROACH_MAX_OFFSET=1ns",
"COCKROACH_TRUST_CLIENT_PROVIDED_SQL_REMOTE_ADDR=true",
}

// Set the working directory of the cockroach process to our temp folder.
// This stops cockroach from polluting the project directory with _dump
// folders.
ts.initCmd.Dir = ts.baseDir

err := ts.initCmd.Start()
if ts.initCmd.Process != nil {
log.Printf("process %d started: %s", ts.initCmd.Process.Pid, strings.Join(ts.initCmdArgs, " "))
}
if err != nil {
return err
}
return nil
}

type logWriter interface {
Write(p []byte) (n int, err error)
String() string
Expand Down
Loading

0 comments on commit debae55

Please sign in to comment.