From cdac9b164671875c8f044ff8f7e7afb928bc0ed0 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Fri, 23 Oct 2020 17:13:14 +0800 Subject: [PATCH] progress: removed the progress struct (#561) (#564) * progress: removed the progress struct * progress: added a guard for cancel * progress: remove atomic adder Co-authored-by: hillium --- pkg/gluetikv/glue.go | 31 +------------------------ pkg/utils/progress.go | 46 ++++++++++++++++++++++++-------------- pkg/utils/progress_test.go | 27 ++++++++++++---------- 3 files changed, 45 insertions(+), 59 deletions(-) diff --git a/pkg/gluetikv/glue.go b/pkg/gluetikv/glue.go index 79f2e6393..cde4b406e 100644 --- a/pkg/gluetikv/glue.go +++ b/pkg/gluetikv/glue.go @@ -4,13 +4,11 @@ package gluetikv import ( "context" - "sync/atomic" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv" - "github.com/prometheus/common/log" pd "github.com/tikv/pd/client" "github.com/pingcap/br/pkg/glue" @@ -50,37 +48,10 @@ func (Glue) OwnsStorage() bool { // StartProgress implements glue.Glue. func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { - return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: 0} + return utils.StartProgress(ctx, cmdName, total, redirectLog) } // Record implements glue.Glue. func (Glue) Record(name string, val uint64) { summary.CollectUint(name, val) } - -type progress struct { - ch chan<- struct{} - closed int32 -} - -// Inc implements glue.Progress. -func (p progress) Inc() { - if atomic.LoadInt32(&p.closed) != 0 { - log.Warn("proposing a closed progress") - return - } - // there might be buggy if the thread is yielded here. - // however, there should not be gosched, at most time. - // so send here probably is safe, even not totally safe. - // since adding an extra lock should be costly, we just be optimistic. - // (Maybe a spin lock here would be better?) - p.ch <- struct{}{} -} - -// Close implements glue.Progress. -func (p progress) Close() { - // set closed to true firstly, - // so we won't see a state that the channel is closed and the p.closed is false. - atomic.StoreInt32(&p.closed, 1) - close(p.ch) -} diff --git a/pkg/utils/progress.go b/pkg/utils/progress.go index 75121bbc4..d3903040e 100644 --- a/pkg/utils/progress.go +++ b/pkg/utils/progress.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "io" + "sync/atomic" "time" "github.com/cheggaaa/pb/v3" @@ -18,8 +19,9 @@ type ProgressPrinter struct { name string total int64 redirectLog bool + progress int64 - updateCh chan struct{} + cancel context.CancelFunc } // NewProgressPrinter returns a new progress printer. @@ -32,13 +34,20 @@ func NewProgressPrinter( name: name, total: total, redirectLog: redirectLog, - updateCh: make(chan struct{}, total/2), + cancel: func() { + log.Warn("canceling non-started progress printer") + }, } } -// UpdateCh returns an update channel. -func (pp *ProgressPrinter) UpdateCh() chan<- struct{} { - return pp.updateCh +// Inc increases the current progress bar. +func (pp *ProgressPrinter) Inc() { + atomic.AddInt64(&pp.progress, 1) +} + +// Close closes the current progress bar. +func (pp *ProgressPrinter) Close() { + pp.cancel() } // goPrintProgress starts a gorouinte and prints progress. @@ -46,6 +55,8 @@ func (pp *ProgressPrinter) goPrintProgress( ctx context.Context, testWriter io.Writer, // Only for tests ) { + cctx, cancel := context.WithCancel(ctx) + pp.cancel = cancel bar := pb.New64(pp.total) if pp.redirectLog || testWriter != nil { tmpl := `{"P":"{{percent .}}","C":"{{counters . }}","E":"{{etime .}}","R":"{{rtime .}}","S":"{{speed .}}"}` @@ -64,7 +75,7 @@ func (pp *ProgressPrinter) goPrintProgress( } if testWriter != nil { bar.SetWriter(testWriter) - bar.SetRefreshRate(10 * time.Millisecond) + bar.SetRefreshRate(2 * time.Second) } bar.Start() @@ -73,22 +84,23 @@ func (pp *ProgressPrinter) goPrintProgress( defer t.Stop() defer bar.Finish() - var counter int64 for { select { - case <-ctx.Done(): - return - case _, ok := <-pp.updateCh: - if !ok { - bar.SetCurrent(pp.total) + case <-cctx.Done(): + // a hacky way to adapt the old behavior: + // when canceled by the outer context, leave the progress unchanged. + // when canceled by Close method (the 'internal' way), push the progress to 100%. + if ctx.Err() != nil { return } - counter++ + bar.SetCurrent(pp.total) + return case <-t.C: } - if counter <= pp.total { - bar.SetCurrent(counter) + currentProgress := atomic.LoadInt64(&pp.progress) + if currentProgress <= pp.total { + bar.SetCurrent(currentProgress) } else { bar.SetCurrent(pp.total) } @@ -127,8 +139,8 @@ func StartProgress( name string, total int64, redirectLog bool, -) chan<- struct{} { +) *ProgressPrinter { progress := NewProgressPrinter(name, total, redirectLog) progress.goPrintProgress(ctx, nil) - return progress.UpdateCh() + return progress } diff --git a/pkg/utils/progress_test.go b/pkg/utils/progress_test.go index 1662ee0b9..9de874cbf 100644 --- a/pkg/utils/progress_test.go +++ b/pkg/utils/progress_test.go @@ -4,6 +4,7 @@ package utils import ( "context" + "time" . "github.com/pingcap/check" ) @@ -30,14 +31,16 @@ func (r *testProgressSuite) TestProgress(c *C) { progress2.goPrintProgress(ctx, &testWriter{ fn: func(p string) { pCh2 <- p }, }) - updateCh2 := progress2.UpdateCh() - updateCh2 <- struct{}{} + progress2.Inc() + time.Sleep(2 * time.Second) p = <-pCh2 c.Assert(p, Matches, `.*"P":"50\.00%".*`) - updateCh2 <- struct{}{} + progress2.Inc() + time.Sleep(2 * time.Second) p = <-pCh2 c.Assert(p, Matches, `.*"P":"100\.00%".*`) - updateCh2 <- struct{}{} + progress2.Inc() + time.Sleep(2 * time.Second) p = <-pCh2 c.Assert(p, Matches, `.*"P":"100\.00%".*`) @@ -46,12 +49,13 @@ func (r *testProgressSuite) TestProgress(c *C) { progress4.goPrintProgress(ctx, &testWriter{ fn: func(p string) { pCh4 <- p }, }) - updateCh4 := progress4.UpdateCh() - updateCh4 <- struct{}{} + progress4.Inc() + time.Sleep(2 * time.Second) p = <-pCh4 c.Assert(p, Matches, `.*"P":"25\.00%".*`) - updateCh4 <- struct{}{} - close(updateCh4) + progress4.Inc() + progress4.Close() + time.Sleep(2 * time.Second) p = <-pCh4 c.Assert(p, Matches, `.*"P":"100\.00%".*`) @@ -60,10 +64,9 @@ func (r *testProgressSuite) TestProgress(c *C) { progress8.goPrintProgress(ctx, &testWriter{ fn: func(p string) { pCh8 <- p }, }) - updateCh8 := progress8.UpdateCh() - updateCh8 <- struct{}{} - updateCh8 <- struct{}{} - <-pCh8 + progress8.Inc() + progress8.Inc() + time.Sleep(2 * time.Second) p = <-pCh8 c.Assert(p, Matches, `.*"P":"25\.00%".*`)