Skip to content

Commit

Permalink
fix: address minor data race in command output handling and enable --…
Browse files Browse the repository at this point in the history
…race in coverage
  • Loading branch information
garethgeorge committed Apr 12, 2024
1 parent 4e2bf1f commit 3223138
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
args: release --snapshot --clean

- name: Test
run: PATH=$(pwd):$PATH go test ./...
run: PATH=$(pwd):$PATH go test ./... --race

build-win:
runs-on: windows-latest
Expand Down
36 changes: 31 additions & 5 deletions internal/ioutil/ioutil.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
package ioutil

import (
"bytes"
"fmt"
"io"
"slices"
"sync"
)

type Capturer interface {
Bytes() []byte
}

// HeadWriter keeps the first 'Limit' bytes in memory.
type HeadWriter struct {
mu sync.Mutex
Buf []byte
Limit int
}
Expand All @@ -25,18 +33,23 @@ func (w *HeadWriter) Write(p []byte) (n int, err error) {
}

func (w *HeadWriter) Bytes() []byte {
return w.Buf
w.mu.Lock()
defer w.mu.Unlock()
return slices.Clone(w.Buf)
}

// tailWriter keeps the last 'Limit' bytes in memory.
type TailWriter struct {
mu sync.Mutex
Buf []byte
Limit int
}

var _ io.Writer = &TailWriter{}

