From ed4110f6758039325c3824341dd69547bb62589b Mon Sep 17 00:00:00 2001 From: Lyle Franklin Date: Tue, 4 Dec 2018 08:35:44 -0800 Subject: [PATCH] Fix panic when retrying upload-product [#160181845] pivotal-cf/om #240: Intermittent `POST .../api/v0/available_products: EOF` when uploading tile --- network/fakes/progress_bar.go | 29 +++++++++++++++++++++++++++++ network/progress_client.go | 24 +++++++++++++++++++++++- network/progress_client_test.go | 30 ++++++++++++++++++------------ progress/progress_bar.go | 8 ++++++-- 4 files changed, 76 insertions(+), 15 deletions(-) diff --git a/network/fakes/progress_bar.go b/network/fakes/progress_bar.go index 49aa6431c..80c54892f 100644 --- a/network/fakes/progress_bar.go +++ b/network/fakes/progress_bar.go @@ -22,6 +22,10 @@ type ProgressBar struct { newProxyReaderReturnsOnCall map[int]struct { result1 io.ReadCloser } + ResetStub func() + resetMutex sync.RWMutex + resetArgsForCall []struct { + } SetTotal64Stub func(int64) setTotal64Mutex sync.RWMutex setTotal64ArgsForCall []struct { @@ -118,6 +122,29 @@ func (fake *ProgressBar) NewProxyReaderReturnsOnCall(i int, result1 io.ReadClose }{result1} } +func (fake *ProgressBar) Reset() { + fake.resetMutex.Lock() + fake.resetArgsForCall = append(fake.resetArgsForCall, struct { + }{}) + fake.recordInvocation("Reset", []interface{}{}) + fake.resetMutex.Unlock() + if fake.ResetStub != nil { + fake.ResetStub() + } +} + +func (fake *ProgressBar) ResetCallCount() int { + fake.resetMutex.RLock() + defer fake.resetMutex.RUnlock() + return len(fake.resetArgsForCall) +} + +func (fake *ProgressBar) ResetCalls(stub func()) { + fake.resetMutex.Lock() + defer fake.resetMutex.Unlock() + fake.ResetStub = stub +} + func (fake *ProgressBar) SetTotal64(arg1 int64) { fake.setTotal64Mutex.Lock() fake.setTotal64ArgsForCall = append(fake.setTotal64ArgsForCall, struct { @@ -179,6 +206,8 @@ func (fake *ProgressBar) Invocations() map[string][][]interface{} { defer fake.finishMutex.RUnlock() fake.newProxyReaderMutex.RLock() defer fake.newProxyReaderMutex.RUnlock() + fake.resetMutex.RLock() + defer fake.resetMutex.RUnlock() fake.setTotal64Mutex.RLock() defer fake.setTotal64Mutex.RUnlock() fake.startMutex.RLock() diff --git a/network/progress_client.go b/network/progress_client.go index 197c278e3..4dbff1aef 100644 --- a/network/progress_client.go +++ b/network/progress_client.go @@ -13,6 +13,7 @@ type progressBar interface { Start() Finish() SetTotal64(int64) + Reset() NewProxyReader(io.Reader) io.ReadCloser } @@ -44,17 +45,31 @@ func (pc ProgressClient) Do(req *http.Request) (*http.Response, error) { duration = time.Second } + // reset bar in case request is being retried + pc.progressBar.Reset() + tl := progress.NewTickingLogger(pc.liveWriter, duration) + startedTicker := make(chan bool) + switch req.Method { case "POST", "PUT": - req.Body = progress.NewReadCloser(req.Body, pc.progressBar, tl.Start) + req.Body = progress.NewReadCloser(req.Body, pc.progressBar, func() { + tl.Start() + close(startedTicker) + }) pc.progressBar.SetTotal64(req.ContentLength) case "GET": tl.Start() + close(startedTicker) } resp, err := pc.client.Do(req) + + // the req.Body is closed asynchronously, but we'll also guard against + // it never getting closed by continuing after X seconds + waitForChanWithTimeout(startedTicker, 2*time.Second) + tl.Stop() if err != nil { return nil, err @@ -67,3 +82,10 @@ func (pc ProgressClient) Do(req *http.Request) (*http.Response, error) { return resp, nil } + +func waitForChanWithTimeout(waitChan <-chan bool, timeout time.Duration) { + select { + case <-waitChan: + case <-time.After(timeout): + } +} diff --git a/network/progress_client_test.go b/network/progress_client_test.go index cafcb957a..551e87015 100644 --- a/network/progress_client_test.go +++ b/network/progress_client_test.go @@ -33,10 +33,13 @@ var _ = Describe("ProgressClient", func() { Describe("Do", func() { It("makes a request to upload the product to the Ops Manager", func() { - client.DoReturns(&http.Response{ - StatusCode: http.StatusOK, - Body: ioutil.NopCloser(strings.NewReader("{}")), - }, nil) + client.DoStub = func(req *http.Request) (*http.Response, error) { + req.Body.Close() + return &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(strings.NewReader("{}")), + }, nil + } progressBar.NewProxyReaderReturns(ioutil.NopCloser(strings.NewReader("some content"))) @@ -62,6 +65,8 @@ var _ = Describe("ProgressClient", func() { Expect(err).NotTo(HaveOccurred()) Expect(string(rawReqBody)).To(Equal("some content")) + Expect(progressBar.ResetCallCount()).To(Equal(1)) + Expect(progressBar.SetTotal64CallCount()).To(Equal(1)) Expect(progressBar.SetTotal64ArgsForCall(0)).To(Equal(int64(12))) @@ -204,7 +209,10 @@ var _ = Describe("ProgressClient", func() { Context("when an error occurs", func() { Context("when the client errors performing the request", func() { It("returns an error", func() { - client.DoReturns(&http.Response{}, errors.New("some client error")) + client.DoStub = func(req *http.Request) (*http.Response, error) { + req.Body.Close() + return &http.Response{}, errors.New("some client error") + } req, err := http.NewRequest("POST", "/some/endpoint", strings.NewReader("some content")) Expect(err).NotTo(HaveOccurred()) @@ -216,12 +224,10 @@ var _ = Describe("ProgressClient", func() { Context("when server responds with timeout error before upload has finished", func() { It("returns an error", func() { - client.DoStub = func(req *http.Request) (*http.Response, error) { - return &http.Response{ - StatusCode: http.StatusRequestTimeout, - Body: ioutil.NopCloser(strings.NewReader(`something from nginx probably xml`)), - }, nil - } + client.DoReturns(&http.Response{ + StatusCode: http.StatusRequestTimeout, + Body: ioutil.NopCloser(strings.NewReader(`something from nginx probably xml`)), + }, nil) var req *http.Request req, err := http.NewRequest("POST", "/some/endpoint", strings.NewReader("some content")) @@ -236,7 +242,7 @@ var _ = Describe("ProgressClient", func() { close(done) }() - Eventually(done).Should(BeClosed()) + Eventually(done, 3).Should(BeClosed()) Expect(resp.StatusCode).To(Equal(http.StatusRequestTimeout)) }) }) diff --git a/progress/progress_bar.go b/progress/progress_bar.go index ad4ba7a7e..65be67268 100644 --- a/progress/progress_bar.go +++ b/progress/progress_bar.go @@ -11,12 +11,12 @@ type Bar struct { bar *pb.ProgressBar } -func NewBar() Bar { +func NewBar() *Bar { bar := pb.New(0) bar.SetUnits(pb.U_BYTES) bar.Width = 80 bar.Output = os.Stderr - return Bar{bar} + return &Bar{bar} } func (b Bar) NewProxyReader(r io.Reader) io.ReadCloser { @@ -34,3 +34,7 @@ func (b Bar) Finish() { func (b Bar) SetTotal64(size int64) { b.bar.Total = size } + +func (b *Bar) Reset() { + b.bar = NewBar().bar +}