From 3ca23a3c398df0a77d87d8f1b9026ac0acd539bb Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Tue, 21 Nov 2017 10:56:22 -0800 Subject: [PATCH] avoid possible receiver panic/deadlock on sender error Signed-off-by: Tonis Tiigi --- receive.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/receive.go b/receive.go index 8176f3e..4b66317 100644 --- a/receive.go +++ b/receive.go @@ -55,25 +55,33 @@ type receiver struct { type dynamicWalker struct { walkChan chan *currentPath - closed bool + err error + closeCh chan struct{} } func newDynamicWalker() *dynamicWalker { return &dynamicWalker{ walkChan: make(chan *currentPath, 128), + closeCh: make(chan struct{}), } } func (w *dynamicWalker) update(p *currentPath) error { - if w.closed { - return errors.New("walker is closed") + select { + case <-w.closeCh: + return errors.Wrap(w.err, "walker is closed") + default: } if p == nil { close(w.walkChan) return nil } - w.walkChan <- p - return nil + select { + case w.walkChan <- p: + return nil + case <-w.closeCh: + return errors.Wrap(w.err, "walker is closed") + } } func (w *dynamicWalker) fill(ctx context.Context, pathC chan<- *currentPath) error { @@ -85,6 +93,8 @@ func (w *dynamicWalker) fill(ctx context.Context, pathC chan<- *currentPath) err } pathC <- p case <-ctx.Done(): + w.err = ctx.Err() + close(w.closeCh) return ctx.Err() } } @@ -213,10 +223,13 @@ func (r *receiver) asyncDataFunc(ctx context.Context, p string, wc io.WriteClose return err } err := wwc.Wait(ctx) + if err != nil { + return err + } r.muPipes.Lock() delete(r.pipes, id) r.muPipes.Unlock() - return err + return nil } type wrappedWriteCloser struct {