Skip to content

Commit

Permalink
expose merge mode by config and add metrics (#980) (#987)
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 authored Jul 7, 2020
1 parent 1d976dd commit 4018c14
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 9 deletions.
3 changes: 3 additions & 0 deletions drainer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package drainer

import (
"github.com/pingcap/tidb-binlog/drainer/sync"
bf "github.com/pingcap/tidb-binlog/pkg/binlogfile"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -124,6 +125,8 @@ var (
var registry = prometheus.NewRegistry()

func init() {
sync.QueueSizeGauge = queueSizeGauge

registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
registry.MustRegister(pumpPositionGauge)
Expand Down
6 changes: 6 additions & 0 deletions drainer/sync/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (

var _ Syncer = &MysqlSyncer{}

// QueueSizeGauge to be used.
var QueueSizeGauge *prometheus.GaugeVec

// MysqlSyncer sync binlog to Mysql
type MysqlSyncer struct {
db *sql.DB
Expand Down Expand Up @@ -60,9 +63,12 @@ func CreateLoader(
opts = append(opts, loader.Metrics(&loader.MetricsGroup{
QueryHistogramVec: queryHistogramVec,
EventCounterVec: nil,
QueueSizeGauge: QueueSizeGauge,
}))
}

opts = append(opts, loader.Merge(cfg.Merge))

if cfg.SyncMode != 0 {
mode := loader.SyncMode(cfg.SyncMode)
opts = append(opts, loader.SyncModeOption(mode))
Expand Down
2 changes: 2 additions & 0 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type DBConfig struct {
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`

Merge bool `toml:"merge" json:"merge"`

ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
KafkaVersion string `toml:"kafka-version" json:"kafka-version"`
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ require (
github.com/sirupsen/logrus v1.4.1 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d
go.etcd.io/etcd v0.5.0-alpha.5.0.20190320044326-77d4b742cdbf
go.uber.org/atomic v1.5.1 // indirect
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,8 @@ github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 h1:uSDYjYejelKy
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVDtZnyTWlNwiAxLj0bbpTcx1BWCFhXjfsvmPdNc=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/uber-go/atomic v1.3.2 h1:Azu9lPBWRNKzYXSIwRfgRuDuS0YKsK4NFhiQv98gkxo=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber/jaeger-client-go v2.15.0+incompatible h1:NP3qsSqNxh8VYr956ur1N/1C1PjvOJnJykCzcD5QHbk=
Expand Down
30 changes: 24 additions & 6 deletions pkg/loader/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
gosql "database/sql"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -102,6 +103,7 @@ type loaderImpl struct {
type MetricsGroup struct {
EventCounterVec *prometheus.CounterVec
QueryHistogramVec *prometheus.HistogramVec
QueueSizeGauge *prometheus.GaugeVec
}

// SyncMode represents the sync mode of DML.
Expand All @@ -120,6 +122,7 @@ type options struct {
metrics *MetricsGroup
saveAppliedTS bool
syncMode SyncMode
merge bool
}

var defaultLoaderOptions = options{
Expand All @@ -129,6 +132,7 @@ var defaultLoaderOptions = options{
metrics: nil,
saveAppliedTS: false,
syncMode: SyncFullColumn,
merge: false,
}

// A Option sets options such batch size, worker count etc.
Expand All @@ -155,6 +159,13 @@ func BatchSize(n int) Option {
}
}

// Merge set merge options.
func Merge(v bool) Option {
return func(o *options) {
o.merge = v
}
}

//SetloopBackSyncInfo set loop back sync info of loader
func SetloopBackSyncInfo(loopBackSyncInfo *loopbacksync.LoopBackSync) Option {
return func(o *options) {
Expand Down Expand Up @@ -196,11 +207,10 @@ func NewLoader(db *gosql.DB, opt ...Option) (Loader, error) {
loopBackSyncInfo: opts.loopBackSyncInfo,
input: make(chan *Txn),
successTxn: make(chan *Txn),
merge: true,
merge: opts.merge,
saveAppliedTS: opts.saveAppliedTS,

ctx: ctx,
cancel: cancel,
ctx: ctx,
cancel: cancel,
}

db.SetMaxOpenConns(opts.workerCount)
Expand Down Expand Up @@ -430,6 +440,14 @@ func (s *loaderImpl) singleExec(executor *executor, dmls []*DML) error {

}

if s.metrics != nil && s.metrics.QueueSizeGauge != nil {
// limit 10 sample
for i := 0; i < len(byHash) && i < 10; i++ {
name := "worker_" + strconv.Itoa(i)
s.metrics.QueueSizeGauge.WithLabelValues(name).Set(float64(len(byHash[i])))
}
}

err := s.execByHash(executor, byHash)
return errors.Trace(err)
}
Expand Down Expand Up @@ -513,7 +531,7 @@ func (s *loaderImpl) Run() error {
}()
}

txnManager := newTxnManager(1024, s.input)
txnManager := newTxnManager(100*1024 /* limit dml number */, s.input)
defer txnManager.Close()

batch := fNewBatchManager(s)
Expand Down Expand Up @@ -572,7 +590,7 @@ func (s *loaderImpl) groupDMLs(dmls []*DML) (batchByTbls map[string][]*DML, sing
batchByTbls = make(map[string][]*DML)
for _, dml := range dmls {
info := dml.info
if info.primaryKey != nil && len(info.uniqueKeys) == 0 {
if info.primaryKey != nil {
tblName := dml.TableName()
batchByTbls[tblName] = append(batchByTbls[tblName], dml)
} else {
Expand Down

0 comments on commit 4018c14

Please sign in to comment.