From 7fcab39a6992707aacab7167103ef0eb3cfceabe Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Fri, 1 Nov 2024 20:55:05 +0100 Subject: [PATCH 1/4] Fix data races --- Makefile | 3 ++ internal/auth/auth.go | 10 +++-- internal/command/command_inline_shell_test.go | 2 - internal/command/command_terminal_test.go | 8 ++-- internal/command/command_virtual.go | 3 +- internal/runner/command.go | 9 ++-- internal/runnerv2service/execution.go | 3 +- .../runnerv2service/service_execute_test.go | 4 +- internal/sbuffer/safe_buffer.go | 43 +++++++++++++++++++ internal/session/env_store_map.go | 3 +- 10 files changed, 71 insertions(+), 17 deletions(-) create mode 100644 internal/sbuffer/safe_buffer.go diff --git a/Makefile b/Makefile index 3cb26410..9886e407 100644 --- a/Makefile +++ b/Makefile @@ -106,6 +106,8 @@ lint: ./... @staticcheck ./... @gosec -quiet -exclude=G110,G204,G304,G404 -exclude-generated ./... + @go vet -stdmethods=false ./... + @go vet -vettool=$(shell go env GOPATH)/bin/checklocks ./... .PHONY: pre-commit pre-commit: build wasm test lint @@ -120,6 +122,7 @@ install/dev: go install github.com/icholy/gomajor@v0.13.1 go install github.com/stateful/go-proto-gql/protoc-gen-gql@latest go install github.com/atombender/go-jsonschema@v0.16.0 + go install gvisor.dev/gvisor/tools/checklocks/cmd/checklocks@go .PHONY: install/goreleaser install/goreleaser: diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 01ee28d7..1164446b 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -63,6 +63,7 @@ type Auth struct { env Env // loginInProgress is a mutex to prevent concurrent `Login` calls. + // +checkatomic loginInProgress uint32 // loginSession contains details about the current login session. @@ -149,7 +150,6 @@ func (a *Auth) Login(ctx context.Context) error { if !atomic.CompareAndSwapUint32(&a.loginInProgress, 0, 1) { return errors.New("login session already in progress") } - defer atomic.StoreUint32(&a.loginInProgress, 0) a.log.Debug("start logging in") @@ -159,10 +159,14 @@ func (a *Auth) Login(ctx context.Context) error { a.env = &desktopEnv{Session: a.loginSession} } + var err error if a.env.IsAutonomous() { - return a.loginAuto(ctx) + err = a.loginAuto(ctx) + } else { + err = a.loginManual(ctx) } - return a.loginManual(ctx) + atomic.StoreUint32(&a.loginInProgress, 0) + return err } func (a *Auth) loginAuto(ctx context.Context) error { diff --git a/internal/command/command_inline_shell_test.go b/internal/command/command_inline_shell_test.go index 0998aa5c..499fee99 100644 --- a/internal/command/command_inline_shell_test.go +++ b/internal/command/command_inline_shell_test.go @@ -24,8 +24,6 @@ import ( ) func TestInlineShellCommand_CollectEnv(t *testing.T) { - t.Parallel() - t.Run("Fifo", func(t *testing.T) { envCollectorUseFifo = true testInlineShellCommandCollectEnv(t) diff --git a/internal/command/command_terminal_test.go b/internal/command/command_terminal_test.go index 178f4ccb..8f16d8fd 100644 --- a/internal/command/command_terminal_test.go +++ b/internal/command/command_terminal_test.go @@ -4,7 +4,6 @@ package command import ( "bufio" - "bytes" "context" "io" "os" @@ -16,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" + "github.com/stateful/runme/v3/internal/sbuffer" "github.com/stateful/runme/v3/internal/session" runnerv2 "github.com/stateful/runme/v3/pkg/api/gen/proto/go/runme/runner/v2" ) @@ -26,7 +26,7 @@ func TestTerminalCommand_EnvPropagation(t *testing.T) { session, err := session.New() require.NoError(t, err) stdinR, stdinW := io.Pipe() - stdout := bytes.NewBuffer(nil) + stdout := sbuffer.New(nil) factory := NewFactory(WithLogger(zaptest.NewLogger(t))) @@ -66,13 +66,11 @@ func TestTerminalCommand_EnvPropagation(t *testing.T) { } func TestTerminalCommand_Intro(t *testing.T) { - t.Parallel() - session, err := session.New(session.WithSeedEnv(os.Environ())) require.NoError(t, err) stdinR, stdinW := io.Pipe() - stdout := bytes.NewBuffer(nil) + stdout := sbuffer.New(nil) factory := NewFactory(WithLogger(zaptest.NewLogger(t))) diff --git a/internal/command/command_virtual.go b/internal/command/command_virtual.go index bc62333f..ff35c2ac 100644 --- a/internal/command/command_virtual.go +++ b/internal/command/command_virtual.go @@ -30,7 +30,8 @@ type virtualCommand struct { wg sync.WaitGroup // watch goroutines copying I/O - mu sync.Mutex // protect err + mu sync.Mutex // protect err + // +checklocks:mu err error } diff --git a/internal/runner/command.go b/internal/runner/command.go index c0ca18d3..c049e34d 100644 --- a/internal/runner/command.go +++ b/internal/runner/command.go @@ -39,7 +39,8 @@ type command struct { Stdout io.Writer Stderr io.Writer - cmd *exec.Cmd + cmd *exec.Cmd + // +checkatomic cmdDone uint32 // pty and tty as pseud-terminal primary and secondary. @@ -53,8 +54,10 @@ type command struct { context context.Context wg sync.WaitGroup - mu sync.Mutex - err error + + mu sync.Mutex + // +checklocks:mu + err error logger *zap.Logger } diff --git a/internal/runnerv2service/execution.go b/internal/runnerv2service/execution.go index 7328ce02..51ac86cb 100644 --- a/internal/runnerv2service/execution.go +++ b/internal/runnerv2service/execution.go @@ -35,7 +35,8 @@ const ( var opininatedEnvVarNamingRegexp = regexp.MustCompile(`^[A-Z_][A-Z0-9_]{1}[A-Z0-9_]*[A-Z][A-Z0-9_]*$`) type buffer struct { - mu *sync.Mutex + mu *sync.Mutex + // +checklocks:mu b *bytes.Buffer closed *atomic.Bool close chan struct{} diff --git a/internal/runnerv2service/service_execute_test.go b/internal/runnerv2service/service_execute_test.go index 3957b1f3..50850e84 100644 --- a/internal/runnerv2service/service_execute_test.go +++ b/internal/runnerv2service/service_execute_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -1094,7 +1095,8 @@ func testStartRunnerServiceServer(t *testing.T) ( server := grpc.NewServer() - runnerService, err := NewRunnerService(factory, logger) + // Using nop logger to avoid data race. + runnerService, err := NewRunnerService(factory, zap.NewNop()) require.NoError(t, err) runnerv2.RegisterRunnerServiceServer(server, runnerService) diff --git a/internal/sbuffer/safe_buffer.go b/internal/sbuffer/safe_buffer.go new file mode 100644 index 00000000..cc6d5e6f --- /dev/null +++ b/internal/sbuffer/safe_buffer.go @@ -0,0 +1,43 @@ +package sbuffer + +import ( + "bytes" + "sync" +) + +type Buffer struct { + // +checklocks:mu + b *bytes.Buffer + mu *sync.RWMutex +} + +func New(buf []byte) *Buffer { + return &Buffer{ + b: bytes.NewBuffer(buf), + mu: &sync.RWMutex{}, + } +} + +func (b *Buffer) Bytes() []byte { + b.mu.RLock() + defer b.mu.RUnlock() + return b.b.Bytes() +} + +func (b *Buffer) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.b.Write(p) +} + +func (b *Buffer) WriteString(s string) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.b.WriteString(s) +} + +func (b *Buffer) Read(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.b.Read(p) +} diff --git a/internal/session/env_store_map.go b/internal/session/env_store_map.go index 855174d3..563f9bf4 100644 --- a/internal/session/env_store_map.go +++ b/internal/session/env_store_map.go @@ -7,7 +7,8 @@ import ( ) type EnvStoreMap struct { - mu sync.RWMutex + mu sync.RWMutex + // +checklocks:mu items map[string]string } From 768503a724f9a4b8265cffbfe1afbba365c66905 Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Fri, 1 Nov 2024 21:10:38 +0100 Subject: [PATCH 2/4] Fix TestCommand_SetWinsize/Terminal --- internal/command/command_unix_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/command/command_unix_test.go b/internal/command/command_unix_test.go index 4767ff8b..5e18556b 100644 --- a/internal/command/command_unix_test.go +++ b/internal/command/command_unix_test.go @@ -383,7 +383,7 @@ func TestCommand_SetWinsize(t *testing.T) { // TODO(adamb): on macOS is is not necessary, but on Linux // we need to wait for the shell to start before we start sending commands. - time.Sleep(time.Second) + time.Sleep(time.Second * 3) err = SetWinsize(cmd, &Winsize{Rows: 45, Cols: 56, X: 0, Y: 0}) require.NoError(t, err) @@ -393,7 +393,8 @@ func TestCommand_SetWinsize(t *testing.T) { require.NoError(t, err) err = cmd.Wait(context.Background()) require.NoError(t, err, "command failed due to: %s", stdout.String()) - require.Contains(t, stdout.String(), "56\r\n45\r\n") + require.Contains(t, stdout.String(), "56\r\n") + require.Contains(t, stdout.String(), "45\r\n") }) } From 74c003d326ffec4efc1ce2f122f97f51beef2487 Mon Sep 17 00:00:00 2001 From: Sebastian Tiedtke Date: Mon, 4 Nov 2024 10:46:24 -0800 Subject: [PATCH 3/4] Fix bad merge --- internal/command/command_terminal_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/command/command_terminal_test.go b/internal/command/command_terminal_test.go index 6d66a86e..d335722b 100644 --- a/internal/command/command_terminal_test.go +++ b/internal/command/command_terminal_test.go @@ -104,7 +104,7 @@ func expectContainLines(ctx context.Context, t *testing.T, r io.Reader, expected output := new(strings.Builder) for { - buf := new(bytes.Buffer) + buf := new(sbuffer.Buffer) r := io.TeeReader(r, buf) scanner := bufio.NewScanner(r) From 6f5c694e90b87a6c1bbefffef922b791b52da3ca Mon Sep 17 00:00:00 2001 From: Sebastian Tiedtke Date: Mon, 4 Nov 2024 11:04:52 -0800 Subject: [PATCH 4/4] Reapply due to bad merge --- internal/command/command_terminal_test.go | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/internal/command/command_terminal_test.go b/internal/command/command_terminal_test.go index d335722b..8f16d8fd 100644 --- a/internal/command/command_terminal_test.go +++ b/internal/command/command_terminal_test.go @@ -52,12 +52,12 @@ func TestTerminalCommand_EnvPropagation(t *testing.T) { // Terminal command sets up a trap on EXIT. // Wait for it before starting to send commands. - expectContainLines(ctx, t, stdout, []string{"trap -- \"__cleanup\" EXIT"}) + expectContainLines(t, stdout, []string{"trap -- \"__cleanup\" EXIT"}) _, err = stdinW.Write([]byte("export TEST_ENV=1\n")) require.NoError(t, err) // Wait for the prompt before sending the next command. - expectContainLines(ctx, t, stdout, []string{"$"}) + expectContainLines(t, stdout, []string{"$"}) _, err = stdinW.Write([]byte("exit\n")) require.NoError(t, err) @@ -94,19 +94,15 @@ func TestTerminalCommand_Intro(t *testing.T) { require.NoError(t, cmd.Start(ctx)) - expectContainLines(ctx, t, stdout, []string{envSourceCmd, introSecondLine}) + expectContainLines(t, stdout, []string{envSourceCmd, introSecondLine}) } -func expectContainLines(ctx context.Context, t *testing.T, r io.Reader, expected []string) { +func expectContainLines(t *testing.T, r io.Reader, expected []string) { t.Helper() - + var output strings.Builder hits := make(map[string]bool, len(expected)) - output := new(strings.Builder) for { - buf := new(sbuffer.Buffer) - r := io.TeeReader(r, buf) - scanner := bufio.NewScanner(r) for scanner.Scan() { _, _ = output.WriteString(scanner.Text()) @@ -123,11 +119,7 @@ func expectContainLines(ctx context.Context, t *testing.T, r io.Reader, expected return } - select { - case <-time.After(100 * time.Millisecond): - case <-ctx.Done(): - t.Fatalf("error waiting for line %q, instead read %q: %s", expected, buf.Bytes(), ctx.Err()) - } + time.Sleep(time.Millisecond * 400) } }