Skip to content

Commit

Permalink
Merge pull request #195 from jedevc/fix-disk-writer-ephemeral-err
Browse files Browse the repository at this point in the history
receive: ensure callback errors are propagated
  • Loading branch information
tonistiigi authored Apr 15, 2024
2 parents 7525a1a + 78f10b4 commit 9c5337f
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions diskwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type DiskWriter struct {
ctx context.Context
cancel func()
eg *errgroup.Group
egCtx context.Context
filter FilterFunc
dirModTimes map[string]int64
}
Expand All @@ -50,13 +51,14 @@ func NewDiskWriter(ctx context.Context, dest string, opt DiskWriterOpt) (*DiskWr
}

ctx, cancel := context.WithCancel(ctx)
eg, ctx := errgroup.WithContext(ctx)
eg, egCtx := errgroup.WithContext(ctx)

return &DiskWriter{
opt: opt,
dest: dest,
eg: eg,
ctx: ctx,
egCtx: egCtx,
cancel: cancel,
filter: opt.Filter,
dirModTimes: map[string]int64{},
Expand Down Expand Up @@ -188,7 +190,7 @@ func (dw *DiskWriter) HandleChange(kind ChangeKind, p string, fi os.FileInfo, er
return errors.Wrapf(err, "failed to create %s", newPath)
}
if dw.opt.SyncDataCb != nil {
if err := dw.processChange(ChangeKindAdd, p, fi, file); err != nil {
if err := dw.processChange(dw.ctx, ChangeKindAdd, p, fi, file); err != nil {
file.Close()
return err
}
Expand Down Expand Up @@ -219,7 +221,7 @@ func (dw *DiskWriter) HandleChange(kind ChangeKind, p string, fi os.FileInfo, er
dw.requestAsyncFileData(p, destPath, fi, &statCopy)
}
} else {
return dw.processChange(kind, p, fi, nil)
return dw.processChange(dw.ctx, kind, p, fi, nil)
}

return nil
Expand All @@ -228,7 +230,7 @@ func (dw *DiskWriter) HandleChange(kind ChangeKind, p string, fi os.FileInfo, er
func (dw *DiskWriter) requestAsyncFileData(p, dest string, fi os.FileInfo, st *types.Stat) {
// todo: limit worker threads
dw.eg.Go(func() error {
if err := dw.processChange(ChangeKindAdd, p, fi, &lazyFileWriter{
if err := dw.processChange(dw.egCtx, ChangeKindAdd, p, fi, &lazyFileWriter{
dest: dest,
}); err != nil {
return err
Expand All @@ -237,7 +239,7 @@ func (dw *DiskWriter) requestAsyncFileData(p, dest string, fi os.FileInfo, st *t
})
}

func (dw *DiskWriter) processChange(kind ChangeKind, p string, fi os.FileInfo, w io.WriteCloser) error {
func (dw *DiskWriter) processChange(ctx context.Context, kind ChangeKind, p string, fi os.FileInfo, w io.WriteCloser) error {
origw := w
var hw *hashedWriter
if dw.opt.NotifyCb != nil {
Expand All @@ -252,7 +254,7 @@ func (dw *DiskWriter) processChange(kind ChangeKind, p string, fi os.FileInfo, w
if fn == nil && dw.opt.AsyncDataCb != nil {
fn = dw.opt.AsyncDataCb
}
if err := fn(dw.ctx, p, w); err != nil {
if err := fn(ctx, p, w); err != nil {
return err
}
} else {
Expand Down

0 comments on commit 9c5337f

Please sign in to comment.