diff --git a/pkg/cmd/roachtest/cluster.go b/pkg/cmd/roachtest/cluster.go index a69c1df96f81..8ecfec342ea5 100644 --- a/pkg/cmd/roachtest/cluster.go +++ b/pkg/cmd/roachtest/cluster.go @@ -2112,29 +2112,13 @@ func (c *clusterImpl) Install( return errors.Wrap(roachprod.Install(ctx, l, c.MakeNodes(nodes), software), "cluster.Install") } -var reOnlyAlphanumeric = regexp.MustCompile(`[^a-zA-Z0-9]+`) - // cmdLogFileName comes up with a log file to use for the given argument string. func cmdLogFileName(t time.Time, nodes option.NodeListOption, args ...string) string { - // Make sure we treat {"./cockroach start"} like {"./cockroach", "start"}. - args = strings.Split(strings.Join(args, " "), " ") - prefix := []string{reOnlyAlphanumeric.ReplaceAllString(args[0], "")} - for _, arg := range args[1:] { - if s := reOnlyAlphanumeric.ReplaceAllString(arg, ""); s != arg { - break - } - prefix = append(prefix, arg) - } - s := strings.Join(prefix, "_") - const maxLen = 70 - if len(s) > maxLen { - s = s[:maxLen] - } logFile := fmt.Sprintf( "run_%s_n%s_%s", t.Format(`150405.000000000`), nodes.String()[1:], - s, + install.GenFilenameFromArgs(20, args...), ) return logFile } diff --git a/pkg/cmd/roachtest/cluster_test.go b/pkg/cmd/roachtest/cluster_test.go index 3e0f0c8368ef..fb26d417e285 100644 --- a/pkg/cmd/roachtest/cluster_test.go +++ b/pkg/cmd/roachtest/cluster_test.go @@ -168,7 +168,7 @@ func TestClusterMachineType(t *testing.T) { func TestCmdLogFileName(t *testing.T) { ts := time.Date(2000, 1, 1, 15, 4, 12, 0, time.Local) - const exp = `run_150412.000000000_n1,3-4,9_cockroach_bla` + const exp = `run_150412.000000000_n1,3-4,9_cockroach-bla-foo-ba` nodes := option.NodeListOption{1, 3, 4, 9} assert.Equal(t, exp, diff --git a/pkg/cmd/roachtest/test_impl.go b/pkg/cmd/roachtest/test_impl.go index 43ac5482b1a7..a3600257fa51 100644 --- a/pkg/cmd/roachtest/test_impl.go +++ b/pkg/cmd/roachtest/test_impl.go @@ -283,52 +283,47 @@ func collectErrors(args []interface{}) []error { // ATTENTION: Since this calls panic(errTestFatal), it should only be called // from a test's closure. The test runner itself should never call this. func (t *testImpl) Fatal(args ...interface{}) { - t.addFailure("", args...) + t.addFailureAndCancel(1, "", args...) panic(errTestFatal) } // Fatalf is like Fatal, but takes a format string. func (t *testImpl) Fatalf(format string, args ...interface{}) { - t.addFailure(format, args...) + t.addFailureAndCancel(1, format, args...) panic(errTestFatal) } // FailNow implements the TestingT interface. func (t *testImpl) FailNow() { - t.addFailure("FailNow called") + t.addFailureAndCancel(1, "FailNow called") panic(errTestFatal) } // Error implements the TestingT interface func (t *testImpl) Error(args ...interface{}) { - t.addFailure("", args...) + t.addFailureAndCancel(1, "", args...) } // Errorf implements the TestingT interface. func (t *testImpl) Errorf(format string, args ...interface{}) { - t.addFailure(format, args...) + t.addFailureAndCancel(1, format, args...) 
} -// We take the first error from each failure which is the -// "squashed" error that contains all information of a failure -func formatFailure(b *strings.Builder, reportFailures ...failure) { - for i, failure := range reportFailures { - if i > 0 { - fmt.Fprintln(b) - } - file, line, fn, ok := errors.GetOneLineSource(failure.squashedErr) - if !ok { - file, line, fn = "", 0, "unknown" - } - fmt.Fprintf(b, "(%s:%d).%s: %v", file, line, fn, failure.squashedErr) +func (t *testImpl) addFailureAndCancel(depth int, format string, args ...interface{}) { + t.addFailure(depth+1, format, args...) + if t.mu.cancel != nil { + t.mu.cancel() } } -func (t *testImpl) addFailure(format string, args ...interface{}) { +// addFailure depth indicates how many stack frames to skip when reporting the +// site of the failure in logs. `0` will report the caller of addFailure, `1` the +// caller of the caller of addFailure, etc. +func (t *testImpl) addFailure(depth int, format string, args ...interface{}) { if format == "" { format = strings.Repeat(" %v", len(args))[1:] } - reportFailure := newFailure(errors.NewWithDepthf(1, format, args...), collectErrors(args)) + reportFailure := newFailure(errors.NewWithDepthf(depth+1, format, args...), collectErrors(args)) t.mu.Lock() defer t.mu.Unlock() @@ -339,29 +334,44 @@ func (t *testImpl) addFailure(format string, args ...interface{}) { formatFailure(&b, reportFailure) msg := b.String() - t.L().Printf("test failure #%d: %s", len(t.mu.failures), msg) + failureNum := len(t.mu.failures) + failureLog := fmt.Sprintf("failure_%d", failureNum) + t.L().Printf("test failure #%d: full stack retained in %s.log: %s", failureNum, failureLog, msg) // Also dump the verbose error (incl. all stack traces) to a log file, in case // we need it. The stacks are sometimes helpful, but we don't want them in the // main log as they are highly verbose. { - cl, err := t.L().ChildLogger( - fmt.Sprintf("failure_%d", len(t.mu.failures)), - logger.QuietStderr, logger.QuietStdout, - ) + cl, err := t.L().ChildLogger(failureLog, logger.QuietStderr, logger.QuietStdout) if err == nil { // We don't actually log through this logger since it adds an unrelated // file:line caller (namely ours). The error already has stack traces // so it's better to write only it to the file to avoid confusion. - path := cl.File.Name() + if cl.File != nil { + path := cl.File.Name() + if len(path) > 0 { + _ = os.WriteFile(path, []byte(fmt.Sprintf("%+v", reportFailure.squashedErr)), 0644) + } + } cl.Close() // we just wanted the filename - _ = os.WriteFile(path, []byte(fmt.Sprintf("%+v", reportFailure.squashedErr)), 0644) } } t.mu.output = append(t.mu.output, msg...) 
t.mu.output = append(t.mu.output, '\n') - if t.mu.cancel != nil { - t.mu.cancel() +} + +// We take the first error from each failure which is the +// "squashed" error that contains all information of a failure +func formatFailure(b *strings.Builder, reportFailures ...failure) { + for i, failure := range reportFailures { + if i > 0 { + fmt.Fprintln(b) + } + file, line, fn, ok := errors.GetOneLineSource(failure.squashedErr) + if !ok { + file, line, fn = "", 0, "unknown" + } + fmt.Fprintf(b, "(%s:%d).%s: %v", file, line, fn, failure.squashedErr) } } diff --git a/pkg/cmd/roachtest/test_runner.go b/pkg/cmd/roachtest/test_runner.go index 27eb3ad73fb6..dbdeefb9a81c 100644 --- a/pkg/cmd/roachtest/test_runner.go +++ b/pkg/cmd/roachtest/test_runner.go @@ -933,10 +933,9 @@ func (r *testRunner) runTest( } t.L().Printf("tearing down after %s; see teardown.log", s) case <-time.After(timeout): - // NB: we're intentionally not failing the test if it hasn't - // already. This will be done at the very end of this method, - // after we've collected artifacts. - t.L().Printf("test timed out after %s; check __stacks.log and CRDB logs for goroutine dumps", timeout) + // NB: We're adding the timeout failure intentionally without cancelling the context + // to capture as much state as possible during artifact collection. + t.addFailure(0, "test timed out (%s)", timeout) timedOut = true } @@ -1048,10 +1047,11 @@ func (r *testRunner) teardownTest( // around so someone can poke at it. _ = c.StopE(ctx, t.L(), option.DefaultStopOpts(), c.All()) - // The hung test may, against all odds, still not have reported an error. - // We delayed it to improve artifacts collection, and now we ensure the test - // is marked as failing. - t.Errorf("test timed out (%s)", t.Spec().(*registry.TestSpec).Timeout) + // We previously added a timeout failure without cancellation, so we cancel here. + if t.mu.cancel != nil { + t.mu.cancel() + } + t.L().Printf("test timed out; check __stacks.log and CRDB logs for goroutine dumps") } return nil } diff --git a/pkg/roachprod/install/cluster_synced.go b/pkg/roachprod/install/cluster_synced.go index 4d543d64cc1d..d7b3cf22b064 100644 --- a/pkg/roachprod/install/cluster_synced.go +++ b/pkg/roachprod/install/cluster_synced.go @@ -104,7 +104,7 @@ func NewSyncedCluster( var ErrAfterRetry = errors.New("error occurred after retries") // The first retry is after 5s, the second and final is after 25s -var defaultRunRetryOpt = retry.Options{ +var defaultRetryOpt = retry.Options{ InitialBackoff: 5 * time.Second, Multiplier: 5, MaxBackoff: 1 * time.Minute, @@ -112,68 +112,76 @@ var defaultRunRetryOpt = retry.Options{ MaxRetries: 2, } -// runWithMaybeRetry will run the specified function `f` at least once. 
-// Any returned error from `f` is passed to the `shouldRetryFn` which,
+type RunRetryOpts struct {
+	retry.Options
+	shouldRetryFn func(*RunResultDetails) bool
+}
+
+func newRunRetryOpts(
+	retryOpts retry.Options, shouldRetryFn func(*RunResultDetails) bool,
+) *RunRetryOpts {
+	return &RunRetryOpts{
+		Options:       retryOpts,
+		shouldRetryFn: shouldRetryFn,
+	}
+}
+
+var DefaultSSHRetryOpts = newRunRetryOpts(defaultRetryOpt, func(res *RunResultDetails) bool { return errors.Is(res.Err, rperrors.ErrSSH255) })
+
+// defaultSCPRetry assumes any error is retryable.
+var defaultSCPRetry = newRunRetryOpts(defaultRetryOpt, func(res *RunResultDetails) bool { return true })
+
+// runWithMaybeRetry will run the specified function `f` at least once, or
+// exactly once if `runRetryOpts` is nil.
 // Any returned error from `f` is passed to `runRetryOpts.shouldRetryFn` which,
 // if it returns true, will result in `f` being retried using the `retryOpts`
-// If the `shouldRetryFn` is not specified (nil), then no retries will be
-// performed.
+// If the `shouldRetryFn` is not specified (nil), then retries will be performed
+// regardless of the previous result or error.
 //
-// We operate on a pointer to RunResultDetails as it has already have been
+// We operate on a pointer to RunResultDetails as it has already been
 // captured in a *RunResultDetails[] in Run, but here we may enrich with attempt
 // number and a wrapper error.
 func runWithMaybeRetry(
-	l *logger.Logger,
-	retryOpts retry.Options,
-	shouldRetryFn func(*RunResultDetails) bool,
-	f func() (*RunResultDetails, error),
+	l *logger.Logger, retryOpts *RunRetryOpts, f func() (*RunResultDetails, error),
 ) (*RunResultDetails, error) {
-	var err error
-	var res *RunResultDetails
+	if retryOpts == nil {
+		res, err := f()
+		res.Attempt = 1
+		return res, err
+	}
 
+	var res *RunResultDetails
+	var err error
 	var cmdErr error
 
-	for r := retry.Start(retryOpts); r.Next(); {
+	for r := retry.Start(retryOpts.Options); r.Next(); {
 		res, err = f()
 		res.Attempt = r.CurrentAttempt() + 1
 		// nil err (denoting a roachprod error) indicates a potentially retryable res.Err
 		if err == nil && res.Err != nil {
 			cmdErr = errors.CombineErrors(cmdErr, res.Err)
-			if shouldRetryFn != nil && shouldRetryFn(res) {
-				l.Printf("Encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, retryOpts.MaxRetries+1)
+			if retryOpts.shouldRetryFn == nil || retryOpts.shouldRetryFn(res) {
+				l.Printf("encountered [%v] on attempt %v of %v", res.Err, r.CurrentAttempt()+1, retryOpts.MaxRetries+1)
 				continue
 			}
 		}
 		break
 	}
 
-	if res.Err != nil && res.Attempt > 1 {
-		// An error cannot be marked with more than one reference error. Since res.Err may already be marked, we create
-		// a new error here and mark it.
-		res.Err = errors.Mark(errors.Wrapf(cmdErr, "error persisted after %v attempts", res.Attempt), ErrAfterRetry)
+	if res.Attempt > 1 {
+		if res.Err != nil {
+			// An error cannot be marked with more than one reference error. Since res.Err may already be marked, we create
+			// a new error here and mark it.
+ res.Err = errors.Mark(errors.Wrapf(cmdErr, "error persisted after %v attempts", res.Attempt), ErrAfterRetry) + } else { + l.Printf("command successful after %v attempts", res.Attempt) + } } return res, err } -// runWithDefaultSSHRetry will retry an SSH command which returns an error with exit code 255 -func runWithDefaultSSHRetry( - l *logger.Logger, f func() (*RunResultDetails, error), -) (*RunResultDetails, error) { - return runWithMaybeRetry( - l, - defaultRunRetryOpt, - func(res *RunResultDetails) bool { return errors.Is(res.Err, rperrors.ErrSSH255) }, - f, - ) -} - -// scpWithDefaultRetry assumes that any error returned from an scp attempt is retryable -func scpWithDefaultRetry(l *logger.Logger, src, dest string) (*RunResultDetails, error) { - return runWithMaybeRetry( - l, - defaultRunRetryOpt, - func(*RunResultDetails) bool { return true }, - func() (*RunResultDetails, error) { return scp(src, dest) }, - ) +func scpWithRetry(l *logger.Logger, src, dest string) (*RunResultDetails, error) { + return runWithMaybeRetry(l, defaultSCPRetry, func() (*RunResultDetails, error) { return scp(src, dest) }) } // Host returns the public IP of a node. @@ -210,22 +218,20 @@ func (c *SyncedCluster) TargetNodes() Nodes { } // GetInternalIP returns the internal IP address of the specified node. -func (c *SyncedCluster) GetInternalIP(ctx context.Context, n Node) (string, error) { +func (c *SyncedCluster) GetInternalIP( + l *logger.Logger, ctx context.Context, n Node, +) (string, error) { if c.IsLocal() { return c.Host(n), nil } - session, err := c.newSession(n) - if err != nil { - return "", errors.Wrapf(err, "GetInternalIP: failed dial %s:%d", c.Name, n) - } - defer session.Close() + sess := c.newSession(l, n, `hostname --all-ip-addresses`, withDebugName("get-internal-ip")) + defer sess.Close() var stdout, stderr strings.Builder - session.SetStdout(&stdout) - session.SetStderr(&stderr) - cmd := `hostname --all-ip-addresses` - if err := session.Run(ctx, cmd); err != nil { + sess.SetStdout(&stdout) + sess.SetStderr(&stderr) + if err := sess.Run(ctx); err != nil { return "", errors.Wrapf(err, "GetInternalIP: failed to execute hostname on %s:%d:\n(stdout) %s\n(stderr) %s", c.Name, n, stdout.String(), stderr.String()) @@ -285,11 +291,25 @@ func (c *SyncedCluster) roachprodEnvRegex(node Node) string { return fmt.Sprintf(`ROACHPROD=%s[ \/]`, escaped) } -func (c *SyncedCluster) newSession(node Node) (session, error) { +// cmdDebugName is the suffix of the generated ssh debug file +// If it is "", a suffix will be generated from the cmd string +func (c *SyncedCluster) newSession( + l *logger.Logger, node Node, cmd string, options ...remoteSessionOption, +) session { if c.IsLocal() { - return newLocalSession(), nil + return newLocalSession(cmd) + } + command := &remoteCommand{ + node: node, + user: c.user(node), + host: c.Host(node), + cmd: cmd, } - return newRemoteSession(c.user(node), c.Host(node), c.DebugDir) + + for _, opt := range options { + opt(command) + } + return newRemoteSession(l, command) } // Stop is used to stop cockroach on all nodes in the cluster. 
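The retry plumbing above replaces the previously hard-wired SSH retry with caller-supplied policies. As a minimal sketch (not part of this diff), a package-internal caller could assemble its own policy the same way `DefaultSSHRetryOpts` is built; the variable name `retryAnyErr` is invented for illustration:

```go
package install

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// retryAnyErr is a hypothetical policy: unlike DefaultSSHRetryOpts, which
// retries only SSH exit-code-255 failures, it retries any command error,
// reusing the same backoff schedule as defaultRetryOpt above.
var retryAnyErr = newRunRetryOpts(
	retry.Options{
		InitialBackoff: 5 * time.Second,
		Multiplier:     5,
		MaxBackoff:     1 * time.Minute,
		MaxRetries:     2,
	},
	func(res *RunResultDetails) bool { return res.Err != nil },
)
```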
@@ -315,11 +335,6 @@ func (c *SyncedCluster) Stop( } return c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() var waitCmd string if wait { @@ -363,11 +378,15 @@ fi`, sig, // [3] waitCmd, // [4] ) - out, cmdErr := sess.CombinedOutput(ctx, cmd) + + sess := c.newSession(l, node, cmd, withDebugName("node-stop")) + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out return res, res.Err - }) + }, nil) // Disable SSH Retries } // Wipe TODO(peter): document @@ -378,12 +397,6 @@ func (c *SyncedCluster) Wipe(ctx context.Context, l *logger.Logger, preserveCert } return c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() - var cmd string if c.IsLocal() { // Not all shells like brace expansion, so we'll do it here @@ -405,11 +418,14 @@ sudo rm -fr logs && cmd += "sudo rm -fr tenant-certs* ;\n" } } - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, node, cmd, withDebugName("node-wipe")) + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out return res, res.Err - }) + }, DefaultSSHRetryOpts) } // NodeStatus contains details about the status of a node. @@ -427,11 +443,6 @@ func (c *SyncedCluster) Status(ctx context.Context, l *logger.Logger) ([]NodeSta results := make([]NodeStatus, len(c.Nodes)) if err := c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() binary := cockroachNodeBinary(c, node) cmd := fmt.Sprintf(`out=$(ps axeww -o pid -o ucomm -o command | \ @@ -446,7 +457,10 @@ else echo ${out} fi ` - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, node, cmd, withDebugName("node-status")) + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -463,7 +477,7 @@ fi results[i] = NodeStatus{Running: true, Version: info[0], Pid: info[1]} return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return nil, err } for i := 0; i < len(results); i++ { @@ -505,7 +519,9 @@ type MonitorOpts struct { // If ignoreEmptyNodes is true, nodes on which no CockroachDB data is found // (in {store-dir}) will not be probed and single message, "skipped", will // be emitted for them. 
-func (c *SyncedCluster) Monitor(ctx context.Context, opts MonitorOpts) chan NodeMonitorInfo { +func (c *SyncedCluster) Monitor( + l *logger.Logger, ctx context.Context, opts MonitorOpts, +) chan NodeMonitorInfo { ch := make(chan NodeMonitorInfo) nodes := c.TargetNodes() var wg sync.WaitGroup @@ -514,20 +530,7 @@ func (c *SyncedCluster) Monitor(ctx context.Context, opts MonitorOpts) chan Node wg.Add(1) go func(i int) { defer wg.Done() - sess, err := c.newSession(nodes[i]) - if err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} - wg.Done() - return - } - defer sess.Close() - - p, err := sess.StdoutPipe() - if err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} - wg.Done() - return - } + node := nodes[i] // On each monitored node, we loop looking for a cockroach process. data := struct { @@ -539,8 +542,8 @@ func (c *SyncedCluster) Monitor(ctx context.Context, opts MonitorOpts) chan Node }{ OneShot: opts.OneShot, IgnoreEmpty: opts.IgnoreEmptyNodes, - Store: c.NodeDir(nodes[i], 1 /* storeIndex */), - Port: c.NodePort(nodes[i]), + Store: c.NodeDir(node, 1 /* storeIndex */), + Port: c.NodePort(node), Local: c.IsLocal(), } @@ -603,14 +606,23 @@ done t := template.Must(template.New("script").Parse(snippet)) var buf bytes.Buffer if err := t.Execute(&buf, data); err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} + ch <- NodeMonitorInfo{Node: node, Err: err} return } + sess := c.newSession(l, node, buf.String(), withDebugDisabled()) + defer sess.Close() + + p, err := sess.StdoutPipe() + if err != nil { + ch <- NodeMonitorInfo{Node: node, Err: err} + wg.Done() + return + } // Request a PTY so that the script will receive a SIGPIPE when the // session is closed. if err := sess.RequestPty(); err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} + ch <- NodeMonitorInfo{Node: node, Err: err} return } @@ -624,12 +636,12 @@ done if err == io.EOF { return } - ch <- NodeMonitorInfo{Node: nodes[i], Msg: string(line)} + ch <- NodeMonitorInfo{Node: node, Msg: string(line)} } }(p) - if err := sess.Start(buf.String()); err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} + if err := sess.Start(); err != nil { + ch <- NodeMonitorInfo{Node: node, Err: err} return } @@ -644,7 +656,7 @@ done // pipe. Otherwise it can be closed under us, causing the reader to loop // infinitely receiving a non-`io.EOF` error. if err := sess.Wait(); err != nil { - ch <- NodeMonitorInfo{Node: nodes[i], Err: err} + ch <- NodeMonitorInfo{Node: node, Err: err} return } }(i) @@ -688,12 +700,6 @@ func (c *SyncedCluster) runCmdOnSingleNode( combined bool, stdout, stderr io.Writer, ) (*RunResultDetails, error) { - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() - // Argument template expansion is node specific (e.g. for {store-dir}). 
e := expander{ node: node, @@ -717,9 +723,12 @@ func (c *SyncedCluster) runCmdOnSingleNode( nodeCmd = fmt.Sprintf("cd %s; %s", c.localVMDir(node), nodeCmd) } + sess := c.newSession(l, node, nodeCmd, withDebugName(GenFilenameFromArgs(20, expandedCmd))) + defer sess.Close() + var res *RunResultDetails if combined { - out, cmdErr := sess.CombinedOutput(ctx, nodeCmd) + out, cmdErr := sess.CombinedOutput(ctx) res = newRunResultDetails(node, cmdErr) res.CombinedOut = out } else { @@ -730,7 +739,7 @@ func (c *SyncedCluster) runCmdOnSingleNode( sess.SetStdout(multStdout) sess.SetStderr(multStderr) - res = newRunResultDetails(node, sess.Run(ctx, nodeCmd)) + res = newRunResultDetails(node, sess.Run(ctx)) res.Stderr = stderrBuffer.String() res.Stdout = stdoutBuffer.String() } @@ -773,7 +782,7 @@ func (c *SyncedCluster) Run( result, err := c.runCmdOnSingleNode(ctx, l, nodes[i], cmd, !stream, stdout, stderr) results[i] = result return result, err - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -822,7 +831,7 @@ func (c *SyncedCluster) RunWithDetails( result, err := c.runCmdOnSingleNode(ctx, l, nodes[i], cmd, false, l.Stdout, l.Stderr) resultPtrs[i] = result return result, err - }) + }, DefaultSSHRetryOpts) // Return values to preserve API results := make([]RunResultDetails, len(nodes)) @@ -870,15 +879,12 @@ func (c *SyncedCluster) Wait(ctx context.Context, l *logger.Logger) error { if err := c.Parallel(l, display, len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] res := &RunResultDetails{Node: node} + cmd := "test -e /mnt/data1/.roachprod-initialized" for j := 0; j < 600; j++ { - sess, err := c.newSession(node) - if err != nil { - time.Sleep(500 * time.Millisecond) - continue - } + sess := c.newSession(l, node, cmd, withDebugDisabled()) defer sess.Close() - _, err = sess.CombinedOutput(ctx, "test -e /mnt/data1/.roachprod-initialized") + _, err := sess.CombinedOutput(ctx) if err != nil { time.Sleep(500 * time.Millisecond) continue @@ -888,7 +894,7 @@ func (c *SyncedCluster) Wait(ctx context.Context, l *logger.Logger) error { errs[i] = errors.New("timed out after 5m") res.Err = errs[i] return res, nil - }); err != nil { + }, nil); err != nil { return err } @@ -905,16 +911,6 @@ func (c *SyncedCluster) Wait(ctx context.Context, l *logger.Logger) error { return nil } -// setupSession is a helper which creates a new session and -// populates RunResultDetails with an error if one occurrs (unlikely -// given the code in `newSession`) -// RunResultDetails is used across all functions which -// create a session and holds error and stdout information -func (c *SyncedCluster) setupSession(node Node) (session, error) { - sess, err := c.newSession(node) - return sess, err -} - // SetupSSH configures the cluster for use with SSH. This is generally run after // the cloud.Cluster has been synced which resets the SSH credentials on the // machines and sets them up for the current user. This method enables the @@ -944,12 +940,6 @@ func (c *SyncedCluster) SetupSSH(ctx context.Context, l *logger.Logger) error { // cluster in order to allow inter-node ssh. var sshTar []byte if err := c.Parallel(l, "generating ssh key", 1, 0, func(i int) (*RunResultDetails, error) { - sess, err := c.setupSession(1) - if err != nil { - return newRunResultDetails(1, err), err - } - defer sess.Close() - // Create the ssh key and then tar up the public, private and // authorized_keys files and output them to stdout. 
We'll take this output // and pipe it back into tar on the other nodes in the cluster. @@ -960,12 +950,15 @@ test -f .ssh/id_rsa || \ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys ` + sess := c.newSession(l, 1, cmd, withDebugName("ssh-gen-key")) + defer sess.Close() + var stdout bytes.Buffer var stderr bytes.Buffer sess.SetStdout(&stdout) sess.SetStderr(&stderr) - res := newRunResultDetails(1, sess.Run(ctx, cmd)) + res := newRunResultDetails(1, sess.Run(ctx)) res.Stdout = stdout.String() res.Stderr = stderr.String() @@ -974,7 +967,7 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys } sshTar = []byte(res.Stdout) return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -982,16 +975,14 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys nodes := c.Nodes[1:] if err := c.Parallel(l, "distributing ssh key", len(nodes), 0, func(i int) (*RunResultDetails, error) { node := nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } + cmd := `tar xf -` + + sess := c.newSession(l, node, cmd, withDebugName("ssh-dist-key")) defer sess.Close() sess.SetStdin(bytes.NewReader(sshTar)) - cmd := `tar xf -` - out, cmdErr := sess.CombinedOutput(ctx, cmd) + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -999,7 +990,7 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys return res, errors.Wrapf(res.Err, "%s: output:\n%s", cmd, res.CombinedOut) } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -1013,7 +1004,7 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys res := &RunResultDetails{Node: node} for j := 0; j < 20 && ips[i] == ""; j++ { var err error - ips[i], err = c.GetInternalIP(ctx, node) + ips[i], err = c.GetInternalIP(l, ctx, node) if err != nil { res.Err = errors.Wrapf(err, "pgurls") return res, res.Err @@ -1025,7 +1016,7 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys return res, res.Err } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -1035,11 +1026,6 @@ tar cf - .ssh/id_rsa .ssh/id_rsa.pub .ssh/authorized_keys var knownHostsData []byte if err := c.Parallel(l, "scanning hosts", 1, 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() // ssh-keyscan may return fewer than the desired number of entries if the // remote nodes are not responding yet, so we loop until we have a scan that @@ -1063,12 +1049,16 @@ for i in {1..20}; do done exit 1 ` + + sess := c.newSession(l, node, cmd, withDebugName("ssh-scan-hosts")) + defer sess.Close() + var stdout bytes.Buffer var stderr bytes.Buffer sess.SetStdout(&stdout) sess.SetStderr(&stderr) - res := newRunResultDetails(node, sess.Run(ctx, cmd)) + res := newRunResultDetails(node, sess.Run(ctx)) res.Stdout = stdout.String() res.Stderr = stderr.String() @@ -1077,19 +1067,12 @@ exit 1 } knownHostsData = stdout.Bytes() return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } if err := c.Parallel(l, "distributing known_hosts", len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() - - sess.SetStdin(bytes.NewReader(knownHostsData)) const cmd = ` known_hosts_data="$(cat)" set -e @@ 
-1117,7 +1100,12 @@ if [[ "$(whoami)" != "` + config.SharedUser + `" ]]; then '"'"'{}'"'"' ~` + config.SharedUser + `/.ssh' \; fi ` - out, cmdErr := sess.CombinedOutput(ctx, cmd) + + sess := c.newSession(l, node, cmd, withDebugName("ssh-dist-known-hosts")) + defer sess.Close() + + sess.SetStdin(bytes.NewReader(knownHostsData)) + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -1125,7 +1113,7 @@ fi return res, errors.Wrapf(res.Err, "%s: output:\n%s", cmd, res.CombinedOut) } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -1137,13 +1125,6 @@ fi // platforms. if err := c.Parallel(l, "adding additional authorized keys", len(c.Nodes), 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() - - sess.SetStdin(bytes.NewReader(c.AuthorizedKeys)) const cmd = ` keys_data="$(cat)" set -e @@ -1166,7 +1147,12 @@ if [[ "$(whoami)" != "` + config.SharedUser + `" ]]; then "${tmp2}" ~` + config.SharedUser + `/.ssh/authorized_keys fi ` - out, cmdErr := sess.CombinedOutput(ctx, cmd) + + sess := c.newSession(l, node, cmd, withDebugName("ssh-add-extra-keys")) + defer sess.Close() + + sess.SetStdin(bytes.NewReader(c.AuthorizedKeys)) + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -1174,7 +1160,7 @@ fi return res, errors.Wrapf(res.Err, "~ %s\n%s", cmd, res.CombinedOut) } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } } @@ -1203,12 +1189,6 @@ func (c *SyncedCluster) DistributeCerts(ctx context.Context, l *logger.Logger) e var msg string display := fmt.Sprintf("%s: initializing certs", c.Name) if err := c.Parallel(l, display, 1, 0, func(i int) (*RunResultDetails, error) { - sess, err := c.setupSession(1) - if err != nil { - return newRunResultDetails(1, err), err - } - defer sess.Close() - var cmd string if c.IsLocal() { cmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(1)) @@ -1223,7 +1203,10 @@ mkdir -p certs tar cvf %[3]s certs `, cockroachNodeBinary(c, 1), strings.Join(nodeNames, " "), certsTarName) - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, 1, cmd, withDebugName("init-certs")) + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(1, cmdErr) res.CombinedOut = out @@ -1231,7 +1214,7 @@ tar cvf %[3]s certs msg = fmt.Sprintf("%s: %v", res.CombinedOut, res.Err) } return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -1293,14 +1276,9 @@ func (c *SyncedCluster) createTenantCertBundle( display := fmt.Sprintf("%s: initializing tenant certs", c.Name) return c.Parallel(l, display, 1, 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() var tenantScopeArg string - if c.cockroachBinSupportsTenantScope(ctx, node) { + if c.cockroachBinSupportsTenantScope(l, ctx, node) { tenantScopeArg = fmt.Sprintf("--tenant-scope %d", tenantID) } @@ -1329,7 +1307,10 @@ tar cvf %[5]s $CERT_DIR bundleName, ) - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, node, cmd, withDebugName("create-tenant-cert-bundle")) + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -1337,7 +1318,7 @@ tar cvf 
%[5]s $CERT_DIR return res, errors.Wrapf(res.Err, "certificate creation error: %s", res.CombinedOut) } return res, nil - }) + }, DefaultSSHRetryOpts) } // cockroachBinSupportsTenantScope is a hack to figure out if the version of @@ -1347,15 +1328,14 @@ tar cvf %[5]s $CERT_DIR // contain an integer count of commits, which does not sort correctly. Once // this feature ships in a release, it will be easier to do a version comparison // on whether this command line flag is supported. -func (c *SyncedCluster) cockroachBinSupportsTenantScope(ctx context.Context, node Node) bool { - sess, err := c.newSession(node) - if err != nil { - return false - } +func (c *SyncedCluster) cockroachBinSupportsTenantScope( + l *logger.Logger, ctx context.Context, node Node, +) bool { + cmd := fmt.Sprintf("%s cert create-client --help | grep '\\--tenant-scope'", cockroachNodeBinary(c, node)) + sess := c.newSession(l, node, cmd) defer sess.Close() - cmd := fmt.Sprintf("%s cert create-client --help | grep '\\--tenant-scope'", cockroachNodeBinary(c, node)) - return sess.Run(ctx, cmd) == nil + return sess.Run(ctx) == nil } // getFile retrieves the given file from the first node in the cluster. The @@ -1379,7 +1359,7 @@ func (c *SyncedCluster) getFileFromFirstNode( } srcFileName := fmt.Sprintf("%s@%s:%s", c.user(1), c.Host(1), name) - if res, _ := scpWithDefaultRetry(l, srcFileName, tmpfile.Name()); res.Err != nil { + if res, _ := scpWithRetry(l, srcFileName, tmpfile.Name()); res.Err != nil { cleanup() return "", nil, res.Err } @@ -1415,19 +1395,16 @@ func (c *SyncedCluster) fileExistsOnFirstNode( display := fmt.Sprintf("%s: checking %s", c.Name, path) if err := c.Parallel(l, display, 1, 0, func(i int) (*RunResultDetails, error) { node := c.Nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } + sess := c.newSession(l, node, `test -e `+path) defer sess.Close() - out, cmdErr := sess.CombinedOutput(ctx, `test -e `+path) + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out existsErr = res.Err return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return false } return existsErr == nil @@ -1449,10 +1426,10 @@ func (c *SyncedCluster) createNodeCertArguments( node := nodes[i] res := &RunResultDetails{Node: node} - res.Stdout, res.Err = c.GetInternalIP(ctx, node) + res.Stdout, res.Err = c.GetInternalIP(l, ctx, node) ips[i] = res.Stdout return res, errors.Wrapf(res.Err, "IPs") - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return nil, err } } @@ -1494,13 +1471,6 @@ func (c *SyncedCluster) distributeLocalCertsTar( display := c.Name + ": distributing certs" return c.Parallel(l, display, len(nodes), 0, func(i int) (*RunResultDetails, error) { node := nodes[i] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() - - sess.SetStdin(bytes.NewReader(certsTar)) var cmd string if c.IsLocal() { cmd = fmt.Sprintf("cd %s ; ", c.localVMDir(node)) @@ -1511,7 +1481,11 @@ func (c *SyncedCluster) distributeLocalCertsTar( cmd += "tar xf -" } - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, node, cmd, withDebugName("dist-local-certs")) + defer sess.Close() + + sess.SetStdin(bytes.NewReader(certsTar)) + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -1519,7 +1493,7 @@ func (c *SyncedCluster) distributeLocalCertsTar( return res, errors.Wrapf(res.Err, "~ 
%s\n%s", cmd, res.CombinedOut) } return res, nil - }) + }, DefaultSSHRetryOpts) } const progressDone = "=======================================>" @@ -1713,7 +1687,7 @@ func (c *SyncedCluster) Put( return } - res, _ := scpWithDefaultRetry(l, from, to) + res, _ := scpWithRetry(l, from, to) results <- result{i, res.Err} if res.Err != nil { @@ -2068,7 +2042,7 @@ func (c *SyncedCluster) Get(l *logger.Logger, nodes Nodes, src, dest string) err return } - res, _ := scpWithDefaultRetry(l, fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest) + res, _ := scpWithRetry(l, fmt.Sprintf("%s@%s:%s", c.user(nodes[0]), c.Host(nodes[i]), src), dest) if res.Err == nil { // Make sure all created files and directories are world readable. // The CRDB process intentionally sets a 0007 umask (resulting in @@ -2187,10 +2161,10 @@ func (c *SyncedCluster) pghosts( if err := c.Parallel(l, "", len(nodes), 0, func(i int) (*RunResultDetails, error) { node := nodes[i] res := &RunResultDetails{Node: node} - res.Stdout, res.Err = c.GetInternalIP(ctx, node) + res.Stdout, res.Err = c.GetInternalIP(l, ctx, node) ips[i] = res.Stdout return res, errors.Wrapf(res.Err, "pghosts") - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return nil, err } @@ -2302,8 +2276,9 @@ func (c *SyncedCluster) Parallel( display string, count, concurrency int, fn func(i int) (*RunResultDetails, error), + runRetryOpts *RunRetryOpts, ) error { - failed, err := c.ParallelE(l, display, count, concurrency, fn) + failed, err := c.ParallelE(l, display, count, concurrency, fn, runRetryOpts) if err != nil { sort.Slice(failed, func(i, j int) bool { return failed[i].Index < failed[j].Index }) for _, f := range failed { @@ -2332,6 +2307,7 @@ func (c *SyncedCluster) ParallelE( display string, count, concurrency int, fn func(i int) (*RunResultDetails, error), + runRetryOpts *RunRetryOpts, ) ([]ParallelResult, error) { if concurrency == 0 || concurrency > count { concurrency = count @@ -2348,7 +2324,7 @@ func (c *SyncedCluster) ParallelE( startNext := func() { go func(i int) { defer wg.Done() - res, err := runWithDefaultSSHRetry(l, func() (*RunResultDetails, error) { return fn(i) }) + res, err := runWithMaybeRetry(l, runRetryOpts, func() (*RunResultDetails, error) { return fn(i) }) results <- ParallelResult{i, res.CombinedOut, err} }(index) index++ @@ -2439,29 +2415,50 @@ func (c *SyncedCluster) ParallelE( // Init initializes the cluster. It does it through node 1 (as per TargetNodes) // to maintain parity with auto-init behavior of `roachprod start` (when -// --skip-init) is not specified. The implementation should be kept in -// sync with Start(). +// --skip-init) is not specified. func (c *SyncedCluster) Init(ctx context.Context, l *logger.Logger) error { - // See Start(). We reserve a few special operations for the first node, so we + // We reserve a few special operations for the first node, so we // strive to maintain the same here for interoperability. 
-	const firstNodeIdx = 0
+	const firstNodeIdx = 1
 
-	l.Printf("%s: initializing cluster\n", c.Name)
-	initOut, err := c.initializeCluster(ctx, firstNodeIdx)
-	if err != nil {
+	if err := c.initializeCluster(ctx, l, firstNodeIdx); err != nil {
 		return errors.WithDetail(err, "install.Init() failed: unable to initialize cluster.")
 	}
-	if initOut != "" {
-		l.Printf(initOut)
-	}
 
-	l.Printf("%s: setting cluster settings", c.Name)
-	clusterSettingsOut, err := c.setClusterSettings(ctx, l, firstNodeIdx)
-	if err != nil {
+	if err := c.setClusterSettings(ctx, l, firstNodeIdx); err != nil {
 		return errors.WithDetail(err, "install.Init() failed: unable to set cluster settings.")
 	}
-	if clusterSettingsOut != "" {
-		l.Printf(clusterSettingsOut)
-	}
+
 	return nil
 }
+
+// GenFilenameFromArgs given a list of cmd args, returns an alphanumeric string up to
+// `maxLen` in length with hyphen delimiters, suitable for use in a filename.
+// e.g. ["/bin/bash", "-c", "'sudo dmesg > dmesg.txt'"] -> binbash-c-sudo-dmesg
+func GenFilenameFromArgs(maxLen int, args ...string) string {
+	cmd := strings.Join(args, " ")
+	var sb strings.Builder
+	lastCharSpace := true
+
+	writeByte := func(b byte) {
+		if b == ' ' {
+			if lastCharSpace {
+				return
+			}
+			sb.WriteByte('-')
+			lastCharSpace = true
+		} else if ('a' <= b && b <= 'z') || ('A' <= b && b <= 'Z') || ('0' <= b && b <= '9') {
+			sb.WriteByte(b)
+			lastCharSpace = false
+		}
+	}
+
+	for i := 0; i < len(cmd); i++ {
+		writeByte(cmd[i])
+		if sb.Len() == maxLen {
+			return sb.String()
+		}
+	}
+
+	return sb.String()
+}
diff --git a/pkg/roachprod/install/cluster_synced_test.go b/pkg/roachprod/install/cluster_synced_test.go
index a5e58706a69b..d02ff540008e 100644
--- a/pkg/roachprod/install/cluster_synced_test.go
+++ b/pkg/roachprod/install/cluster_synced_test.go
@@ -98,24 +98,35 @@ func TestRunWithMaybeRetry(t *testing.T) {
 	cases := []struct {
 		f                func() (*RunResultDetails, error)
 		shouldRetryFn    func(*RunResultDetails) bool
+		nilRetryOpts     bool
 		expectedAttempts int
 		shouldError      bool
 	}{
-		{ // Happy path: no error, no retry required
+		{ // 1. Happy path: no error, no retry required
 			f: func() (*RunResultDetails, error) {
 				return newResult(0), nil
 			},
 			expectedAttempts: 1,
 			shouldError:      false,
 		},
-		{ // Error, but not retry function specified
+		{ // 2. Error, but with no retries
 			f: func() (*RunResultDetails, error) {
 				return newResult(1), nil
 			},
+			shouldRetryFn: func(*RunResultDetails) bool {
+				return false
+			},
 			expectedAttempts: 1,
 			shouldError:      true,
 		},
-		{ // Error, with retries exhausted
+		{ // 3. Error, but no retry function specified
+			f: func() (*RunResultDetails, error) {
+				return newResult(1), nil
+			},
+			expectedAttempts: 3,
+			shouldError:      true,
+		},
+		{ // 4. Error, with retries exhausted
 			f: func() (*RunResultDetails, error) {
 				return newResult(255), nil
 			},
@@ -123,7 +134,7 @@ func TestRunWithMaybeRetry(t *testing.T) {
 			expectedAttempts: 3,
 			shouldError:      true,
 		},
-		{ // Eventual success after retries
+		{ // 5. Eventual success after retries
 			f: func() (*RunResultDetails, error) {
 				attempt++
 				if attempt == 3 {
@@ -135,12 +146,24 @@ func TestRunWithMaybeRetry(t *testing.T) {
 			expectedAttempts: 3,
 			shouldError:      false,
 		},
+		{ // 6. Error, runs once because nil retryOpts
+			f: func() (*RunResultDetails, error) {
+				return newResult(255), nil
+			},
+			nilRetryOpts:     true,
+			expectedAttempts: 1,
+			shouldError:      true,
+		},
 	}
 
 	for idx, tc := range cases {
 		attempt = 0
 		t.Run(fmt.Sprintf("%d", idx+1), func(t *testing.T) {
-			res, _ := runWithMaybeRetry(l, testRetryOpts, tc.shouldRetryFn, tc.f)
+			var retryOpts *RunRetryOpts
+			if !tc.nilRetryOpts {
+				retryOpts = newRunRetryOpts(testRetryOpts, tc.shouldRetryFn)
+			}
+			res, _ := runWithMaybeRetry(l, retryOpts, tc.f)
 
 			require.Equal(t, tc.shouldError, res.Err != nil)
 			require.Equal(t, tc.expectedAttempts, res.Attempt)
@@ -171,3 +194,10 @@ func nilLogger() *logger.Logger {
 	}
 	return l
 }
+
+func TestGenFilenameFromArgs(t *testing.T) {
+	const exp = "mkdir-p-logsredacted"
+	require.Equal(t, exp, GenFilenameFromArgs(20, "mkdir -p logs/redacted && ./cockroach"))
+	require.Equal(t, exp, GenFilenameFromArgs(20, "mkdir", "-p logs/redacted", "&& ./cockroach"))
+	require.Equal(t, exp, GenFilenameFromArgs(20, "mkdir -p logs/redacted && ./cockroach "))
+}
diff --git a/pkg/roachprod/install/cockroach.go b/pkg/roachprod/install/cockroach.go
index 48f3b2a320e8..035590b6052d 100644
--- a/pkg/roachprod/install/cockroach.go
+++ b/pkg/roachprod/install/cockroach.go
@@ -156,6 +156,8 @@ func (c *SyncedCluster) Start(ctx context.Context, l *logger.Logger, startOpts S
 	}
 
 	l.Printf("%s: starting nodes", c.Name)
+
+	// SSH retries are disabled by passing nil RunRetryOpts
 	return c.Parallel(l, "", len(nodes), parallelism, func(nodeIdx int) (*RunResultDetails, error) {
 		node := nodes[nodeIdx]
 		res := &RunResultDetails{Node: node}
@@ -189,32 +191,18 @@ func (c *SyncedCluster) Start(ctx context.Context, l *logger.Logger, startOpts S
 
 		shouldInit := !c.useStartSingleNode()
 		if shouldInit {
-			l.Printf("%s: initializing cluster", c.Name)
-			initOut, err := c.initializeCluster(ctx, node)
-			if err != nil {
+			if err := c.initializeCluster(ctx, l, node); err != nil {
 				res.Err = err
 				return res, errors.Wrap(err, "failed to initialize cluster")
 			}
-
-			if initOut != "" {
-				l.Printf(initOut)
-			}
 		}
-		// We're sure to set cluster settings after having initialized the
-		// cluster.
-
-		l.Printf("%s: setting cluster settings", c.Name)
-		clusterSettingsOut, err := c.setClusterSettings(ctx, l, node)
-		if err != nil {
+		if err := c.setClusterSettings(ctx, l, node); err != nil {
 			res.Err = err
-			return res, errors.Wrap(err, "unable to set cluster settings")
-		}
-		if clusterSettingsOut != "" {
-			l.Printf(clusterSettingsOut)
+			return res, errors.Wrap(err, "failed to set cluster settings")
 		}
 		return res, nil
-	})
+	}, nil)
 }
 
 // NodeDir returns the data directory for the given node and store.
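The filename generation above is worth a concrete illustration. This minimal example test is not part of the diff and the input command is invented, but the expected output follows directly from the implementation and tests above: spaces collapse into single hyphens, other punctuation is dropped, and output stops at `maxLen` bytes.

```go
package install

import "fmt"

func ExampleGenFilenameFromArgs() {
	// "./cockroach start --insecure": "." , "/", and "--" are dropped,
	// spaces become hyphens, and the result is cut off at 20 bytes.
	fmt.Println(GenFilenameFromArgs(20, "./cockroach", "start", "--insecure"))
	// Output: cockroach-start-inse
}
```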
@@ -311,11 +299,6 @@ func (c *SyncedCluster) RunSQL(ctx context.Context, l *logger.Logger, args []str display := fmt.Sprintf("%s: executing sql", c.Name) if err := c.Parallel(l, display, len(c.Nodes), 0, func(nodeIdx int) (*RunResultDetails, error) { node := c.Nodes[nodeIdx] - sess, err := c.newSession(node) - if err != nil { - return newRunResultDetails(node, err), err - } - defer sess.Close() var cmd string if c.IsLocal() { @@ -325,7 +308,10 @@ func (c *SyncedCluster) RunSQL(ctx context.Context, l *logger.Logger, args []str c.NodeURL("localhost", c.NodePort(node)) + " " + ssh.Escape(args) - out, cmdErr := sess.CombinedOutput(ctx, cmd) + sess := c.newSession(l, node, cmd, withDebugName("run-sql")) + defer sess.Close() + + out, cmdErr := sess.CombinedOutput(ctx) res := newRunResultDetails(node, cmdErr) res.CombinedOut = out @@ -334,7 +320,7 @@ func (c *SyncedCluster) RunSQL(ctx context.Context, l *logger.Logger, args []str } resultChan <- result{node: node, output: string(res.CombinedOut)} return res, nil - }); err != nil { + }, DefaultSSHRetryOpts); err != nil { return err } @@ -361,19 +347,17 @@ func (c *SyncedCluster) startNode( } if err := func() error { - sess, err := c.newSession(node) - if err != nil { - return err - } - defer sess.Close() - - sess.SetStdin(strings.NewReader(startCmd)) var cmd string if c.IsLocal() { cmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(node)) } cmd += `cat > cockroach.sh && chmod +x cockroach.sh` - if out, err := sess.CombinedOutput(ctx, cmd); err != nil { + + sess := c.newSession(l, node, cmd) + defer sess.Close() + + sess.SetStdin(strings.NewReader(startCmd)) + if out, err := sess.CombinedOutput(ctx); err != nil { return errors.Wrapf(err, "failed to upload start script: %s", out) } @@ -382,18 +366,16 @@ func (c *SyncedCluster) startNode( return "", err } - sess, err := c.newSession(node) - if err != nil { - return "", err - } - defer sess.Close() - var cmd string if c.IsLocal() { cmd = fmt.Sprintf(`cd %s ; `, c.localVMDir(node)) } cmd += "./cockroach.sh" - out, err := sess.CombinedOutput(ctx, cmd) + + sess := c.newSession(l, node, cmd) + defer sess.Close() + + out, err := sess.CombinedOutput(ctx) if err != nil { return "", errors.Wrapf(err, "~ %s\n%s", cmd, out) } @@ -628,38 +610,39 @@ func (c *SyncedCluster) maybeScaleMem(val int) int { return val } -func (c *SyncedCluster) initializeCluster(ctx context.Context, node Node) (string, error) { - initCmd := c.generateInitCmd(node) +func (c *SyncedCluster) initializeCluster(ctx context.Context, l *logger.Logger, node Node) error { + l.Printf("%s: initializing cluster\n", c.Name) + cmd := c.generateInitCmd(node) - sess, err := c.newSession(node) - if err != nil { - return "", err - } + sess := c.newSession(l, node, cmd, withDebugName("init-cluster")) defer sess.Close() - out, err := sess.CombinedOutput(ctx, initCmd) + out, err := sess.CombinedOutput(ctx) if err != nil { - return "", errors.Wrapf(err, "~ %s\n%s", initCmd, out) + return errors.Wrapf(err, "~ %s\n%s", cmd, out) } - return strings.TrimSpace(string(out)), nil + + if out := strings.TrimSpace(string(out)); out != "" { + l.Printf(out) + } + return nil } -func (c *SyncedCluster) setClusterSettings( - ctx context.Context, l *logger.Logger, node Node, -) (string, error) { - clusterSettingCmd := c.generateClusterSettingCmd(l, node) +func (c *SyncedCluster) setClusterSettings(ctx context.Context, l *logger.Logger, node Node) error { + l.Printf("%s: setting cluster settings", c.Name) + cmd := c.generateClusterSettingCmd(l, node) - sess, err := 
c.newSession(node) - if err != nil { - return "", err - } + sess := c.newSession(l, node, cmd, withDebugName("set-cluster-settings")) defer sess.Close() - out, err := sess.CombinedOutput(ctx, clusterSettingCmd) + out, err := sess.CombinedOutput(ctx) if err != nil { - return "", errors.Wrapf(err, "~ %s\n%s", clusterSettingCmd, out) + return errors.Wrapf(err, "~ %s\n%s", cmd, out) } - return strings.TrimSpace(string(out)), nil + if out := strings.TrimSpace(string(out)); out != "" { + l.Printf(out) + } + return nil } func (c *SyncedCluster) generateClusterSettingCmd(l *logger.Logger, node Node) string { diff --git a/pkg/roachprod/install/session.go b/pkg/roachprod/install/session.go index ea2fd8fe9b17..437ea5cb191e 100644 --- a/pkg/roachprod/install/session.go +++ b/pkg/roachprod/install/session.go @@ -12,6 +12,7 @@ package install import ( "context" + "fmt" "io" "os" "os/exec" @@ -20,16 +21,18 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachprod/config" rperrors "github.com/cockroachdb/cockroach/pkg/roachprod/errors" + "github.com/cockroachdb/cockroach/pkg/roachprod/logger" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) type session interface { - CombinedOutput(ctx context.Context, cmd string) ([]byte, error) - Run(ctx context.Context, cmd string) error + CombinedOutput(ctx context.Context) ([]byte, error) + Run(ctx context.Context) error SetStdin(r io.Reader) SetStdout(w io.Writer) SetStderr(w io.Writer) - Start(cmd string) error + Start() error StdinPipe() (io.WriteCloser, error) StdoutPipe() (io.Reader, error) StderrPipe() (io.Reader, error) @@ -44,25 +47,62 @@ type remoteSession struct { logfile string // captures ssh -vvv } -func newRemoteSession(user, host string, logdir string) (*remoteSession, error) { - // TODO(tbg): this is disabled at the time of writing. It was difficult - // to assign the logfiles to the roachtest and as a bonus our CI harness - // never actually managed to collect the files since they had wrong - // permissions; instead they clogged up the roachprod dir. - // logfile := filepath.Join( - // logdir, - // fmt.Sprintf("ssh_%s_%s", host, timeutil.Now().Format(time.RFC3339)), - // ) - const logfile = "" - args := []string{ - user + "@" + host, +type remoteCommand struct { + node Node + user string + host string + cmd string + debugDisabled bool + debugName string +} + +type remoteSessionOption = func(c *remoteCommand) + +func withDebugDisabled() remoteSessionOption { + return func(c *remoteCommand) { + c.debugDisabled = true + } +} + +func withDebugName(name string) remoteSessionOption { + return func(c *remoteCommand) { + c.debugName = name + } +} + +func newRemoteSession(l *logger.Logger, command *remoteCommand) *remoteSession { + var loggingArgs []string + + // NB: -q suppresses -E, at least on *nix. + loggingArgs = []string{"-q"} + logfile := "" + if !command.debugDisabled { + var debugName = command.debugName + if debugName == "" { + debugName = GenFilenameFromArgs(20, command.cmd) + } + + cl, err := l.ChildLogger(filepath.Join("ssh", fmt.Sprintf( + "ssh_%s_n%v_%s", + timeutil.Now().Format(`150405.000000000`), + command.node, + debugName, + ))) + + // Check the logger file since running roachprod from the cli will result in a fileless logger. + if err == nil && l.File != nil { + logfile = cl.File.Name() + loggingArgs = []string{ + "-vvv", "-E", logfile, + } + cl.Close() + } + } - // TODO(tbg): see above. - //"-vvv", "-E", logfile, - // NB: -q suppresses -E, at least on OSX. 
Difficult decisions will have
-	// to be made if omitting -q leads to annoyance on stdout/stderr.
+	args := []string{
+		command.user + "@" + command.host,
-
-		"-q",
 		"-o", "UserKnownHostsFile=/dev/null",
 		"-o", "StrictHostKeyChecking=no",
 		// Send keep alives every minute to prevent connections without activity
@@ -75,23 +115,23 @@ func newRemoteSession(user, host string, logdir string) (*remoteSession, error)
 		// context cancellation killing hanging roachprod processes.
 		"-o", "ConnectTimeout=5",
 	}
+	args = append(args, loggingArgs...)
 	args = append(args, sshAuthArgs()...)
+	args = append(args, command.cmd)
 	ctx, cancel := context.WithCancel(context.Background())
-	cmd := exec.CommandContext(ctx, "ssh", args...)
-	return &remoteSession{cmd, cancel, logfile}, nil
+	fullCmd := exec.CommandContext(ctx, "ssh", args...)
+	return &remoteSession{fullCmd, cancel, logfile}
 }
 
 func (s *remoteSession) errWithDebug(err error) error {
 	if err != nil && s.logfile != "" {
-		err = errors.Wrapf(err, "ssh verbose log retained in %s", s.logfile)
+		err = errors.Wrapf(err, "ssh verbose log retained in %s", filepath.Base(s.logfile))
 		s.logfile = "" // prevent removal on close
 	}
 	return err
 }
 
-func (s *remoteSession) CombinedOutput(ctx context.Context, cmd string) ([]byte, error) {
-	s.Cmd.Args = append(s.Cmd.Args, cmd)
-
+func (s *remoteSession) CombinedOutput(ctx context.Context) ([]byte, error) {
 	var b []byte
 	var err error
 	commandFinished := make(chan struct{})
@@ -111,9 +151,7 @@ func (s *remoteSession) CombinedOutput(ctx context.Context, cmd string) ([]byte,
 	}
 }
 
-func (s *remoteSession) Run(ctx context.Context, cmd string) error {
-	s.Cmd.Args = append(s.Cmd.Args, cmd)
-
+func (s *remoteSession) Run(ctx context.Context) error {
 	var err error
 	commandFinished := make(chan struct{})
 	go func() {
@@ -130,9 +168,8 @@ func (s *remoteSession) Run(ctx context.Context, cmd string) error {
 	}
 }
 
-func (s *remoteSession) Start(cmd string) error {
-	s.Cmd.Args = append(s.Cmd.Args, cmd)
-	return rperrors.ClassifyCmdError(s.Cmd.Start())
+func (s *remoteSession) Start() error {
+	return rperrors.ClassifyCmdError(s.errWithDebug(s.Cmd.Start()))
 }
 
 func (s *remoteSession) SetStdin(r io.Reader) {
@@ -182,15 +219,13 @@ type localSession struct {
 	cancel func()
 }
 
-func newLocalSession() *localSession {
+func newLocalSession(cmd string) *localSession {
 	ctx, cancel := context.WithCancel(context.Background())
-	cmd := exec.CommandContext(ctx, "/bin/bash", "-c")
-	return &localSession{cmd, cancel}
+	fullCmd := exec.CommandContext(ctx, "/bin/bash", "-c", cmd)
+	return &localSession{fullCmd, cancel}
 }
 
-func (s *localSession) CombinedOutput(ctx context.Context, cmd string) ([]byte, error) {
-	s.Cmd.Args = append(s.Cmd.Args, cmd)
-
+func (s *localSession) CombinedOutput(ctx context.Context) ([]byte, error) {
 	var b []byte
 	var err error
 	commandFinished := make(chan struct{})
@@ -209,9 +244,7 @@ func (s *localSession) CombinedOutput(ctx context.Context, cmd string) ([]byte,
 	}
 }
 
-func (s *localSession) Run(ctx context.Context, cmd string) error {
-	s.Cmd.Args = append(s.Cmd.Args, cmd)
-
+func (s *localSession) Run(ctx context.Context) error {
 	var err error
 	commandFinished := make(chan struct{})
 	go func() {
@@ -228,8 +261,7 @@ func (s *localSession) Run(ctx context.Context, cmd string) error {
 	}
 }
 
-func (s *localSession) Start(cmd string) error {
-	s.Cmd.Args = append(s.Cmd.Args, cmd)
+func (s *localSession) Start() error {
 	return rperrors.ClassifyCmdError(s.Cmd.Start())
 }
 
diff --git a/pkg/roachprod/roachprod.go
b/pkg/roachprod/roachprod.go index 8864e018dc9b..7a3ee4194231 100644 --- a/pkg/roachprod/roachprod.go +++ b/pkg/roachprod/roachprod.go @@ -453,10 +453,10 @@ func IP( if err := c.Parallel(l, "", len(nodes), 0, func(i int) (*install.RunResultDetails, error) { node := nodes[i] res := &install.RunResultDetails{Node: node} - res.Stdout, res.Err = c.GetInternalIP(ctx, node) + res.Stdout, res.Err = c.GetInternalIP(l, ctx, node) ips[i] = res.Stdout return res, err - }); err != nil { + }, install.DefaultSSHRetryOpts); err != nil { return nil, err } } @@ -679,7 +679,7 @@ func Monitor( if err != nil { return nil, err } - return c.Monitor(ctx, opts), nil + return c.Monitor(l, ctx, opts), nil } // StopOpts is used to pass options to Stop. @@ -870,10 +870,10 @@ func PgURL( if err := c.Parallel(l, "", len(nodes), 0, func(i int) (*install.RunResultDetails, error) { node := nodes[i] res := &install.RunResultDetails{Node: node} - res.Stdout, res.Err = c.GetInternalIP(ctx, node) + res.Stdout, res.Err = c.GetInternalIP(l, ctx, node) ips[i] = res.Stdout return res, err - }); err != nil { + }, install.DefaultSSHRetryOpts); err != nil { return nil, err } } @@ -1062,7 +1062,7 @@ func Pprof(l *logger.Logger, clusterName string, opts PprofOpts) error { outputFiles = append(outputFiles, outputFile) mu.Unlock() return res, nil - }) + }, install.DefaultSSHRetryOpts) for _, s := range outputFiles { l.Printf("Created %s", s) diff --git a/pkg/testutils/lint/passes/fmtsafe/functions.go b/pkg/testutils/lint/passes/fmtsafe/functions.go index d5a2ffa82b73..3d57459192a1 100644 --- a/pkg/testutils/lint/passes/fmtsafe/functions.go +++ b/pkg/testutils/lint/passes/fmtsafe/functions.go @@ -96,6 +96,9 @@ var requireConstFmt = map[string]bool{ "(*github.com/cockroachdb/cockroach/pkg/cmd/roachtest.testImpl).addFailure": true, "(*main.testImpl).addFailure": true, + "(*github.com/cockroachdb/cockroach/pkg/cmd/roachtest.testImpl).addFailureAndCancel": true, + "(*main.testImpl).addFailureAndCancel": true, + "(*main.testImpl).Fatalf": true, "(*github.com/cockroachdb/cockroach/pkg/cmd/roachtest.testImpl).Fatalf": true,
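Taken together, the session changes give roachprod a construct-then-run API: the command now travels with the session from construction time, and each command's ssh debug log can be named. A minimal sketch of a package-internal caller under these assumptions follows; `echoOnNode` and its command are invented for illustration.

```go
package install

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
)

// echoOnNode runs a trivial command on a single node. The command string is
// bound when the session is created, CombinedOutput/Run no longer take it as
// an argument, and withDebugName controls the ssh debug log's filename suffix.
func echoOnNode(
	ctx context.Context, l *logger.Logger, c *SyncedCluster, node Node,
) ([]byte, error) {
	sess := c.newSession(l, node, "echo hello", withDebugName("echo-hello"))
	defer sess.Close()
	return sess.CombinedOutput(ctx)
}
```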