Skip to content

Commit

Permalink
fix: improve restic pkg's output handling and buffering
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Apr 13, 2024
1 parent 66d63c1 commit 5d4804b
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 87 deletions.
16 changes: 9 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ require (
connectrpc.com/connect v1.16.0
github.com/alessio/shellescape v1.4.2
github.com/containrrr/shoutrrr v0.8.0
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
github.com/gitploy-io/cronexpr v0.2.2
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
Expand All @@ -15,11 +17,11 @@ require (
github.com/natefinch/atomic v1.0.1
go.etcd.io/bbolt v1.3.9
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.21.0
golang.org/x/net v0.22.0
golang.org/x/sync v0.6.0
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda
google.golang.org/grpc v1.62.1
golang.org/x/crypto v0.22.0
golang.org/x/net v0.24.0
golang.org/x/sync v0.7.0
google.golang.org/genproto/googleapis/api v0.0.0-20240412170617-26222e5d3d56
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
)

Expand All @@ -30,8 +32,8 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/stretchr/testify v1.8.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 // indirect
)
18 changes: 18 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ github.com/containrrr/shoutrrr v0.8.0 h1:mfG2ATzIS7NR2Ec6XL+xyoHzN97H8WPjir8aYzJ
github.com/containrrr/shoutrrr v0.8.0/go.mod h1:ioyQAyu1LJY6sILuNyKaQaw+9Ttik5QePU8atnAdO2o=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/djherbis/buffer v1.2.0 h1:PH5Dd2ss0C7CRRhQCZ2u7MssF+No9ide8Ye71nPHcrQ=
github.com/djherbis/buffer v1.2.0/go.mod h1:fjnebbZjCUpPinBRD+TDwXSOeNQ7fPQWLfGQqiAiUyE=
github.com/djherbis/nio/v3 v3.0.1 h1:6wxhnuppteMa6RHA4L81Dq7ThkZH8SwnDzXDYy95vB4=
github.com/djherbis/nio/v3 v3.0.1/go.mod h1:Ng4h80pbZFMla1yKzm61cF0tqqilXZYrogmWgZxOcmg=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/gitploy-io/cronexpr v0.2.2 h1:Au+wK6FqmOLAF7AkW6q4gnrNXTe3rEW97XFZ4chy0xs=
Expand Down Expand Up @@ -58,25 +62,39 @@ go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190529164535-6a60838ec259/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda h1:b6F6WIV4xHHD0FA4oIyzU6mHWg2WI2X1RBehwa5QN38=
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda/go.mod h1:AHcE/gZH76Bk/ROZhQphlRoWo5xKDEtz3eVEO1LfA8c=
google.golang.org/genproto/googleapis/api v0.0.0-20240412170617-26222e5d3d56 h1:KuFzeG+qPmpT8KpJXcrKAyeHhn64dgEICWlccP9qp0U=
google.golang.org/genproto/googleapis/api v0.0.0-20240412170617-26222e5d3d56/go.mod h1:wTHjrkbcS8AoQbb/0v9bFIPItZQPAsyVfgG9YPUhjAM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda h1:LI5DOvAxUPMv/50agcLLoo+AdWc1irS9Rzz4vPuD1V4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56 h1:zviK8GX4VlMstrK3JkexM5UHjH1VOkRebH9y3jhSBGk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240412170617-26222e5d3d56/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.62.1 h1:B4n+nfKzOICUXMgyrNd19h/I9oH0L1pizfk1d4zSgTk=
google.golang.org/grpc v1.62.1/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE=
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
26 changes: 4 additions & 22 deletions pkg/restic/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@ package restic

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os/exec"
"slices"
"time"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
Expand Down Expand Up @@ -95,7 +92,7 @@ func (b *BackupProgressEntry) Validate() error {
}

