Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
progress: removed the progress struct (#561) (#564)
Browse files Browse the repository at this point in the history
* progress: removed the progress struct

* progress: added a guard for cancel

* progress: remove atomic adder

Co-authored-by: hillium <yujuncen@pingcap.com>
  • Loading branch information
ti-srebot and YuJuncen authored Oct 23, 2020
1 parent 0cf4798 commit cdac9b1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 59 deletions.
31 changes: 1 addition & 30 deletions pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ package gluetikv

import (
"context"
"sync/atomic"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
"github.com/prometheus/common/log"
pd "github.com/tikv/pd/client"

"github.com/pingcap/br/pkg/glue"
Expand Down Expand Up @@ -50,37 +48,10 @@ func (Glue) OwnsStorage() bool {

// StartProgress implements glue.Glue.
func (Glue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return progress{ch: utils.StartProgress(ctx, cmdName, total, redirectLog), closed: 0}
return utils.StartProgress(ctx, cmdName, total, redirectLog)
}

// Record implements glue.Glue.
func (Glue) Record(name string, val uint64) {
summary.CollectUint(name, val)
}

type progress struct {
ch chan<- struct{}
closed int32
}

// Inc implements glue.Progress.
func (p progress) Inc() {
if atomic.LoadInt32(&p.closed) != 0 {
log.Warn("proposing a closed progress")
return
}
// there might be buggy if the thread is yielded here.
// however, there should not be gosched, at most time.
// so send here probably is safe, even not totally safe.
// since adding an extra lock should be costly, we just be optimistic.
// (Maybe a spin lock here would be better?)
p.ch <- struct{}{}
}

// Close implements glue.Progress.
func (p progress) Close() {
// set closed to true firstly,
// so we won't see a state that the channel is closed and the p.closed is false.
atomic.StoreInt32(&p.closed, 1)
close(p.ch)
}
46 changes: 29 additions & 17 deletions pkg/utils/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"encoding/json"
"io"
"sync/atomic"
"time"

"github.com/cheggaaa/pb/v3"
Expand All @@ -18,8 +19,9 @@ type ProgressPrinter struct {
name string
total int64
redirectLog bool
progress int64

updateCh chan struct{}
cancel context.CancelFunc
}

// NewProgressPrinter returns a new progress printer.
Expand All @@ -32,20 +34,29 @@ func NewProgressPrinter(
name: name,
total: total,
redirectLog: redirectLog,
updateCh: make(chan struct{}, total/2),
cancel: func() {
log.Warn("canceling non-started progress printer")
},
}
}

// UpdateCh returns an update channel.
func (pp *ProgressPrinter) UpdateCh() chan<- struct{} {
return pp.updateCh
// Inc increases the current progress bar.
func (pp *ProgressPrinter) Inc() {
atomic.AddInt64(&pp.progress, 1)
}

// Close closes the current progress bar.
func (pp *ProgressPrinter) Close() {
pp.cancel()
}

// goPrintProgress starts a gorouinte and prints progress.
func (pp *ProgressPrinter) goPrintProgress(
ctx context.Context,
testWriter io.Writer, // Only for tests
) {
cctx, cancel := context.WithCancel(ctx)
pp.cancel = cancel
bar := pb.New64(pp.total)
if pp.redirectLog || testWriter != nil {
tmpl := `{"P":"{{percent .}}","C":"{{counters . }}","E":"{{etime .}}","R":"{{rtime .}}","S":"{{speed .}}"}`
Expand All @@ -64,7 +75,7 @@ func (pp *ProgressPrinter) goPrintProgress(
}
if testWriter != nil {
bar.SetWriter(testWriter)
bar.SetRefreshRate(10 * time.Millisecond)
bar.SetRefreshRate(2 * time.Second)
}
bar.Start()

Expand All @@ -73,22 +84,23 @@ func (pp *ProgressPrinter) goPrintProgress(
defer t.Stop()
defer bar.Finish()

var counter int64
for {
select {
case <-ctx.Done():
return
case _, ok := <-pp.updateCh:
if !ok {
bar.SetCurrent(pp.total)
case <-cctx.Done():
// a hacky way to adapt the old behavior:
// when canceled by the outer context, leave the progress unchanged.
// when canceled by Close method (the 'internal' way), push the progress to 100%.
if ctx.Err() != nil {
return
}
counter++
bar.SetCurrent(pp.total)
return
case <-t.C:
}

if counter <= pp.total {
bar.SetCurrent(counter)
currentProgress := atomic.LoadInt64(&pp.progress)
if currentProgress <= pp.total {
bar.SetCurrent(currentProgress)
} else {
bar.SetCurrent(pp.total)
}
Expand Down Expand Up @@ -127,8 +139,8 @@ func StartProgress(
name string,
total int64,
redirectLog bool,
) chan<- struct{} {
) *ProgressPrinter {
progress := NewProgressPrinter(name, total, redirectLog)
progress.goPrintProgress(ctx, nil)
return progress.UpdateCh()
return progress
}
27 changes: 15 additions & 12 deletions pkg/utils/progress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package utils

import (
"context"
"time"

. "github.com/pingcap/check"
)
Expand All @@ -30,14 +31,16 @@ func (r *testProgressSuite) TestProgress(c *C) {
progress2.goPrintProgress(ctx, &testWriter{
fn: func(p string) { pCh2 <- p },
})
updateCh2 := progress2.UpdateCh()
updateCh2 <- struct{}{}
progress2.Inc()
time.Sleep(2 * time.Second)
p = <-pCh2
c.Assert(p, Matches, `.*"P":"50\.00%".*`)
updateCh2 <- struct{}{}
progress2.Inc()
time.Sleep(2 * time.Second)
p = <-pCh2
c.Assert(p, Matches, `.*"P":"100\.00%".*`)
updateCh2 <- struct{}{}
progress2.Inc()
time.Sleep(2 * time.Second)
p = <-pCh2
c.Assert(p, Matches, `.*"P":"100\.00%".*`)

Expand All @@ -46,12 +49,13 @@ func (r *testProgressSuite) TestProgress(c *C) {
progress4.goPrintProgress(ctx, &testWriter{
fn: func(p string) { pCh4 <- p },
})
updateCh4 := progress4.UpdateCh()
updateCh4 <- struct{}{}
progress4.Inc()
time.Sleep(2 * time.Second)
p = <-pCh4
c.Assert(p, Matches, `.*"P":"25\.00%".*`)
updateCh4 <- struct{}{}
close(updateCh4)
progress4.Inc()
progress4.Close()
time.Sleep(2 * time.Second)
p = <-pCh4
c.Assert(p, Matches, `.*"P":"100\.00%".*`)

Expand All @@ -60,10 +64,9 @@ func (r *testProgressSuite) TestProgress(c *C) {
progress8.goPrintProgress(ctx, &testWriter{
fn: func(p string) { pCh8 <- p },
})
updateCh8 := progress8.UpdateCh()
updateCh8 <- struct{}{}
updateCh8 <- struct{}{}
<-pCh8
progress8.Inc()
progress8.Inc()
time.Sleep(2 * time.Second)
p = <-pCh8
c.Assert(p, Matches, `.*"P":"25\.00%".*`)

Expand Down

0 comments on commit cdac9b1

Please sign in to comment.