diff --git a/drainer/metrics.go b/drainer/metrics.go index 978c8e3d5..685ade1fb 100644 --- a/drainer/metrics.go +++ b/drainer/metrics.go @@ -14,6 +14,7 @@ package drainer import ( + "github.com/pingcap/tidb-binlog/drainer/sync" bf "github.com/pingcap/tidb-binlog/pkg/binlogfile" "github.com/prometheus/client_golang/prometheus" ) @@ -124,6 +125,8 @@ var ( var registry = prometheus.NewRegistry() func init() { + sync.QueueSizeGauge = queueSizeGauge + registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) registry.MustRegister(prometheus.NewGoCollector()) registry.MustRegister(pumpPositionGauge) diff --git a/drainer/sync/mysql.go b/drainer/sync/mysql.go index a85c59269..fb1d12799 100644 --- a/drainer/sync/mysql.go +++ b/drainer/sync/mysql.go @@ -30,6 +30,9 @@ import ( var _ Syncer = &MysqlSyncer{} +// QueueSizeGauge is the gauge vector used to report the loader's per-worker queue sizes; it is assigned by the drainer metrics init before syncers are created. +var QueueSizeGauge *prometheus.GaugeVec + // MysqlSyncer sync binlog to Mysql type MysqlSyncer struct { db *sql.DB @@ -60,9 +63,12 @@ func CreateLoader( opts = append(opts, loader.Metrics(&loader.MetricsGroup{ QueryHistogramVec: queryHistogramVec, EventCounterVec: nil, + QueueSizeGauge: QueueSizeGauge, })) } + opts = append(opts, loader.Merge(cfg.Merge)) + if cfg.SyncMode != 0 { mode := loader.SyncMode(cfg.SyncMode) opts = append(opts, loader.SyncModeOption(mode)) diff --git a/drainer/sync/util.go b/drainer/sync/util.go index 7c2892ee1..0f46d30cb 100644 --- a/drainer/sync/util.go +++ b/drainer/sync/util.go @@ -35,6 +35,8 @@ type DBConfig struct { Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"` BinlogFileDir string `toml:"dir" json:"dir"` + Merge bool `toml:"merge" json:"merge"` + ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"` KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"` KafkaVersion string `toml:"kafka-version" json:"kafka-version"` diff --git a/go.mod b/go.mod index 7ea00f048..3a90acd17 100644 --- a/go.mod +++ b/go.mod @@ -32,7 +32,6 @@ require ( 
github.com/sirupsen/logrus v1.4.1 // indirect github.com/soheilhy/cmux v0.1.4 github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 - github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d go.etcd.io/etcd v0.5.0-alpha.5.0.20190320044326-77d4b742cdbf go.uber.org/atomic v1.5.1 // indirect diff --git a/go.sum b/go.sum index 2a9044fe1..f5529bfb9 100644 --- a/go.sum +++ b/go.sum @@ -281,9 +281,8 @@ github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 h1:uSDYjYejelKy github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVDtZnyTWlNwiAxLj0bbpTcx1BWCFhXjfsvmPdNc= github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= -github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= -github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/uber-go/atomic v1.3.2 h1:Azu9lPBWRNKzYXSIwRfgRuDuS0YKsK4NFhiQv98gkxo= github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= github.com/uber/jaeger-client-go v2.15.0+incompatible h1:NP3qsSqNxh8VYr956ur1N/1C1PjvOJnJykCzcD5QHbk= diff --git a/pkg/loader/load.go b/pkg/loader/load.go index a1e1569fa..2dbcb6af5 100644 --- a/pkg/loader/load.go +++ b/pkg/loader/load.go @@ -17,6 +17,7 @@ import ( "context" gosql "database/sql" "fmt" + "strconv" "sync" "sync/atomic" "time" @@ -102,6 +103,7 @@ type loaderImpl struct { 
type MetricsGroup struct { EventCounterVec *prometheus.CounterVec QueryHistogramVec *prometheus.HistogramVec + QueueSizeGauge *prometheus.GaugeVec } // SyncMode represents the sync mode of DML. @@ -120,6 +122,7 @@ type options struct { metrics *MetricsGroup saveAppliedTS bool syncMode SyncMode + merge bool } var defaultLoaderOptions = options{ @@ -129,6 +132,7 @@ var defaultLoaderOptions = options{ metrics: nil, saveAppliedTS: false, syncMode: SyncFullColumn, + merge: false, } // A Option sets options such batch size, worker count etc. @@ -155,6 +159,13 @@ func BatchSize(n int) Option { } } +// Merge returns an Option that sets whether the loader merges DMLs before executing them. +func Merge(v bool) Option { + return func(o *options) { + o.merge = v + } +} + //SetloopBackSyncInfo set loop back sync info of loader func SetloopBackSyncInfo(loopBackSyncInfo *loopbacksync.LoopBackSync) Option { return func(o *options) { @@ -196,11 +207,10 @@ func NewLoader(db *gosql.DB, opt ...Option) (Loader, error) { loopBackSyncInfo: opts.loopBackSyncInfo, input: make(chan *Txn), successTxn: make(chan *Txn), - merge: true, + merge: opts.merge, saveAppliedTS: opts.saveAppliedTS, - - ctx: ctx, - cancel: cancel, + ctx: ctx, + cancel: cancel, } db.SetMaxOpenConns(opts.workerCount) @@ -430,6 +440,14 @@ func (s *loaderImpl) singleExec(executor *executor, dmls []*DML) error { } + if s.metrics != nil && s.metrics.QueueSizeGauge != nil { + // sample at most the first 10 worker queues + for i := 0; i < len(byHash) && i < 10; i++ { + name := "worker_" + strconv.Itoa(i) + s.metrics.QueueSizeGauge.WithLabelValues(name).Set(float64(len(byHash[i]))) + } + } + err := s.execByHash(executor, byHash) return errors.Trace(err) } @@ -513,7 +531,7 @@ func (s *loaderImpl) Run() error { }() } - txnManager := newTxnManager(1024, s.input) + txnManager := newTxnManager(100*1024 /* limit dml number */, s.input) defer txnManager.Close() batch := fNewBatchManager(s) @@ -572,7 +590,7 @@ func (s *loaderImpl) groupDMLs(dmls []*DML) (batchByTbls map[string][]*DML, sing batchByTbls =
make(map[string][]*DML) for _, dml := range dmls { info := dml.info - if info.primaryKey != nil && len(info.uniqueKeys) == 0 { + if info.primaryKey != nil { tblName := dml.TableName() batchByTbls[tblName] = append(batchByTbls[tblName], dml) } else {