diff --git a/dumpling/cmd/dumpling/main.go b/dumpling/cmd/dumpling/main.go
index b5dcd50b..fb045a2e 100644
--- a/dumpling/cmd/dumpling/main.go
+++ b/dumpling/cmd/dumpling/main.go
@@ -56,6 +56,10 @@ func main() {
 		fmt.Printf("\nparse arguments failed: %+v\n", err)
 		os.Exit(1)
 	}
+	if pflag.NArg() > 0 {
+		fmt.Printf("\nfound unparsed arguments, please check again: %+v\n", pflag.Args())
+		os.Exit(1)
+	}
 
 	registry := prometheus.NewRegistry()
 	registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
diff --git a/dumpling/go.mod b/dumpling/go.mod
index 481e33d0..50adfca1 100644
--- a/dumpling/go.mod
+++ b/dumpling/go.mod
@@ -16,6 +16,7 @@ require (
 	github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
 	github.com/pingcap/tidb-tools v4.0.8-0.20200927084250-e47e0e12c7f3+incompatible
 	github.com/prometheus/client_golang v1.5.1
+	github.com/prometheus/client_model v0.2.0
 	github.com/soheilhy/cmux v0.1.4
 	github.com/spf13/pflag v1.0.5
 	github.com/tikv/pd v1.1.0-beta.0.20200910042021-254d1345be09
diff --git a/dumpling/v4/export/block_allow_list.go b/dumpling/v4/export/block_allow_list.go
index dbc33182..70b9f5cb 100644
--- a/dumpling/v4/export/block_allow_list.go
+++ b/dumpling/v4/export/block_allow_list.go
@@ -31,7 +31,7 @@ func filterTables(conf *Config) {
 	}
 
 	if len(ignoredDBTable) > 0 {
-		log.Debug("ignore table", zap.String("", ignoredDBTable.Literal()))
+		log.Debug("ignore table", zap.String("tables", ignoredDBTable.Literal()))
 	}
 
 	conf.Tables = dbTables
diff --git a/dumpling/v4/export/config.go b/dumpling/v4/export/config.go
index 4cbcab21..cb551f10 100644
--- a/dumpling/v4/export/config.go
+++ b/dumpling/v4/export/config.go
@@ -173,7 +173,7 @@ func DefaultConfig() *Config {
 func (conf *Config) String() string {
 	cfg, err := json.Marshal(conf)
 	if err != nil {
-		log.Error("marshal config to json", zap.Error(err))
+		log.Error("failed to marshal config to json", zap.Error(err))
 	}
 	return string(cfg)
 }
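The new guard at the top of main.go relies on pflag's bookkeeping: after Parse, any token that was not consumed as a flag is exposed through NArg and Args. A minimal standalone sketch of the same pattern, with a single illustrative --host flag standing in for dumpling's real flag set:

package main

import (
	"fmt"
	"os"

	"github.com/spf13/pflag"
)

func main() {
	// Illustrative flag only; dumpling registers many more.
	host := pflag.StringP("host", "H", "127.0.0.1", "server host")
	pflag.Parse()

	// Leftover positional arguments usually mean a mistyped invocation,
	// so fail fast instead of silently ignoring them.
	if pflag.NArg() > 0 {
		fmt.Printf("\nfound unparsed arguments, please check again: %+v\n", pflag.Args())
		os.Exit(1)
	}
	fmt.Println("host =", *host)
}

Running this as "go run . --host 10.0.0.1 4000" exits early and prints the stray token [4000], which is exactly the failure mode the check is meant to surface.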
log.Warn("fallback to sequential dump due to no proper field", + zap.String("database", db), zap.String("table", tbl)) return d.sequentialDumpTable(conn, meta, taskChan) } @@ -334,13 +340,17 @@ func (d *Dumper) concurrentDumpTable(conn *sql.Conn, meta TableMeta, taskChan ch zap.String("upper", max.String())) count := estimateCount(db, tbl, conn, field, conf) - log.Info("get estimated rows count", zap.Uint64("estimateCount", count)) + log.Info("get estimated rows count", + zap.String("database", db), + zap.String("table", tbl), + zap.Uint64("estimateCount", count)) if count < conf.Rows { // skip chunk logic if estimates are low - log.Debug("skip concurrent dump due to estimate count < rows", + log.Warn("skip concurrent dump due to estimate count < rows", zap.Uint64("estimate count", count), zap.Uint64("conf.rows", conf.Rows), - ) + zap.String("database", db), + zap.String("table", tbl)) return d.sequentialDumpTable(conn, meta, taskChan) } @@ -416,7 +426,7 @@ func (d *Dumper) selectMinAndMaxIntValue(conn *sql.Conn, db, tbl, field string) } if !smax.Valid || !smin.Valid { // found no data - log.Warn("no data to dump", zap.String("schema", db), zap.String("table", tbl)) + log.Warn("no data to dump", zap.String("database", db), zap.String("table", tbl)) return zero, zero, nil } @@ -634,7 +644,7 @@ func startHTTPService(d *Dumper) error { go func() { err := startDumplingService(conf.StatusAddr) if err != nil { - log.Error("dumpling stops to serving service", zap.Error(err)) + log.Warn("meet error when stopping dumpling http service", zap.Error(err)) } }() } @@ -696,7 +706,7 @@ func tidbSetPDClientForGC(d *Dumper) error { if len(pdAddrs) > 0 { doPdGC, err := checkSameCluster(ctx, pool, pdAddrs) if err != nil { - log.Warn("meet error while check whether fetched pd addr and TiDB belongs to one cluster", zap.Error(err), zap.Strings("pdAddrs", pdAddrs)) + log.Warn("meet error while check whether fetched pd addr and TiDB belong to one cluster", zap.Error(err), zap.Strings("pdAddrs", pdAddrs)) } else if doPdGC { pdClient, err := pd.NewClientWithContext(ctx, pdAddrs, pd.SecurityOption{}) if err != nil { diff --git a/dumpling/v4/export/http_handler.go b/dumpling/v4/export/http_handler.go index 98f79b48..092c6243 100644 --- a/dumpling/v4/export/http_handler.go +++ b/dumpling/v4/export/http_handler.go @@ -35,7 +35,7 @@ func startHTTPServer(lis net.Listener) { err := httpServer.Serve(lis) err = errors.Cause(err) if err != nil && !isErrNetClosing(err) && err != http.ErrServerClosed { - log.Error("http server return with error", zap.Error(err)) + log.Warn("http server return with error", zap.Error(err)) } } diff --git a/dumpling/v4/export/metrics.go b/dumpling/v4/export/metrics.go index 0a97c514..1d0ed381 100644 --- a/dumpling/v4/export/metrics.go +++ b/dumpling/v4/export/metrics.go @@ -3,7 +3,10 @@ package export import ( + "math" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" ) var ( @@ -21,6 +24,13 @@ var ( Name: "finished_rows", Help: "counter for dumpling finished rows", }, []string{}) + finishedTablesCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "dumpling", + Subsystem: "dump", + Name: "finished_tables", + Help: "counter for dumpling finished tables", + }, []string{}) writeTimeHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "dumpling", @@ -57,6 +67,7 @@ var ( func RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(finishedSizeCounter) registry.MustRegister(finishedRowsCounter) 
diff --git a/dumpling/v4/export/metrics.go b/dumpling/v4/export/metrics.go
index 0a97c514..1d0ed381 100644
--- a/dumpling/v4/export/metrics.go
+++ b/dumpling/v4/export/metrics.go
@@ -3,7 +3,10 @@
 package export
 
 import (
+	"math"
+
 	"github.com/prometheus/client_golang/prometheus"
+	dto "github.com/prometheus/client_model/go"
 )
 
 var (
@@ -21,6 +24,13 @@ var (
 			Name:      "finished_rows",
 			Help:      "counter for dumpling finished rows",
 		}, []string{})
+	finishedTablesCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "dumpling",
+			Subsystem: "dump",
+			Name:      "finished_tables",
+			Help:      "counter for dumpling finished tables",
+		}, []string{})
 	writeTimeHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "dumpling",
@@ -57,6 +67,7 @@ var (
 func RegisterMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(finishedSizeCounter)
 	registry.MustRegister(finishedRowsCounter)
+	registry.MustRegister(finishedTablesCounter)
 	registry.MustRegister(writeTimeHistogram)
 	registry.MustRegister(receiveWriteChunkTimeHistogram)
 	registry.MustRegister(errorCount)
@@ -67,8 +78,18 @@ func RegisterMetrics(registry *prometheus.Registry) {
 func RemoveLabelValuesWithTaskInMetrics(labels prometheus.Labels) {
 	finishedSizeCounter.Delete(labels)
 	finishedRowsCounter.Delete(labels)
+	finishedTablesCounter.Delete(labels)
 	writeTimeHistogram.Delete(labels)
 	receiveWriteChunkTimeHistogram.Delete(labels)
 	errorCount.Delete(labels)
 	taskChannelCapacity.Delete(labels)
 }
+
+// ReadCounter reports the current value of the counter.
+func ReadCounter(counter prometheus.Counter) float64 {
+	var metric dto.Metric
+	if err := counter.Write(&metric); err != nil {
+		return math.NaN()
+	}
+	return metric.Counter.GetValue()
+}
diff --git a/dumpling/v4/export/status.go b/dumpling/v4/export/status.go
new file mode 100644
index 00000000..5c8ad544
--- /dev/null
+++ b/dumpling/v4/export/status.go
@@ -0,0 +1,60 @@
+// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.
+
+package export
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/docker/go-units"
+	"go.uber.org/zap"
+
+	"github.com/pingcap/dumpling/v4/log"
+)
+
+const logProgressTick = 2 * time.Minute
+
+func (d *Dumper) runLogProgress(ctx context.Context) {
+	conf := d.conf
+	totalTables := float64(calculateTableCount(conf.Tables))
+	logProgressTicker := time.NewTicker(logProgressTick)
+	lastCheckpoint := time.Now()
+	lastBytes := float64(0)
+	defer logProgressTicker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			log.Debug("stopping log progress")
+			return
+		case <-logProgressTicker.C:
+			nanoseconds := float64(time.Since(lastCheckpoint).Nanoseconds())
+
+			completedTables := ReadCounter(finishedTablesCounter.With(conf.Labels))
+			finishedBytes := ReadCounter(finishedSizeCounter.With(conf.Labels))
+			finishedRows := ReadCounter(finishedRowsCounter.With(conf.Labels))
+
+			log.Info("progress",
+				zap.String("tables", fmt.Sprintf("%.0f/%.0f (%.1f%%)", completedTables, totalTables, completedTables/totalTables*100)),
+				zap.String("finished rows", fmt.Sprintf("%.0f", finishedRows)),
+				zap.String("finished size", units.HumanSize(finishedBytes)),
+				zap.Float64("average speed(MiB/s)", (finishedBytes-lastBytes)/(1048576e-9*nanoseconds)),
+			)
+
+			lastCheckpoint = time.Now()
+			lastBytes = finishedBytes
+		}
+	}
+}
+
+func calculateTableCount(m DatabaseTables) int {
+	cnt := 0
+	for _, tables := range m {
+		for _, table := range tables {
+			if table.Type == TableTypeBase {
+				cnt++
+			}
+		}
+	}
+	return cnt
+}
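ReadCounter exists because a prometheus counter does not expose its value directly; the only public path is serializing it into a dto.Metric, which is what runLogProgress does on every tick. A sketch of that round trip using the new finished_tables counter shape (the empty Labels map plays the role of conf.Labels when no task labels are configured):

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// readCounter is the same technique as export.ReadCounter.
func readCounter(counter prometheus.Counter) float64 {
	var metric dto.Metric
	if err := counter.Write(&metric); err != nil {
		return math.NaN()
	}
	return metric.Counter.GetValue()
}

func main() {
	finishedTables := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "dumpling",
		Subsystem: "dump",
		Name:      "finished_tables",
		Help:      "counter for dumpling finished tables",
	}, []string{})

	labels := prometheus.Labels{} // stand-in for conf.Labels
	finishedTables.With(labels).Inc()
	finishedTables.With(labels).Add(2)
	fmt.Println(readCounter(finishedTables.With(labels))) // prints 3
}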
diff --git a/dumpling/v4/export/writer_util.go b/dumpling/v4/export/writer_util.go
index 865443fa..e0ee693a 100755
--- a/dumpling/v4/export/writer_util.go
+++ b/dumpling/v4/export/writer_util.go
@@ -174,7 +174,7 @@ func WriteInsert(pCtx context.Context, cfg *Config, meta TableMeta, tblIR TableD
 		row             = MakeRowReceiver(meta.ColumnTypes())
 		counter         uint64
 		lastCounter     uint64
-		escapeBackSlash = cfg.EscapeBackslash
+		escapeBackslash = cfg.EscapeBackslash
 		err             error
 	)
 
@@ -199,10 +199,10 @@ func WriteInsert(pCtx context.Context, cfg *Config, meta TableMeta, tblIR TableD
 			lastBfSize := bf.Len()
 			if selectedField != "" {
 				if err = fileRowIter.Decode(row); err != nil {
-					log.Error("scanning from sql.Row failed", zap.Error(err))
+					log.Error("failed to scan from sql.Row", zap.Error(err))
 					return errors.Trace(err)
 				}
-				row.WriteToBuffer(bf, escapeBackSlash)
+				row.WriteToBuffer(bf, escapeBackslash)
 			} else {
 				bf.WriteString("()")
 			}
@@ -243,9 +243,10 @@ func WriteInsert(pCtx context.Context, cfg *Config, meta TableMeta, tblIR TableD
 			break
 		}
 	}
-	log.Debug("dumping table",
+	log.Debug("finished dumping table(chunk)",
+		zap.String("database", meta.DatabaseName()),
 		zap.String("table", meta.TableName()),
-		zap.Uint64("record counts", counter))
+		zap.Uint64("total rows", counter))
 	if bf.Len() > 0 {
 		wp.input <- bf
 	}
@@ -295,7 +296,7 @@ func WriteInsertInCsv(pCtx context.Context, cfg *Config, meta TableMeta, tblIR T
 		row             = MakeRowReceiver(meta.ColumnTypes())
 		counter         uint64
 		lastCounter     uint64
-		escapeBackSlash = cfg.EscapeBackslash
+		escapeBackslash = cfg.EscapeBackslash
 		selectedFields  = meta.SelectedField()
 		err             error
 	)
@@ -303,7 +304,7 @@ func WriteInsertInCsv(pCtx context.Context, cfg *Config, meta TableMeta, tblIR T
 	if !cfg.NoHeader && len(meta.ColumnNames()) != 0 && selectedFields != "" {
 		for i, col := range meta.ColumnNames() {
 			bf.Write(opt.delimiter)
-			escapeCSV([]byte(col), bf, escapeBackSlash, opt)
+			escapeCSV([]byte(col), bf, escapeBackslash, opt)
 			bf.Write(opt.delimiter)
 			if i != len(meta.ColumnTypes())-1 {
 				bf.Write(opt.separator)
@@ -317,10 +318,10 @@ func WriteInsertInCsv(pCtx context.Context, cfg *Config, meta TableMeta, tblIR T
 			lastBfSize := bf.Len()
 			if selectedFields != "" {
 				if err = fileRowIter.Decode(row); err != nil {
-					log.Error("scanning from sql.Row failed", zap.Error(err))
+					log.Error("failed to scan from sql.Row", zap.Error(err))
 					return errors.Trace(err)
 				}
-				row.WriteToBufferInCsv(bf, escapeBackSlash, opt)
+				row.WriteToBufferInCsv(bf, escapeBackslash, opt)
 			}
 			counter++
 			wp.currentFileSize += uint64(bf.Len()-lastBfSize) + 1 // 1 is for "\n"
@@ -348,9 +349,10 @@ func WriteInsertInCsv(pCtx context.Context, cfg *Config, meta TableMeta, tblIR T
 		}
 	}
 
-	log.Debug("dumping table",
+	log.Debug("finished dumping table(chunk)",
+		zap.String("database", meta.DatabaseName()),
 		zap.String("table", meta.TableName()),
-		zap.Uint64("record counts", counter))
+		zap.Uint64("total rows", counter))
 	if bf.Len() > 0 {
 		wp.input <- bf
 	}
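One line in the new status.go deserves a second look: the average-speed expression (finishedBytes-lastBytes)/(1048576e-9*nanoseconds) folds bytes-to-MiB (1048576) and nanoseconds-to-seconds (1e-9) into a single constant, so the result really is MiB per second. A quick standalone check of the arithmetic:

package main

import (
	"fmt"
	"time"
)

// speedMiBPerSec reproduces the expression from runLogProgress:
// deltaBytes / (bytes-per-MiB * seconds), with both conversion
// factors folded into the constant 1048576e-9.
func speedMiBPerSec(deltaBytes float64, elapsed time.Duration) float64 {
	nanoseconds := float64(elapsed.Nanoseconds())
	return deltaBytes / (1048576e-9 * nanoseconds)
}

func main() {
	// 64 MiB written over one 2-minute tick: 64/120 ≈ 0.53 MiB/s.
	fmt.Printf("%.2f MiB/s\n", speedMiBPerSec(64*1024*1024, 2*time.Minute))
}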