From 5d4804bb9477554a0316d2f3503c09e6524e8a9d Mon Sep 17 00:00:00 2001 From: garethgeorge Date: Fri, 12 Apr 2024 22:47:35 -0700 Subject: [PATCH] fix: improve restic pkg's output handling and buffering --- go.mod | 16 +++--- go.sum | 18 ++++++ pkg/restic/outputs.go | 26 ++------- pkg/restic/outputs_test.go | 4 +- pkg/restic/restic.go | 101 ++++++++++++++++------------------ pkg/restic/restic_test.go | 48 ++++++++++++++++ test/helpers/installrestic.go | 2 +- 7 files changed, 128 insertions(+), 87 deletions(-) diff --git a/go.mod b/go.mod index 0da503e1..772ffa92 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,8 @@ require ( connectrpc.com/connect v1.16.0 github.com/alessio/shellescape v1.4.2 github.com/containrrr/shoutrrr v0.8.0 + github.com/djherbis/buffer v1.2.0 + github.com/djherbis/nio/v3 v3.0.1 github.com/gitploy-io/cronexpr v0.2.2 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 @@ -15,11 +17,11 @@ require ( github.com/natefinch/atomic v1.0.1 go.etcd.io/bbolt v1.3.9 go.uber.org/zap v1.27.0 - golang.org/x/crypto v0.21.0 - golang.org/x/net v0.22.0 - golang.org/x/sync v0.6.0 - google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda - google.golang.org/grpc v1.62.1 + golang.org/x/crypto v0.22.0 + golang.org/x/net v0.24.0 + golang.org/x/sync v0.7.0 + google.golang.org/genproto/googleapis/api v0.0.0-20240412170617-26222e5d3d56 + google.golang.org/grpc v1.63.2 google.golang.org/protobuf v1.33.0 ) @@ -30,8 +32,8 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/stretchr/testify v1.8.4 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.19.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 // indirect ) diff --git a/go.sum b/go.sum index 4effd7a9..0b923b17 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,10 @@ github.com/containrrr/shoutrrr v0.8.0 h1:mfG2ATzIS7NR2Ec6XL+xyoHzN97H8WPjir8aYzJ github.com/containrrr/shoutrrr v0.8.0/go.mod h1:ioyQAyu1LJY6sILuNyKaQaw+9Ttik5QePU8atnAdO2o= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/djherbis/buffer v1.2.0 h1:PH5Dd2ss0C7CRRhQCZ2u7MssF+No9ide8Ye71nPHcrQ= +github.com/djherbis/buffer v1.2.0/go.mod h1:fjnebbZjCUpPinBRD+TDwXSOeNQ7fPQWLfGQqiAiUyE= +github.com/djherbis/nio/v3 v3.0.1 h1:6wxhnuppteMa6RHA4L81Dq7ThkZH8SwnDzXDYy95vB4= +github.com/djherbis/nio/v3 v3.0.1/go.mod h1:Ng4h80pbZFMla1yKzm61cF0tqqilXZYrogmWgZxOcmg= github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/gitploy-io/cronexpr v0.2.2 h1:Au+wK6FqmOLAF7AkW6q4gnrNXTe3rEW97XFZ4chy0xs= @@ -58,25 +62,39 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190529164535-6a60838ec259/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda h1:b6F6WIV4xHHD0FA4oIyzU6mHWg2WI2X1RBehwa5QN38= google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda/go.mod h1:AHcE/gZH76Bk/ROZhQphlRoWo5xKDEtz3eVEO1LfA8c= +google.golang.org/genproto/googleapis/api v0.0.0-20240412170617-26222e5d3d56 h1:KuFzeG+qPmpT8KpJXcrKAyeHhn64dgEICWlccP9qp0U= +google.golang.org/genproto/googleapis/api v0.0.0-20240412170617-26222e5d3d56/go.mod h1:wTHjrkbcS8AoQbb/0v9bFIPItZQPAsyVfgG9YPUhjAM= google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4= google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 h1:zviK8GX4VlMstrK3JkexM5UHjH1VOkRebH9y3jhSBGk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY= google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk= google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM= +google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA= google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pkg/restic/outputs.go b/pkg/restic/outputs.go index 6d4b7175..30249929 100644 --- a/pkg/restic/outputs.go +++ b/pkg/restic/outputs.go @@ -2,13 +2,10 @@ package restic import ( "bufio" - "context" "encoding/json" "errors" "fmt" "io" - "os/exec" - "slices" "time" v1 "github.com/garethgeorge/backrest/gen/go/v1" @@ -95,7 +92,7 @@ func (b *BackupProgressEntry) Validate() error { } // readBackupProgressEntries returns the summary event or an error if the command failed. -func readBackupProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Reader, callback func(event *BackupProgressEntry)) (*BackupProgressEntry, error) { +func readBackupProgressEntries(output io.Reader, callback func(event *BackupProgressEntry)) (*BackupProgressEntry, error) { scanner := bufio.NewScanner(output) scanner.Split(bufio.ScanLines) @@ -104,14 +101,8 @@ func readBackupProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Rea // first event is handled specially to detect non-JSON output and fast-path out. if scanner.Scan() { var event BackupProgressEntry - if err := json.Unmarshal(scanner.Bytes(), &event); err != nil { - var bytes = slices.Clone(scanner.Bytes()) - for scanner.Scan() { - bytes = append(bytes, scanner.Bytes()...) - } - - return nil, newCmdError(ctx, cmd, string(bytes), fmt.Errorf("command output was not JSON: %w", err)) + return nil, fmt.Errorf("command output was not JSON: %w", err) } if err := event.Validate(); err != nil { return nil, err @@ -135,7 +126,6 @@ func readBackupProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Rea // skip it. This is a best-effort attempt to parse the output. continue } - if callback != nil { callback(&event) } @@ -143,15 +133,12 @@ func readBackupProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Rea summary = &event } } - if err := scanner.Err(); err != nil { return summary, fmt.Errorf("scanner encountered error: %w", err) } - if summary == nil { return nil, fmt.Errorf("no summary event found") } - return summary, nil } @@ -244,7 +231,7 @@ func (e *RestoreProgressEntry) Validate() error { } // readRestoreProgressEntries returns the summary event or an error if the command failed. -func readRestoreProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Reader, callback func(event *RestoreProgressEntry)) (*RestoreProgressEntry, error) { +func readRestoreProgressEntries(output io.Reader, callback func(event *RestoreProgressEntry)) (*RestoreProgressEntry, error) { scanner := bufio.NewScanner(output) scanner.Split(bufio.ScanLines) @@ -255,12 +242,7 @@ func readRestoreProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Re var event RestoreProgressEntry if err := json.Unmarshal(scanner.Bytes(), &event); err != nil { - var bytes = slices.Clone(scanner.Bytes()) - for scanner.Scan() { - bytes = append(bytes, scanner.Bytes()...) - } - - return nil, newCmdError(ctx, cmd, string(bytes), fmt.Errorf("command output was not JSON: %w", err)) + return nil, fmt.Errorf("command output was not JSON: %w", err) } if err := event.Validate(); err != nil { return nil, err diff --git a/pkg/restic/outputs_test.go b/pkg/restic/outputs_test.go index 3411d6c5..1d8bab30 100644 --- a/pkg/restic/outputs_test.go +++ b/pkg/restic/outputs_test.go @@ -2,8 +2,6 @@ package restic import ( "bytes" - "context" - "os/exec" "testing" ) @@ -14,7 +12,7 @@ func TestReadBackupProgressEntries(t *testing.T) { b := bytes.NewBuffer([]byte(testInput)) - summary, err := readBackupProgressEntries(context.Background(), &exec.Cmd{}, b, func(event *BackupProgressEntry) { + summary, err := readBackupProgressEntries(b, func(event *BackupProgressEntry) { t.Logf("event: %v", event) }) if err != nil { diff --git a/pkg/restic/restic.go b/pkg/restic/restic.go index c680e492..af89ff05 100644 --- a/pkg/restic/restic.go +++ b/pkg/restic/restic.go @@ -1,6 +1,7 @@ package restic import ( + "bufio" "bytes" "context" "encoding/json" @@ -13,6 +14,8 @@ import ( "strings" "sync" + "github.com/djherbis/buffer" + nio "github.com/djherbis/nio/v3" "github.com/garethgeorge/backrest/internal/ioutil" ) @@ -120,58 +123,51 @@ func (r *Repo) Backup(ctx context.Context, paths []string, progressCallback func args = append(args, paths...) cmd := r.commandWithContext(ctx, args, opts...) - capture := ioutil.NewOutputCapturer(outputBufferLimit) - reader, writer := io.Pipe() - r.pipeCmdOutputToWriter(cmd, writer, capture) - if err := cmd.Start(); err != nil { - return nil, newCmdError(ctx, cmd, "", err) + fullOutput := ioutil.NewOutputCapturer(outputBufferLimit) + var bufferedWriter *bufio.Writer + if logger := LoggerFromContext(ctx); logger != nil { + bufferedWriter = bufio.NewWriter(io.MultiWriter(fullOutput, logger)) + } else { + bufferedWriter = bufio.NewWriter(fullOutput) } + buf := buffer.New(32 * 1024) // 32KB IO buffer for the realtime event parsing + reader, writer := nio.Pipe(buf) + cmd.Stderr = io.MultiWriter(writer, bufferedWriter) + cmd.Stdout = io.MultiWriter(writer, bufferedWriter) - var wg sync.WaitGroup - var summary *BackupProgressEntry - var cmdErr error var readErr error - + var summary *BackupProgressEntry + var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() var err error - summary, err = readBackupProgressEntries(ctx, cmd, reader, progressCallback) + summary, err = readBackupProgressEntries(reader, progressCallback) if err != nil { readErr = fmt.Errorf("processing command output: %w", err) + _ = cmd.Cancel() // cancel the command to prevent it from hanging now that we're not reading from it. } }() - wg.Add(1) - go func() { - defer writer.Close() - defer wg.Done() - if err := cmd.Wait(); err != nil { + cmdErr := cmd.Run() + writer.Close() + wg.Wait() + bufferedWriter.Flush() + + if cmdErr != nil || readErr != nil { + if cmdErr != nil { var exitErr *exec.ExitError - if errors.As(err, &exitErr) { + if errors.As(cmdErr, &exitErr) { if exitErr.ExitCode() == 3 { cmdErr = ErrPartialBackup } else { - cmdErr = fmt.Errorf("exit code %v: %w", exitErr.ExitCode(), ErrBackupFailed) + cmdErr = fmt.Errorf("exit code %d: %w", exitErr.ExitCode(), ErrBackupFailed) } - return } - cmdErr = err } - }() - - wg.Wait() - - if logger := LoggerFromContext(ctx); logger != nil && summary != nil { - bytes, _ := json.MarshalIndent(summary, "", " ") - logger.Write(bytes) + return summary, newCmdErrorPreformatted(ctx, cmd, string(fullOutput.Bytes()), errors.Join(cmdErr, readErr)) } - - if cmdErr != nil || readErr != nil { - return summary, newCmdErrorPreformatted(ctx, cmd, string(capture.Bytes()), errors.Join(cmdErr, readErr)) - } - return summary, nil } @@ -251,44 +247,41 @@ func (r *Repo) Prune(ctx context.Context, pruneOutput io.Writer, opts ...Generic func (r *Repo) Restore(ctx context.Context, snapshot string, callback func(*RestoreProgressEntry), opts ...GenericOption) (*RestoreProgressEntry, error) { cmd := r.commandWithContext(ctx, []string{"restore", "--json", snapshot}, opts...) - output := ioutil.NewOutputCapturer(outputBufferLimit) + capture := ioutil.NewOutputCapturer(outputBufferLimit) // for error reporting. reader, writer := io.Pipe() - r.pipeCmdOutputToWriter(cmd, output, writer) - - if err := cmd.Start(); err != nil { - return nil, newCmdError(ctx, cmd, "", err) - } + r.pipeCmdOutputToWriter(cmd, writer, capture) - var wg sync.WaitGroup - var summary *RestoreProgressEntry - var cmdErr error var readErr error - + var summary *RestoreProgressEntry + var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() var err error - summary, err = readRestoreProgressEntries(ctx, cmd, reader, callback) + summary, err = readRestoreProgressEntries(reader, callback) if err != nil { readErr = fmt.Errorf("processing command output: %w", err) + _ = cmd.Cancel() // cancel the command to prevent it from hanging now that we're not reading from it. } }() - wg.Add(1) - go func() { - defer writer.Close() - defer wg.Done() - if err := cmd.Wait(); err != nil { - cmdErr = err - } - }() - + cmdErr := cmd.Run() + writer.Close() wg.Wait() - if cmdErr != nil || readErr != nil { - return nil, newCmdErrorPreformatted(ctx, cmd, string(output.Bytes()), errors.Join(cmdErr, readErr)) - } + if cmdErr != nil { + var exitErr *exec.ExitError + if errors.As(cmdErr, &exitErr) { + if exitErr.ExitCode() == 3 { + cmdErr = ErrPartialBackup + } else { + cmdErr = fmt.Errorf("exit code %d: %w", exitErr.ExitCode(), ErrBackupFailed) + } + } + } + return summary, newCmdErrorPreformatted(ctx, cmd, string(capture.Bytes()), errors.Join(cmdErr, readErr)) + } return summary, nil } diff --git a/pkg/restic/restic_test.go b/pkg/restic/restic_test.go index c9f32a8a..794d7600 100644 --- a/pkg/restic/restic_test.go +++ b/pkg/restic/restic_test.go @@ -5,12 +5,14 @@ import ( "context" "errors" "fmt" + "os" "path/filepath" "reflect" "runtime" "slices" "strings" "testing" + "time" "github.com/garethgeorge/backrest/test/helpers" ) @@ -482,3 +484,49 @@ func toRepoPath(path string) string { path[3:], // path )) } + +func BenchmarkBackup(t *testing.B) { + repo := t.TempDir() + r := NewRepo(helpers.ResticBinary(t), repo, WithFlags("--no-cache"), WithEnv("RESTIC_PASSWORD=test")) + if err := r.Init(context.Background()); err != nil { + t.Fatalf("failed to init repo: %v", err) + } + + workdir, err := os.Getwd() + if err != nil { + t.Fatalf("failed to get working directory: %v", err) + } + + t.ResetTimer() + + for i := 0; i < t.N; i++ { + _, err := r.Backup(context.Background(), []string{workdir}, func(e *BackupProgressEntry) {}) + if err != nil { + t.Fatalf("failed to backup: %v", err) + } + } +} + +func BenchmarkBackupWithSimulatedCallback(t *testing.B) { + repo := t.TempDir() + r := NewRepo(helpers.ResticBinary(t), repo, WithFlags("--no-cache"), WithEnv("RESTIC_PASSWORD=test")) + if err := r.Init(context.Background()); err != nil { + t.Fatalf("failed to init repo: %v", err) + } + + workdir, err := os.Getwd() + if err != nil { + t.Fatalf("failed to get working directory: %v", err) + } + + t.ResetTimer() + + for i := 0; i < t.N; i++ { + _, err := r.Backup(context.Background(), []string{workdir}, func(e *BackupProgressEntry) { + time.Sleep(50 * time.Millisecond) // simulate work being done in the callback + }) + if err != nil { + t.Fatalf("failed to backup: %v", err) + } + } +} diff --git a/test/helpers/installrestic.go b/test/helpers/installrestic.go index 55e5c808..301839d2 100644 --- a/test/helpers/installrestic.go +++ b/test/helpers/installrestic.go @@ -6,7 +6,7 @@ import ( "github.com/garethgeorge/backrest/internal/resticinstaller" ) -func ResticBinary(t *testing.T) string { +func ResticBinary(t testing.TB) string { binPath, err := resticinstaller.FindOrInstallResticBinary() if err != nil { t.Fatalf("find restic binary: %v", err)