func (w *TailWriter) Write(p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()
w.Buf = append(w.Buf, p...)
if len(w.Buf) > w.Limit {
w.Buf = w.Buf[len(w.Buf)-w.Limit:]
Expand All @@ -45,10 +58,13 @@ func (w *TailWriter) Write(p []byte) (n int, err error) {
}

func (w *TailWriter) Bytes() []byte {
return w.Buf
w.mu.Lock()
defer w.mu.Unlock()
return slices.Clone(w.Buf)
}

type OutputCapturer struct {
mu sync.Mutex
HeadWriter
TailWriter
Limit int
Expand All @@ -66,21 +82,31 @@ func NewOutputCapturer(limit int) *OutputCapturer {
}

func (w *OutputCapturer) Write(p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()
w.HeadWriter.Write(p)
w.TailWriter.Write(p)
w.totalBytes += len(p)
return len(p), nil
}

func (w *OutputCapturer) String() string {
func (w *OutputCapturer) Bytes() []byte {
w.mu.Lock()
defer w.mu.Unlock()
head := w.HeadWriter.Bytes()
tail := w.TailWriter.Bytes()
if w.totalBytes <= w.Limit {
return string(head)
return head
}

head = head[:w.Limit/2]
tail = tail[len(tail)-w.Limit/2:]

return fmt.Sprintf("%s...[%v bytes dropped]...%s", string(head), w.totalBytes-len(head)-len(tail), string(tail))
buf := bytes.NewBuffer(make([]byte, 0, len(head)+len(tail)+100))

buf.Write(head)
buf.WriteString(fmt.Sprintf("...[%v bytes dropped]...", w.totalBytes-len(head)-len(tail)))
buf.Write(tail)

return buf.Bytes()
}
4 changes: 2 additions & 2 deletions internal/orchestrator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ func (t *TaskWithOperation) runWithOpAndContext(ctx context.Context, do func(ctx

err := do(ctx, t.op)

if str := capture.String(); len(str) > 0 {
ref, e := t.orch.logStore.Write([]byte(str))
if bytes := capture.Bytes(); len(bytes) > 0 {
ref, e := t.orch.logStore.Write(bytes)
if e != nil {
errors.Join(err, fmt.Errorf("failed to write log to logstore: %w", e))
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/restic/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ func addLoggingToCommand(ctx context.Context, cmd *exec.Cmd) {
}
if cmd.Stdout != nil {
cmd.Stdout = io.MultiWriter(cmd.Stdout, logger)
} else {
cmd.Stdout = logger
}
if cmd.Stderr != nil {
cmd.Stderr = io.MultiWriter(cmd.Stderr, logger)
} else {
cmd.Stderr = logger
}
}
18 changes: 3 additions & 15 deletions pkg/restic/restic.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,6 @@ func (r *Repo) pipeCmdOutputToWriter(cmd *exec.Cmd, handlers ...io.Writer) {
cmd.Stderr = io.MultiWriter(handlers...)
}

func (r *Repo) pipeCmdOutputToLogger(ctx context.Context, cmd *exec.Cmd) {
if logger := LoggerFromContext(ctx); logger != nil {
r.pipeCmdOutputToWriter(cmd, logger)
}
}

// init initializes the repo, the command will be cancelled with the context.
func (r *Repo) init(ctx context.Context, opts ...GenericOption) error {
if r.initialized {
Expand Down Expand Up @@ -175,7 +169,7 @@ func (r *Repo) Backup(ctx context.Context, paths []string, progressCallback func
}

if cmdErr != nil || readErr != nil {
return summary, newCmdErrorPreformatted(ctx, cmd, capture.String(), errors.Join(cmdErr, readErr))
return summary, newCmdErrorPreformatted(ctx, cmd, string(capture.Bytes()), errors.Join(cmdErr, readErr))
}

return summary, nil
Expand All @@ -185,7 +179,6 @@ func (r *Repo) Snapshots(ctx context.Context, opts ...GenericOption) ([]*Snapsho
cmd := r.commandWithContext(ctx, []string{"snapshots", "--json"}, opts...)
output := bytes.NewBuffer(nil)
r.pipeCmdOutputToWriter(cmd, output)
r.pipeCmdOutputToLogger(ctx, cmd)

if err := cmd.Run(); err != nil {
return nil, newCmdError(ctx, cmd, output.String(), err)
Expand Down Expand Up @@ -247,12 +240,11 @@ func (r *Repo) Prune(ctx context.Context, pruneOutput io.Writer, opts ...Generic
cmd := r.commandWithContext(ctx, args, opts...)
output := bytes.NewBuffer(nil)
r.pipeCmdOutputToWriter(cmd, output)
r.pipeCmdOutputToLogger(ctx, cmd)
if pruneOutput != nil {
r.pipeCmdOutputToWriter(cmd, pruneOutput)
}
if err := cmd.Run(); err != nil {
return newCmdErrorPreformatted(ctx, cmd, output.String(), err)
return newCmdError(ctx, cmd, output.String(), err)
}
return nil
}
Expand All @@ -262,7 +254,6 @@ func (r *Repo) Restore(ctx context.Context, snapshot string, callback func(*Rest
output := ioutil.NewOutputCapturer(outputBufferLimit)
reader, writer := io.Pipe()
r.pipeCmdOutputToWriter(cmd, output, writer)
r.pipeCmdOutputToLogger(ctx, cmd)

if err := cmd.Start(); err != nil {
return nil, newCmdError(ctx, cmd, "", err)
Expand Down Expand Up @@ -295,7 +286,7 @@ func (r *Repo) Restore(ctx context.Context, snapshot string, callback func(*Rest
wg.Wait()

if cmdErr != nil || readErr != nil {
return nil, newCmdErrorPreformatted(ctx, cmd, output.String(), errors.Join(cmdErr, readErr))
return nil, newCmdErrorPreformatted(ctx, cmd, string(output.Bytes()), errors.Join(cmdErr, readErr))
}

return summary, nil
Expand All @@ -310,7 +301,6 @@ func (r *Repo) ListDirectory(ctx context.Context, snapshot string, path string,
cmd := r.commandWithContext(ctx, []string{"ls", "--json", snapshot, path}, opts...)
output := bytes.NewBuffer(nil)
r.pipeCmdOutputToWriter(cmd, output)
r.pipeCmdOutputToLogger(ctx, cmd)

if err := cmd.Run(); err != nil {
return nil, nil, newCmdError(ctx, cmd, output.String(), err)
Expand All @@ -328,7 +318,6 @@ func (r *Repo) Unlock(ctx context.Context, opts ...GenericOption) error {
cmd := r.commandWithContext(ctx, []string{"unlock"}, opts...)
output := bytes.NewBuffer(nil)
r.pipeCmdOutputToWriter(cmd, output)
r.pipeCmdOutputToLogger(ctx, cmd)
if err := cmd.Run(); err != nil {
return newCmdError(ctx, cmd, output.String(), err)
}
Expand All @@ -339,7 +328,6 @@ func (r *Repo) Stats(ctx context.Context, opts ...GenericOption) (*RepoStats, er
cmd := r.commandWithContext(ctx, []string{"stats", "--json", "--mode=raw-data"}, opts...)
output := bytes.NewBuffer(nil)
r.pipeCmdOutputToWriter(cmd, output)
r.pipeCmdOutputToLogger(ctx, cmd)

if err := cmd.Run(); err != nil {
return nil, newCmdError(ctx, cmd, output.String(), err)
Expand Down

0 comments on commit 3223138

Please sign in to comment.