// readBackupProgressEntries returns the summary event or an error if the command failed.
func readBackupProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Reader, callback func(event *BackupProgressEntry)) (*BackupProgressEntry, error) {
func readBackupProgressEntries(output io.Reader, callback func(event *BackupProgressEntry)) (*BackupProgressEntry, error) {
scanner := bufio.NewScanner(output)
scanner.Split(bufio.ScanLines)

Expand All @@ -104,14 +101,8 @@ func readBackupProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Rea
// first event is handled specially to detect non-JSON output and fast-path out.
if scanner.Scan() {
var event BackupProgressEntry

if err := json.Unmarshal(scanner.Bytes(), &event); err != nil {
var bytes = slices.Clone(scanner.Bytes())
for scanner.Scan() {
bytes = append(bytes, scanner.Bytes()...)
}

return nil, newCmdError(ctx, cmd, string(bytes), fmt.Errorf("command output was not JSON: %w", err))
return nil, fmt.Errorf("command output was not JSON: %w", err)
}
if err := event.Validate(); err != nil {
return nil, err
Expand All @@ -135,23 +126,19 @@ func readBackupProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Rea
// skip it. This is a best-effort attempt to parse the output.
continue
}

if callback != nil {
callback(&event)
}
if event.MessageType == "summary" {
summary = &event
}
}

if err := scanner.Err(); err != nil {
return summary, fmt.Errorf("scanner encountered error: %w", err)
}

if summary == nil {
return nil, fmt.Errorf("no summary event found")
}

return summary, nil
}

Expand Down Expand Up @@ -244,7 +231,7 @@ func (e *RestoreProgressEntry) Validate() error {
}

// readRestoreProgressEntries returns the summary event or an error if the command failed.
func readRestoreProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Reader, callback func(event *RestoreProgressEntry)) (*RestoreProgressEntry, error) {
func readRestoreProgressEntries(output io.Reader, callback func(event *RestoreProgressEntry)) (*RestoreProgressEntry, error) {
scanner := bufio.NewScanner(output)
scanner.Split(bufio.ScanLines)

Expand All @@ -255,12 +242,7 @@ func readRestoreProgressEntries(ctx context.Context, cmd *exec.Cmd, output io.Re
var event RestoreProgressEntry

if err := json.Unmarshal(scanner.Bytes(), &event); err != nil {
var bytes = slices.Clone(scanner.Bytes())
for scanner.Scan() {
bytes = append(bytes, scanner.Bytes()...)
}

return nil, newCmdError(ctx, cmd, string(bytes), fmt.Errorf("command output was not JSON: %w", err))
return nil, fmt.Errorf("command output was not JSON: %w", err)
}
if err := event.Validate(); err != nil {
return nil, err
Expand Down
4 changes: 1 addition & 3 deletions pkg/restic/outputs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package restic

import (
"bytes"
"context"
"os/exec"
"testing"
)

Expand All @@ -14,7 +12,7 @@ func TestReadBackupProgressEntries(t *testing.T) {

b := bytes.NewBuffer([]byte(testInput))

summary, err := readBackupProgressEntries(context.Background(), &exec.Cmd{}, b, func(event *BackupProgressEntry) {
summary, err := readBackupProgressEntries(b, func(event *BackupProgressEntry) {
t.Logf("event: %v", event)
})
if err != nil {
Expand Down
101 changes: 47 additions & 54 deletions pkg/restic/restic.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package restic

import (
"bufio"
"bytes"
"context"
"encoding/json"
Expand All @@ -13,6 +14,8 @@ import (
"strings"
"sync"

"github.com/djherbis/buffer"
nio "github.com/djherbis/nio/v3"
"github.com/garethgeorge/backrest/internal/ioutil"
)

Expand Down Expand Up @@ -120,58 +123,51 @@ func (r *Repo) Backup(ctx context.Context, paths []string, progressCallback func
args = append(args, paths...)

cmd := r.commandWithContext(ctx, args, opts...)
capture := ioutil.NewOutputCapturer(outputBufferLimit)
reader, writer := io.Pipe()
r.pipeCmdOutputToWriter(cmd, writer, capture)

if err := cmd.Start(); err != nil {
return nil, newCmdError(ctx, cmd, "", err)
fullOutput := ioutil.NewOutputCapturer(outputBufferLimit)
var bufferedWriter *bufio.Writer
if logger := LoggerFromContext(ctx); logger != nil {
bufferedWriter = bufio.NewWriter(io.MultiWriter(fullOutput, logger))
} else {
bufferedWriter = bufio.NewWriter(fullOutput)
}
buf := buffer.New(32 * 1024) // 32KB IO buffer for the realtime event parsing
reader, writer := nio.Pipe(buf)
cmd.Stderr = io.MultiWriter(writer, bufferedWriter)
cmd.Stdout = io.MultiWriter(writer, bufferedWriter)

var wg sync.WaitGroup
var summary *BackupProgressEntry
var cmdErr error
var readErr error

var summary *BackupProgressEntry
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
var err error
summary, err = readBackupProgressEntries(ctx, cmd, reader, progressCallback)
summary, err = readBackupProgressEntries(reader, progressCallback)
if err != nil {
readErr = fmt.Errorf("processing command output: %w", err)
_ = cmd.Cancel() // cancel the command to prevent it from hanging now that we're not reading from it.
}
}()

wg.Add(1)
go func() {
defer writer.Close()
defer wg.Done()
if err := cmd.Wait(); err != nil {
cmdErr := cmd.Run()
writer.Close()
wg.Wait()
bufferedWriter.Flush()

if cmdErr != nil || readErr != nil {
if cmdErr != nil {
var exitErr *exec.ExitError
if errors.As(err, &exitErr) {
if errors.As(cmdErr, &exitErr) {
if exitErr.ExitCode() == 3 {
cmdErr = ErrPartialBackup
} else {
cmdErr = fmt.Errorf("exit code %v: %w", exitErr.ExitCode(), ErrBackupFailed)
cmdErr = fmt.Errorf("exit code %d: %w", exitErr.ExitCode(), ErrBackupFailed)
}
return
}
cmdErr = err
}
}()

wg.Wait()

if logger := LoggerFromContext(ctx); logger != nil && summary != nil {
bytes, _ := json.MarshalIndent(summary, "", " ")
logger.Write(bytes)
return summary, newCmdErrorPreformatted(ctx, cmd, string(fullOutput.Bytes()), errors.Join(cmdErr, readErr))
}

if cmdErr != nil || readErr != nil {
return summary, newCmdErrorPreformatted(ctx, cmd, string(capture.Bytes()), errors.Join(cmdErr, readErr))
}

return summary, nil
}

Expand Down Expand Up @@ -251,44 +247,41 @@ func (r *Repo) Prune(ctx context.Context, pruneOutput io.Writer, opts ...Generic

func (r *Repo) Restore(ctx context.Context, snapshot string, callback func(*RestoreProgressEntry), opts ...GenericOption) (*RestoreProgressEntry, error) {
cmd := r.commandWithContext(ctx, []string{"restore", "--json", snapshot}, opts...)
output := ioutil.NewOutputCapturer(outputBufferLimit)
capture := ioutil.NewOutputCapturer(outputBufferLimit) // for error reporting.
reader, writer := io.Pipe()
r.pipeCmdOutputToWriter(cmd, output, writer)

if err := cmd.Start(); err != nil {
return nil, newCmdError(ctx, cmd, "", err)
}
r.pipeCmdOutputToWriter(cmd, writer, capture)

var wg sync.WaitGroup
var summary *RestoreProgressEntry
var cmdErr error
var readErr error

var summary *RestoreProgressEntry
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
var err error
summary, err = readRestoreProgressEntries(ctx, cmd, reader, callback)
summary, err = readRestoreProgressEntries(reader, callback)
if err != nil {
readErr = fmt.Errorf("processing command output: %w", err)
_ = cmd.Cancel() // cancel the command to prevent it from hanging now that we're not reading from it.
}
}()

wg.Add(1)
go func() {
defer writer.Close()
defer wg.Done()
if err := cmd.Wait(); err != nil {
cmdErr = err
}
}()

cmdErr := cmd.Run()
writer.Close()
wg.Wait()

if cmdErr != nil || readErr != nil {
return nil, newCmdErrorPreformatted(ctx, cmd, string(output.Bytes()), errors.Join(cmdErr, readErr))
}
if cmdErr != nil {
var exitErr *exec.ExitError
if errors.As(cmdErr, &exitErr) {
if exitErr.ExitCode() == 3 {
cmdErr = ErrPartialBackup
} else {
cmdErr = fmt.Errorf("exit code %d: %w", exitErr.ExitCode(), ErrBackupFailed)
}
}
}

return summary, newCmdErrorPreformatted(ctx, cmd, string(capture.Bytes()), errors.Join(cmdErr, readErr))
}
return summary, nil
}

Expand Down
Loading

0 comments on commit 5d4804b

Please sign in to comment.