diff --git a/pkg/minikube/bootstrapper/ssh_runner.go b/pkg/minikube/bootstrapper/ssh_runner.go index 95bf59b91ec1..f10cb4dab5c0 100644 --- a/pkg/minikube/bootstrapper/ssh_runner.go +++ b/pkg/minikube/bootstrapper/ssh_runner.go @@ -17,6 +17,7 @@ limitations under the License. package bootstrapper import ( + "bytes" "fmt" "io" "path" @@ -26,6 +27,7 @@ import ( "github.com/pkg/errors" "golang.org/x/crypto/ssh" "k8s.io/minikube/pkg/minikube/assets" + "k8s.io/minikube/pkg/util" ) // SSHRunner runs commands through SSH. @@ -52,25 +54,80 @@ func (s *SSHRunner) Remove(f assets.CopyableFile) error { return sess.Run(cmd) } +type singleWriter struct { + b bytes.Buffer + mu sync.Mutex +} + +func (w *singleWriter) Write(p []byte) (int, error) { + w.mu.Lock() + defer w.mu.Unlock() + return w.b.Write(p) +} + +// teeSSH runs an SSH command, streaming stdout, stderr to logs +func teeSSH(s *ssh.Session, cmd string, outB io.Writer, errB io.Writer) error { + outPipe, err := s.StdoutPipe() + if err != nil { + return errors.Wrap(err, "stdout") + } + + errPipe, err := s.StderrPipe() + if err != nil { + return errors.Wrap(err, "stderr") + } + var wg sync.WaitGroup + wg.Add(2) + + go func() { + if err := util.TeePrefix(util.ErrPrefix, errPipe, errB, glog.Infof); err != nil { + glog.Errorf("tee stderr: %v", err) + } + wg.Done() + }() + go func() { + if err := util.TeePrefix(util.OutPrefix, outPipe, outB, glog.Infof); err != nil { + glog.Errorf("tee stdout: %v", err) + } + wg.Done() + }() + err = s.Run(cmd) + wg.Wait() + return err +} + // Run starts a command on the remote and waits for it to return. func (s *SSHRunner) Run(cmd string) error { - glog.Infoln("Run:", cmd) + glog.Infof("SSH: %s", cmd) sess, err := s.c.NewSession() if err != nil { - return errors.Wrap(err, "getting ssh session") + return errors.Wrap(err, "NewSession") } - defer sess.Close() - return sess.Run(cmd) + + defer func() { + if err := sess.Close(); err != nil { + if err != io.EOF { + glog.Errorf("session close: %v", err) + } + } + }() + var outB bytes.Buffer + var errB bytes.Buffer + err = teeSSH(sess, cmd, &outB, &errB) + if err != nil { + return errors.Wrapf(err, "command failed: %s\nstdout: %s\nstderr: %s", cmd, outB.String(), errB.String()) + } + return nil } // CombinedOutputTo runs the command and stores both command // output and error to out. -func (s *SSHRunner) CombinedOutputTo(cmd string, out io.Writer) error { - b, err := s.CombinedOutput(cmd) +func (s *SSHRunner) CombinedOutputTo(cmd string, w io.Writer) error { + out, err := s.CombinedOutput(cmd) if err != nil { - return errors.Wrapf(err, "running command: %s\n.", cmd) + return err } - _, err = out.Write([]byte(b)) + _, err = w.Write([]byte(out)) return err } @@ -80,15 +137,17 @@ func (s *SSHRunner) CombinedOutput(cmd string) (string, error) { glog.Infoln("Run with output:", cmd) sess, err := s.c.NewSession() if err != nil { - return "", errors.Wrap(err, "getting ssh session") + return "", errors.Wrap(err, "NewSession") } defer sess.Close() - b, err := sess.CombinedOutput(cmd) + var combined singleWriter + err = teeSSH(sess, cmd, &combined, &combined) + out := combined.b.String() if err != nil { - return "", errors.Wrapf(err, "running command: %s\n, output: %s", cmd, string(b)) + return "", err } - return string(b), nil + return out, nil } // Copy copies a file to the remote over SSH. @@ -97,18 +156,18 @@ func (s *SSHRunner) Copy(f assets.CopyableFile) error { mkdirCmd := fmt.Sprintf("sudo mkdir -p %s", f.GetTargetDir()) for _, cmd := range []string{deleteCmd, mkdirCmd} { if err := s.Run(cmd); err != nil { - return errors.Wrapf(err, "Error running command: %s", cmd) + return errors.Wrapf(err, "pre-copy") } } sess, err := s.c.NewSession() if err != nil { - return errors.Wrap(err, "Error creating new session via ssh client") + return errors.Wrap(err, "NewSession") } w, err := sess.StdinPipe() if err != nil { - return errors.Wrap(err, "Error accessing StdinPipe via ssh session") + return errors.Wrap(err, "StdinPipe") } // The scpcmd below *should not* return until all data is copied and the // StdinPipe is closed. But let's use a WaitGroup to make it expicit. @@ -123,12 +182,10 @@ func (s *SSHRunner) Copy(f assets.CopyableFile) error { fmt.Fprint(w, "\x00") }() - scpcmd := fmt.Sprintf("sudo scp -t %s", f.GetTargetDir()) - out, err := sess.CombinedOutput(scpcmd) + _, err = sess.CombinedOutput(fmt.Sprintf("sudo scp -t %s", f.GetTargetDir())) if err != nil { - return errors.Wrapf(err, "Error running scp command: %s output: %s", scpcmd, out) + return err } wg.Wait() - return nil } diff --git a/pkg/util/utils.go b/pkg/util/utils.go index 88b86368d101..5ebe4cd2a49c 100644 --- a/pkg/util/utils.go +++ b/pkg/util/utils.go @@ -17,6 +17,8 @@ limitations under the License. package util import ( + "bufio" + "bytes" "fmt" "io" "io/ioutil" @@ -33,6 +35,9 @@ import ( "github.com/pkg/errors" ) +const ErrPrefix = "! " +const OutPrefix = "> " + const ( downloadURL = "https://storage.googleapis.com/minikube/releases/%s/minikube-%s-amd64%s" ) @@ -199,3 +204,31 @@ func MaybeChownDirRecursiveToMinikubeUser(dir string) error { } return nil } + +// TeePrefix copies bytes from a reader to writer, logging each new line. +func TeePrefix(prefix string, r io.Reader, w io.Writer, logger func(format string, args ...interface{})) error { + scanner := bufio.NewScanner(r) + scanner.Split(bufio.ScanBytes) + var line bytes.Buffer + + for scanner.Scan() { + b := scanner.Bytes() + if _, err := w.Write(b); err != nil { + return err + } + + if bytes.IndexAny(b, "\r\n") == 0 { + if line.Len() > 0 { + logger("%s%s", prefix, line.String()) + line.Reset() + } + continue + } + line.Write(b) + } + // Catch trailing output in case stream does not end with a newline + if line.Len() > 0 { + logger("%s%s", prefix, line.String()) + } + return nil +} diff --git a/pkg/util/utils_test.go b/pkg/util/utils_test.go index eeea378564d6..f1027a7e4c96 100644 --- a/pkg/util/utils_test.go +++ b/pkg/util/utils_test.go @@ -17,9 +17,13 @@ limitations under the License. package util import ( + "bytes" + "fmt" "io" "net/http" "net/http/httptest" + "strings" + "sync" "testing" "github.com/pkg/errors" @@ -158,3 +162,39 @@ func TestGetBinaryDownloadURL(t *testing.T) { } } + +func TestTeePrefix(t *testing.T) { + var in bytes.Buffer + var out bytes.Buffer + var logged strings.Builder + + logSink := func(format string, args ...interface{}) { + logged.WriteString("(" + fmt.Sprintf(format, args...) + ")") + } + + // Simulate the primary use case: tee in the background. This also helps avoid I/O races. + var wg sync.WaitGroup + wg.Add(1) + go func() { + TeePrefix(":", &in, &out, logSink) + wg.Done() + }() + + in.Write([]byte("goo")) + in.Write([]byte("\n")) + in.Write([]byte("g\r\n\r\n")) + in.Write([]byte("le")) + wg.Wait() + + gotBytes := out.Bytes() + wantBytes := []byte("goo\ng\r\n\r\nle") + if !bytes.Equal(gotBytes, wantBytes) { + t.Errorf("output=%q, want: %q", gotBytes, wantBytes) + } + + gotLog := logged.String() + wantLog := "(:goo)(:g)(:le)" + if gotLog != wantLog { + t.Errorf("log=%q, want: %q", gotLog, wantLog) + } +} diff --git a/test/integration/docker_test.go b/test/integration/docker_test.go index 68428fe7d6c5..ede3f15ae220 100644 --- a/test/integration/docker_test.go +++ b/test/integration/docker_test.go @@ -40,32 +40,32 @@ func TestDocker(t *testing.T) { mk.RunWithContext(ctx, "delete") startCmd := fmt.Sprintf("start %s %s %s", mk.StartArgs, mk.Args, - "--docker-env=FOO=BAR --docker-env=BAZ=BAT --docker-opt=debug --docker-opt=icc=true") - out, err := mk.RunWithContext(ctx, startCmd) + "--docker-env=FOO=BAR --docker-env=BAZ=BAT --docker-opt=debug --docker-opt=icc=true --alsologtostderr --v=5") + stdout, stderr, err := mk.RunWithContext(ctx, startCmd) if err != nil { - t.Fatalf("start: %v\nstart out: %s", err, out) + t.Fatalf("start: %v\nstdout: %s\nstderr: %s", err, stdout, stderr) } mk.EnsureRunning() - out, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=Environment --no-pager") + stdout, stderr, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=Environment --no-pager") if err != nil { - t.Errorf("docker env: %v\ndocker env out: %s", err, out) + t.Errorf("docker env: %v\nstderr: %s", err, stderr) } for _, envVar := range []string{"FOO=BAR", "BAZ=BAT"} { - if !strings.Contains(string(out), envVar) { - t.Errorf("Env var %s missing: %s.", envVar, out) + if !strings.Contains(stdout, envVar) { + t.Errorf("Env var %s missing: %s.", envVar, stdout) } } - out, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=ExecStart --no-pager") + stdout, stderr, err = mk.RunWithContext(ctx, "ssh -- systemctl show docker --property=ExecStart --no-pager") if err != nil { - t.Errorf("ssh show docker: %v\nshow docker out: %s", err, out) + t.Errorf("ssh show docker: %v\nstderr: %s", err, stderr) } for _, opt := range []string{"--debug", "--icc=true"} { - if !strings.Contains(string(out), opt) { - t.Fatalf("Option %s missing from ExecStart: %s.", opt, out) + if !strings.Contains(stdout, opt) { + t.Fatalf("Option %s missing from ExecStart: %s.", opt, stdout) } } } diff --git a/test/integration/util/util.go b/test/integration/util/util.go index a4b4a8d5b19b..7a4c11d77072 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -27,6 +27,7 @@ import ( "path/filepath" "regexp" "strings" + "sync" "testing" "time" @@ -47,6 +48,15 @@ type MinikubeRunner struct { Runtime string } +// Logf writes logs to stdout if -v is set. +func Logf(str string, args ...interface{}) { + if !testing.Verbose() { + return + } + fmt.Printf(" %s | ", time.Now().Format("15:04:05")) + fmt.Println(fmt.Sprintf(str, args...)) +} + func (m *MinikubeRunner) Run(cmd string) error { _, err := m.SSH(cmd) return err @@ -55,6 +65,7 @@ func (m *MinikubeRunner) Run(cmd string) error { func (m *MinikubeRunner) Copy(f assets.CopyableFile) error { path, _ := filepath.Abs(m.BinaryPath) cmd := exec.Command("/bin/bash", "-c", path, "ssh", "--", fmt.Sprintf("cat >> %s", filepath.Join(f.GetTargetDir(), f.GetTargetName()))) + Logf("Running: %s", cmd) return cmd.Run() } @@ -67,27 +78,62 @@ func (m *MinikubeRunner) Remove(f assets.CopyableFile) error { return err } +// teeRun runs a command, streaming stdout, stderr to console +func (m *MinikubeRunner) teeRun(cmd *exec.Cmd) (string, string, error) { + errPipe, err := cmd.StderrPipe() + if err != nil { + return "", "", err + } + outPipe, err := cmd.StdoutPipe() + if err != nil { + return "", "", err + } + + cmd.Start() + var outB bytes.Buffer + var errB bytes.Buffer + var wg sync.WaitGroup + wg.Add(2) + go func() { + if err := commonutil.TeePrefix(commonutil.ErrPrefix, errPipe, &errB, Logf); err != nil { + m.T.Logf("tee: %v", err) + } + wg.Done() + }() + go func() { + if err := commonutil.TeePrefix(commonutil.OutPrefix, outPipe, &outB, Logf); err != nil { + m.T.Logf("tee: %v", err) + } + wg.Done() + }() + err = cmd.Wait() + wg.Wait() + return outB.String(), errB.String(), err +} + func (m *MinikubeRunner) RunCommand(command string, checkError bool) string { commandArr := strings.Split(command, " ") path, _ := filepath.Abs(m.BinaryPath) cmd := exec.Command(path, commandArr...) - stdout, err := cmd.Output() - + Logf("Run: %s", cmd.Args) + stdout, stderr, err := m.teeRun(cmd) if checkError && err != nil { if exitError, ok := err.(*exec.ExitError); ok { m.T.Fatalf("Error running command: %s %s. Output: %s", command, exitError.Stderr, stdout) } else { - m.T.Fatalf("Error running command: %s %v. Output: %s", command, err, stdout) + m.T.Fatalf("Error running command: %s %v. Output: %s", command, err, stderr) } } - return string(stdout) + return stdout } // RunWithContext calls the minikube command with a context, useful for timeouts. -func (m *MinikubeRunner) RunWithContext(ctx context.Context, command string) ([]byte, error) { +func (m *MinikubeRunner) RunWithContext(ctx context.Context, command string) (string, string, error) { commandArr := strings.Split(command, " ") path, _ := filepath.Abs(m.BinaryPath) - return exec.CommandContext(ctx, path, commandArr...).CombinedOutput() + cmd := exec.CommandContext(ctx, path, commandArr...) + Logf("Run: %s", cmd.Args) + return m.teeRun(cmd) } func (m *MinikubeRunner) RunDaemon(command string) (*exec.Cmd, *bufio.Reader) { @@ -115,11 +161,13 @@ func (m *MinikubeRunner) SetRuntime(runtime string) { func (m *MinikubeRunner) SSH(command string) (string, error) { path, _ := filepath.Abs(m.BinaryPath) cmd := exec.Command(path, "ssh", command) + Logf("SSH: %s", command) + stdout, err := cmd.CombinedOutput() + Logf("Output: %s", stdout) if err, ok := err.(*exec.ExitError); ok { return string(stdout), err } - return string(stdout), nil } @@ -127,9 +175,9 @@ func (m *MinikubeRunner) Start() { switch r := m.Runtime; r { case constants.ContainerdRuntime: containerdFlags := "--container-runtime=containerd --network-plugin=cni --docker-opt containerd=/var/run/containerd/containerd.sock" - m.RunCommand(fmt.Sprintf("start %s %s %s", m.StartArgs, m.Args, containerdFlags), true) + m.RunCommand(fmt.Sprintf("start %s %s %s --alsologtostderr --v=5", m.StartArgs, m.Args, containerdFlags), true) default: - m.RunCommand(fmt.Sprintf("start %s %s", m.StartArgs, m.Args), true) + m.RunCommand(fmt.Sprintf("start %s %s --alsologtostderr --v=5", m.StartArgs, m.Args), true) } } @@ -167,7 +215,7 @@ func (m *MinikubeRunner) CheckStatus(desired string) { func (m *MinikubeRunner) CheckStatusNoFail(desired string) error { s := m.GetStatus() if s != desired { - return fmt.Errorf("Machine is in the wrong state: %s, expected %s", s, desired) + return fmt.Errorf("got state: %q, expected %q", s, desired) } return nil }