Skip to content

Commit

Permalink
*: refine dumpling log hint system (pingcap#228)
Browse files Browse the repository at this point in the history
* refine log message, check unparsed arguments and add log status printer
  • Loading branch information
lichunzhu authored Dec 30, 2020
1 parent 5877fe5 commit 2931258
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 22 deletions.
4 changes: 4 additions & 0 deletions dumpling/cmd/dumpling/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func main() {
fmt.Printf("\nparse arguments failed: %+v\n", err)
os.Exit(1)
}
if pflag.NArg() > 0 {
fmt.Printf("\nmeet some unparsed arguments, please check again: %+v\n", pflag.Args())
os.Exit(1)
}

registry := prometheus.NewRegistry()
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Expand Down
1 change: 1 addition & 0 deletions dumpling/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/tidb-tools v4.0.8-0.20200927084250-e47e0e12c7f3+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.5
github.com/tikv/pd v1.1.0-beta.0.20200910042021-254d1345be09
Expand Down
2 changes: 1 addition & 1 deletion dumpling/v4/export/block_allow_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func filterTables(conf *Config) {
}

if len(ignoredDBTable) > 0 {
log.Debug("ignore table", zap.String("", ignoredDBTable.Literal()))
log.Debug("ignore table", zap.String("tables", ignoredDBTable.Literal()))
}

conf.Tables = dbTables
Expand Down
2 changes: 1 addition & 1 deletion dumpling/v4/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func DefaultConfig() *Config {
func (conf *Config) String() string {
cfg, err := json.Marshal(conf)
if err != nil {
log.Error("marshal config to json", zap.Error(err))
log.Error("fail to marshal config to json", zap.Error(err))
}
return string(cfg)
}
Expand Down
26 changes: 18 additions & 8 deletions dumpling/v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ func (d *Dumper) Dump() (dumpErr error) {
summary.SetUnit(summary.BackupUnit)
defer summary.Summary(summary.BackupUnit)

logProgressCtx, logProgressCancel := context.WithCancel(ctx)
go d.runLogProgress(logProgressCtx)
defer logProgressCancel()

tableDataStartTime := time.Now()
if conf.SQL == "" {
if err = d.dumpDatabases(metaConn, taskChan); err != nil {
Expand Down Expand Up @@ -224,6 +228,7 @@ func (d *Dumper) startWriters(ctx context.Context, wg *errgroup.Group, taskChan
writer.rebuildConnFn = rebuildConnFn
writer.setFinishTableCallBack(func(task Task) {
if td, ok := task.(*TaskTableData); ok {
finishedTablesCounter.With(conf.Labels).Inc()
log.Debug("finished dumping table data",
zap.String("database", td.Meta.DatabaseName()),
zap.String("table", td.Meta.TableName()))
Expand Down Expand Up @@ -257,7 +262,7 @@ func (d *Dumper) dumpDatabases(metaConn *sql.Conn, taskChan chan<- Task) error {
d.sendTaskToChan(task, taskChan)

for _, table := range tables {
log.Debug("start dumping table...",
log.Debug("start dumping table...", zap.String("database", dbName),
zap.String("table", table.Name))
meta, err := dumpTableMeta(conf, metaConn, dbName, table)
if err != nil {
Expand Down Expand Up @@ -321,7 +326,8 @@ func (d *Dumper) concurrentDumpTable(conn *sql.Conn, meta TableMeta, taskChan ch
}
if field == "" {
// skip split chunk logic if not found proper field
log.Debug("fallback to sequential dump due to no proper field", zap.String("field", field))
log.Warn("fallback to sequential dump due to no proper field",
zap.String("database", db), zap.String("table", tbl))
return d.sequentialDumpTable(conn, meta, taskChan)
}

Expand All @@ -334,13 +340,17 @@ func (d *Dumper) concurrentDumpTable(conn *sql.Conn, meta TableMeta, taskChan ch
zap.String("upper", max.String()))

count := estimateCount(db, tbl, conn, field, conf)
log.Info("get estimated rows count", zap.Uint64("estimateCount", count))
log.Info("get estimated rows count",
zap.String("database", db),
zap.String("table", tbl),
zap.Uint64("estimateCount", count))
if count < conf.Rows {
// skip chunk logic if estimates are low
log.Debug("skip concurrent dump due to estimate count < rows",
log.Warn("skip concurrent dump due to estimate count < rows",
zap.Uint64("estimate count", count),
zap.Uint64("conf.rows", conf.Rows),
)
zap.String("database", db),
zap.String("table", tbl))
return d.sequentialDumpTable(conn, meta, taskChan)
}

Expand Down Expand Up @@ -416,7 +426,7 @@ func (d *Dumper) selectMinAndMaxIntValue(conn *sql.Conn, db, tbl, field string)
}
if !smax.Valid || !smin.Valid {
// found no data
log.Warn("no data to dump", zap.String("schema", db), zap.String("table", tbl))
log.Warn("no data to dump", zap.String("database", db), zap.String("table", tbl))
return zero, zero, nil
}

Expand Down Expand Up @@ -634,7 +644,7 @@ func startHTTPService(d *Dumper) error {
go func() {
err := startDumplingService(conf.StatusAddr)
if err != nil {
log.Error("dumpling stops to serving service", zap.Error(err))
log.Warn("meet error when stopping dumpling http service", zap.Error(err))
}
}()
}
Expand Down Expand Up @@ -696,7 +706,7 @@ func tidbSetPDClientForGC(d *Dumper) error {
if len(pdAddrs) > 0 {
doPdGC, err := checkSameCluster(ctx, pool, pdAddrs)
if err != nil {
log.Warn("meet error while check whether fetched pd addr and TiDB belongs to one cluster", zap.Error(err), zap.Strings("pdAddrs", pdAddrs))
log.Warn("meet error while check whether fetched pd addr and TiDB belong to one cluster", zap.Error(err), zap.Strings("pdAddrs", pdAddrs))
} else if doPdGC {
pdClient, err := pd.NewClientWithContext(ctx, pdAddrs, pd.SecurityOption{})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion dumpling/v4/export/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func startHTTPServer(lis net.Listener) {
err := httpServer.Serve(lis)
err = errors.Cause(err)
if err != nil && !isErrNetClosing(err) && err != http.ErrServerClosed {
log.Error("http server return with error", zap.Error(err))
log.Warn("http server return with error", zap.Error(err))
}
}

Expand Down
21 changes: 21 additions & 0 deletions dumpling/v4/export/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
package export

import (
"math"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

var (
Expand All @@ -21,6 +24,13 @@ var (
Name: "finished_rows",
Help: "counter for dumpling finished rows",
}, []string{})
finishedTablesCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "finished_tables",
Help: "counter for dumpling finished tables",
}, []string{})
writeTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dumpling",
Expand Down Expand Up @@ -57,6 +67,7 @@ var (
func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(finishedSizeCounter)
registry.MustRegister(finishedRowsCounter)
registry.MustRegister(finishedTablesCounter)
registry.MustRegister(writeTimeHistogram)
registry.MustRegister(receiveWriteChunkTimeHistogram)
registry.MustRegister(errorCount)
Expand All @@ -67,8 +78,18 @@ func RegisterMetrics(registry *prometheus.Registry) {
func RemoveLabelValuesWithTaskInMetrics(labels prometheus.Labels) {
finishedSizeCounter.Delete(labels)
finishedRowsCounter.Delete(labels)
finishedTablesCounter.Delete(labels)
writeTimeHistogram.Delete(labels)
receiveWriteChunkTimeHistogram.Delete(labels)
errorCount.Delete(labels)
taskChannelCapacity.Delete(labels)
}

// ReadCounter reports the current value of the counter.
func ReadCounter(counter prometheus.Counter) float64 {
var metric dto.Metric
if err := counter.Write(&metric); err != nil {
return math.NaN()
}
return metric.Counter.GetValue()
}
60 changes: 60 additions & 0 deletions dumpling/v4/export/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package export

import (
"context"
"fmt"
"time"

"github.com/docker/go-units"
"go.uber.org/zap"

"github.com/pingcap/dumpling/v4/log"
)

const logProgressTick = 2 * time.Minute

func (d *Dumper) runLogProgress(ctx context.Context) {
conf := d.conf
totalTables := float64(calculateTableCount(conf.Tables))
logProgressTicker := time.NewTicker(logProgressTick)
lastCheckpoint := time.Now()
lastBytes := float64(0)
defer logProgressTicker.Stop()
for {
select {
case <-ctx.Done():
log.Debug("stopping log progress")
return
case <-logProgressTicker.C:
nanoseconds := float64(time.Since(lastCheckpoint).Nanoseconds())

completedTables := ReadCounter(finishedTablesCounter.With(conf.Labels))
finishedBytes := ReadCounter(finishedSizeCounter.With(conf.Labels))
finishedRows := ReadCounter(finishedRowsCounter.With(conf.Labels))

log.Info("progress",
zap.String("tables", fmt.Sprintf("%.0f/%.0f (%.1f%%)", completedTables, totalTables, completedTables/totalTables*100)),
zap.String("finished rows", fmt.Sprintf("%.0f", finishedRows)),
zap.String("finished size", units.HumanSize(finishedBytes)),
zap.Float64("average speed(MiB/s)", (finishedBytes-lastBytes)/(1048576e-9*nanoseconds)),
)

lastCheckpoint = time.Now()
lastBytes = finishedBytes
}
}
}

func calculateTableCount(m DatabaseTables) int {
cnt := 0
for _, tables := range m {
for _, table := range tables {
if table.Type == TableTypeBase {
cnt++
}
}
}
return cnt
}
24 changes: 13 additions & 11 deletions dumpling/v4/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func WriteInsert(pCtx context.Context, cfg *Config, meta TableMeta, tblIR TableD
row = MakeRowReceiver(meta.ColumnTypes())
counter uint64
lastCounter uint64
escapeBackSlash = cfg.EscapeBackslash
escapeBackslash = cfg.EscapeBackslash
err error
)

Expand All @@ -199,10 +199,10 @@ func WriteInsert(pCtx context.Context, cfg *Config, meta TableMeta, tblIR TableD
lastBfSize := bf.Len()
if selectedField != "" {
if err = fileRowIter.Decode(row); err != nil {
log.Error("scanning from sql.Row failed", zap.Error(err))
log.Error("fail to scan from sql.Row", zap.Error(err))
return errors.Trace(err)
}
row.WriteToBuffer(bf, escapeBackSlash)
row.WriteToBuffer(bf, escapeBackslash)
} else {
bf.WriteString("()")
}
Expand Down Expand Up @@ -243,9 +243,10 @@ func WriteInsert(pCtx context.Context, cfg *Config, meta TableMeta, tblIR TableD
break
}
}
log.Debug("dumping table",
log.Debug("finish dumping table(chunk)",
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()),
zap.Uint64("record counts", counter))
zap.Uint64("total rows", counter))
if bf.Len() > 0 {
wp.input <- bf
}
Expand Down Expand Up @@ -295,15 +296,15 @@ func WriteInsertInCsv(pCtx context.Context, cfg *Config, meta TableMeta, tblIR T
row = MakeRowReceiver(meta.ColumnTypes())
counter uint64
lastCounter uint64
escapeBackSlash = cfg.EscapeBackslash
escapeBackslash = cfg.EscapeBackslash
selectedFields = meta.SelectedField()
err error
)

if !cfg.NoHeader && len(meta.ColumnNames()) != 0 && selectedFields != "" {
for i, col := range meta.ColumnNames() {
bf.Write(opt.delimiter)
escapeCSV([]byte(col), bf, escapeBackSlash, opt)
escapeCSV([]byte(col), bf, escapeBackslash, opt)
bf.Write(opt.delimiter)
if i != len(meta.ColumnTypes())-1 {
bf.Write(opt.separator)
Expand All @@ -317,10 +318,10 @@ func WriteInsertInCsv(pCtx context.Context, cfg *Config, meta TableMeta, tblIR T
lastBfSize := bf.Len()
if selectedFields != "" {
if err = fileRowIter.Decode(row); err != nil {
log.Error("scanning from sql.Row failed", zap.Error(err))
log.Error("fail to scan from sql.Row", zap.Error(err))
return errors.Trace(err)
}
row.WriteToBufferInCsv(bf, escapeBackSlash, opt)
row.WriteToBufferInCsv(bf, escapeBackslash, opt)
}
counter++
wp.currentFileSize += uint64(bf.Len()-lastBfSize) + 1 // 1 is for "\n"
Expand Down Expand Up @@ -348,9 +349,10 @@ func WriteInsertInCsv(pCtx context.Context, cfg *Config, meta TableMeta, tblIR T
}
}

log.Debug("dumping table",
log.Debug("finish dumping table(chunk)",
zap.String("database", meta.DatabaseName()),
zap.String("table", meta.TableName()),
zap.Uint64("record counts", counter))
zap.Uint64("total rows", counter))
if bf.Len() > 0 {
wp.input <- bf
}
Expand Down

0 comments on commit 2931258

Please sign in to comment.