Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose merge mode by config and add metrics #987

Merged
merged 1 commit into from
Jul 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions drainer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package drainer

import (
"github.com/pingcap/tidb-binlog/drainer/sync"
bf "github.com/pingcap/tidb-binlog/pkg/binlogfile"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -124,6 +125,8 @@ var (
var registry = prometheus.NewRegistry()

func init() {
sync.QueueSizeGauge = queueSizeGauge

registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
registry.MustRegister(pumpPositionGauge)
Expand Down
6 changes: 6 additions & 0 deletions drainer/sync/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (

var _ Syncer = &MysqlSyncer{}

// QueueSizeGauge to be used.
var QueueSizeGauge *prometheus.GaugeVec

// MysqlSyncer sync binlog to Mysql
type MysqlSyncer struct {
db *sql.DB
Expand Down Expand Up @@ -60,9 +63,12 @@ func CreateLoader(
opts = append(opts, loader.Metrics(&loader.MetricsGroup{
QueryHistogramVec: queryHistogramVec,
EventCounterVec: nil,
QueueSizeGauge: QueueSizeGauge,
}))
}

opts = append(opts, loader.Merge(cfg.Merge))

if cfg.SyncMode != 0 {
mode := loader.SyncMode(cfg.SyncMode)
opts = append(opts, loader.SyncModeOption(mode))
Expand Down
2 changes: 2 additions & 0 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type DBConfig struct {
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`

Merge bool `toml:"merge" json:"merge"`

ZKAddrs string `toml:"zookeeper-addrs" json:"zookeeper-addrs"`
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
KafkaVersion string `toml:"kafka-version" json:"kafka-version"`
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ require (
github.com/sirupsen/logrus v1.4.1 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d
go.etcd.io/etcd v0.5.0-alpha.5.0.20190320044326-77d4b742cdbf
go.uber.org/atomic v1.5.1 // indirect
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,8 @@ github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 h1:uSDYjYejelKy
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 h1:lYIiVDtZnyTWlNwiAxLj0bbpTcx1BWCFhXjfsvmPdNc=
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/uber-go/atomic v1.3.2 h1:Azu9lPBWRNKzYXSIwRfgRuDuS0YKsK4NFhiQv98gkxo=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber/jaeger-client-go v2.15.0+incompatible h1:NP3qsSqNxh8VYr956ur1N/1C1PjvOJnJykCzcD5QHbk=
Expand Down
30 changes: 24 additions & 6 deletions pkg/loader/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
gosql "database/sql"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -102,6 +103,7 @@ type loaderImpl struct {
type MetricsGroup struct {
EventCounterVec *prometheus.CounterVec
QueryHistogramVec *prometheus.HistogramVec
QueueSizeGauge *prometheus.GaugeVec
}

// SyncMode represents the sync mode of DML.
Expand All @@ -120,6 +122,7 @@ type options struct {
metrics *MetricsGroup
saveAppliedTS bool
syncMode SyncMode
merge bool
}

var defaultLoaderOptions = options{
Expand All @@ -129,6 +132,7 @@ var defaultLoaderOptions = options{
metrics: nil,
saveAppliedTS: false,
syncMode: SyncFullColumn,
merge: false,
}

// A Option sets options such batch size, worker count etc.
Expand All @@ -155,6 +159,13 @@ func BatchSize(n int) Option {
}
}

// Merge set merge options.
func Merge(v bool) Option {
return func(o *options) {
o.merge = v
}
}

//SetloopBackSyncInfo set loop back sync info of loader
func SetloopBackSyncInfo(loopBackSyncInfo *loopbacksync.LoopBackSync) Option {
return func(o *options) {
Expand Down Expand Up @@ -196,11 +207,10 @@ func NewLoader(db *gosql.DB, opt ...Option) (Loader, error) {
loopBackSyncInfo: opts.loopBackSyncInfo,
input: make(chan *Txn),
successTxn: make(chan *Txn),
merge: true,
merge: opts.merge,
saveAppliedTS: opts.saveAppliedTS,

ctx: ctx,
cancel: cancel,
ctx: ctx,
cancel: cancel,
}

db.SetMaxOpenConns(opts.workerCount)
Expand Down Expand Up @@ -430,6 +440,14 @@ func (s *loaderImpl) singleExec(executor *executor, dmls []*DML) error {

}

if s.metrics != nil && s.metrics.QueueSizeGauge != nil {
// limit 10 sample
for i := 0; i < len(byHash) && i < 10; i++ {
name := "worker_" + strconv.Itoa(i)
s.metrics.QueueSizeGauge.WithLabelValues(name).Set(float64(len(byHash[i])))
}
}

err := s.execByHash(executor, byHash)
return errors.Trace(err)
}
Expand Down Expand Up @@ -513,7 +531,7 @@ func (s *loaderImpl) Run() error {
}()
}

txnManager := newTxnManager(1024, s.input)
txnManager := newTxnManager(100*1024 /* limit dml number */, s.input)
defer txnManager.Close()

batch := fNewBatchManager(s)
Expand Down Expand Up @@ -572,7 +590,7 @@ func (s *loaderImpl) groupDMLs(dmls []*DML) (batchByTbls map[string][]*DML, sing
batchByTbls = make(map[string][]*DML)
for _, dml := range dmls {
info := dml.info
if info.primaryKey != nil && len(info.uniqueKeys) == 0 {
if info.primaryKey != nil {
tblName := dml.TableName()
batchByTbls[tblName] = append(batchByTbls[tblName], dml)
} else {
Expand Down