From cf6defe0252f9cd28bdf42c5e6426d2151e690f0 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Mon, 25 Nov 2019 04:57:01 -0800 Subject: [PATCH] Fix totalSize and totalCounts calculation (#2969) --- cmd/accounting-reader.go | 2 +- cmd/mirror-main.go | 41 ++++++------ cmd/status.go | 135 ++++++++++++++++----------------------- 3 files changed, 75 insertions(+), 103 deletions(-) diff --git a/cmd/accounting-reader.go b/cmd/accounting-reader.go index 5bde1cd46a..fe4068b0e2 100644 --- a/cmd/accounting-reader.go +++ b/cmd/accounting-reader.go @@ -17,13 +17,13 @@ package cmd import ( - "encoding/json" "fmt" "sync" "sync/atomic" "time" "github.com/cheggaaa/pb" + json "github.com/minio/mc/pkg/colorjson" "github.com/minio/mc/pkg/probe" ) diff --git a/cmd/mirror-main.go b/cmd/mirror-main.go index 52e34f0a5c..396364a036 100644 --- a/cmd/mirror-main.go +++ b/cmd/mirror-main.go @@ -437,10 +437,12 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance shouldQueue = true } if shouldQueue || mj.isOverwrite { - mirrorURL.TotalCount = mj.TotalObjects - mirrorURL.TotalSize = mj.TotalBytes - // adjust total, because we want to show progress of the item still queued to be copied. - mj.status.SetTotal(mj.status.Total() + sourceContent.Size).Update() + // adjust total, because we want to show progress of + // the item still queued to be copied. + mj.status.SetTotal(mj.status.Get() + sourceContent.Size).Update() + mj.status.AddCounts(1) + mirrorURL.TotalSize = mj.status.Get() + mirrorURL.TotalCount = mj.status.GetCounts() mj.statusCh <- mj.doMirror(ctx, cancelMirror, mirrorURL) } continue @@ -465,10 +467,12 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance } if shouldQueue || mj.isOverwrite { mirrorURL.SourceContent.Size = event.Size - mirrorURL.TotalCount = mj.TotalObjects - mirrorURL.TotalSize = mj.TotalBytes - // adjust total, because we want to show progress of the itemj stiil queued to be copied. - mj.status.SetTotal(mj.status.Total() + event.Size).Update() + // adjust total, because we want to show progress + // of the itemj stiil queued to be copied. + mj.status.SetTotal(mj.status.Get() + mirrorURL.SourceContent.Size).Update() + mj.status.AddCounts(1) + mirrorURL.TotalSize = mj.status.Get() + mirrorURL.TotalCount = mj.status.GetCounts() mj.statusCh <- mj.doMirror(ctx, cancelMirror, mirrorURL) } } else if event.Type == EventRemove { @@ -479,8 +483,8 @@ func (mj *mirrorJob) watchMirror(ctx context.Context, cancelMirror context.Cance TargetContent: &clientContent{URL: *targetURL}, encKeyDB: mj.encKeyDB, } - mirrorURL.TotalCount = mj.TotalObjects - mirrorURL.TotalSize = mj.TotalBytes + mirrorURL.TotalCount = mj.status.GetCounts() + mirrorURL.TotalSize = mj.status.Get() if mirrorURL.TargetContent != nil && mj.isRemove { mj.statusCh <- mj.doRemove(mirrorURL) } @@ -506,9 +510,6 @@ func (mj *mirrorJob) watchURL(sourceClient Client) *probe.Error { // Fetch urls that need to be mirrored func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.CancelFunc) { - var totalBytes int64 - var totalObjects int64 - stopParallel := func() { close(mj.queueCh) mj.parallel.wait() @@ -537,19 +538,15 @@ func (mj *mirrorJob) startMirror(ctx context.Context, cancelMirror context.Cance if mj.newerThan != "" && isNewer(sURLs.SourceContent.Time, mj.newerThan) { continue } - // copy - totalBytes += sURLs.SourceContent.Size } - totalObjects++ - mj.TotalBytes = totalBytes - mj.TotalObjects = totalObjects - mj.status.SetTotal(totalBytes) + mj.status.AddCounts(1) + mj.status.Add(sURLs.SourceContent.Size) // Save total count. - sURLs.TotalCount = mj.TotalObjects + sURLs.TotalCount = mj.status.GetCounts() // Save totalSize. - sURLs.TotalSize = mj.TotalBytes + sURLs.TotalSize = mj.status.Get() if sURLs.SourceContent != nil { mj.queueCh <- func() URLs { @@ -629,7 +626,7 @@ func newMirrorJob(srcURL, dstURL string, isFake, isRemove, isOverwrite, isWatch, if globalQuiet { status = NewQuietStatus(mj.parallel) } else if globalJSON { - status = NewDummyStatus(mj.parallel) + status = NewQuietStatus(mj.parallel) } mj.status = status diff --git a/cmd/status.go b/cmd/status.go index b725b2012b..2d64f4c9f7 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -18,6 +18,7 @@ package cmd import ( "io" + "sync/atomic" "github.com/minio/mc/pkg/console" "github.com/minio/mc/pkg/probe" @@ -26,6 +27,10 @@ import ( // Status implements a interface that can be used in quit mode or with progressbar. type Status interface { Println(data ...interface{}) + AddCounts(int64) + SetCounts(int64) + GetCounts() int64 + Add(int64) Status Get() int64 Start() @@ -44,86 +49,19 @@ type Status interface { fatalIf(err *probe.Error, msg string) } -// NewDummyStatus returns a dummy status object -func NewDummyStatus(hook io.Reader) Status { - return &DummyStatus{hook} -} - -// DummyStatus will not show anything. -type DummyStatus struct { - hook io.Reader -} - -// Read implements the io.Reader interface -func (ds *DummyStatus) Read(p []byte) (n int, err error) { - ds.hook.Read(p) - return len(p), nil -} - -// Get implements Progress interface -func (ds *DummyStatus) Get() int64 { - return 0 -} - -// SetTotal sets the total of the progressbar, ignored for quietstatus -func (ds *DummyStatus) SetTotal(v int64) Status { - return ds -} - -// SetCaption sets the caption of the progressbar, ignored for quietstatus -func (ds *DummyStatus) SetCaption(s string) {} - -// Total returns the total number of bytes -func (ds *DummyStatus) Total() int64 { - return 0 -} - -// Add bytes to current number of bytes -func (ds *DummyStatus) Add(v int64) Status { - return ds -} - -// Println prints line, ignored for quietstatus -func (ds *DummyStatus) Println(data ...interface{}) {} - -// PrintMsg prints message -func (ds *DummyStatus) PrintMsg(msg message) { - if !globalJSON { - console.Println(msg.String()) - } else { - console.Println(msg.JSON()) - } -} - -// Start is ignored for quietstatus -func (ds *DummyStatus) Start() {} - -// Finish displays the accounting summary -func (ds *DummyStatus) Finish() {} - -// Update is ignored for quietstatus -func (ds *DummyStatus) Update() {} - -func (ds *DummyStatus) errorIf(err *probe.Error, msg string) { - errorIf(err, msg) -} - -func (ds *DummyStatus) fatalIf(err *probe.Error, msg string) { - fatalIf(err, msg) -} - // NewQuietStatus returns a quiet status object func NewQuietStatus(hook io.Reader) Status { return &QuietStatus{ - newAccounter(0), - hook, + accounter: newAccounter(0), + hook: hook, } } // QuietStatus will only show the progress and summary type QuietStatus struct { *accounter - hook io.Reader + hook io.Reader + counts int64 } // Read implements the io.Reader interface @@ -132,6 +70,21 @@ func (qs *QuietStatus) Read(p []byte) (n int, err error) { return qs.accounter.Read(p) } +// SetCounts sets number of files uploaded +func (qs *QuietStatus) SetCounts(v int64) { + atomic.StoreInt64(&qs.counts, v) +} + +// GetCounts returns number of files uploaded +func (qs *QuietStatus) GetCounts() int64 { + return atomic.LoadInt64(&qs.counts) +} + +// AddCounts adds 'v' number of files uploaded. +func (qs *QuietStatus) AddCounts(v int64) { + atomic.AddInt64(&qs.counts, v) +} + // SetTotal sets the total of the progressbar, ignored for quietstatus func (qs *QuietStatus) SetTotal(v int64) Status { qs.accounter.Total = v @@ -142,6 +95,11 @@ func (qs *QuietStatus) SetTotal(v int64) Status { func (qs *QuietStatus) SetCaption(s string) { } +// Get returns the current number of bytes +func (qs *QuietStatus) Get() int64 { + return qs.accounter.Get() +} + // Total returns the total number of bytes func (qs *QuietStatus) Total() int64 { return qs.accounter.Total @@ -159,11 +117,7 @@ func (qs *QuietStatus) Println(data ...interface{}) { // PrintMsg prints message func (qs *QuietStatus) PrintMsg(msg message) { - if !globalJSON { - console.Println(msg.String()) - } else { - console.Println(msg.JSON()) - } + printMsg(msg) } // Start is ignored for quietstatus @@ -172,7 +126,7 @@ func (qs *QuietStatus) Start() { // Finish displays the accounting summary func (qs *QuietStatus) Finish() { - console.Println(console.Colorize("Mirror", qs.accounter.Stat().String())) + printMsg(qs.accounter.Stat()) } // Update is ignored for quietstatus @@ -190,15 +144,16 @@ func (qs *QuietStatus) fatalIf(err *probe.Error, msg string) { // NewProgressStatus returns a progress status object func NewProgressStatus(hook io.Reader) Status { return &ProgressStatus{ - newProgressBar(0), - hook, + progressBar: newProgressBar(0), + hook: hook, } } // ProgressStatus shows a progressbar type ProgressStatus struct { *progressBar - hook io.Reader + hook io.Reader + counts int64 } // Read implements the io.Reader interface @@ -212,6 +167,26 @@ func (ps *ProgressStatus) SetCaption(s string) { ps.progressBar.SetCaption(s) } +// SetCounts sets number of files uploaded +func (ps *ProgressStatus) SetCounts(v int64) { + atomic.StoreInt64(&ps.counts, v) +} + +// GetCounts returns number of files uploaded +func (ps *ProgressStatus) GetCounts() int64 { + return atomic.LoadInt64(&ps.counts) +} + +// AddCounts adds 'v' number of files uploaded. +func (ps *ProgressStatus) AddCounts(v int64) { + atomic.AddInt64(&ps.counts, v) +} + +// Get returns the current number of bytes +func (ps *ProgressStatus) Get() int64 { + return ps.progressBar.Get() +} + // Total returns the total number of bytes func (ps *ProgressStatus) Total() int64 { return ps.progressBar.Total