Skip to content

Commit

Permalink
refactor drainer syncer (#532)
Browse files Browse the repository at this point in the history
Discard the original Translator and executor interfaces, and add a new Syncer interface in drainer/sync/
Avoid splitting transactions in the translator and assembling them back in the executor.
Avoid the magic handling of the checkpoint safe ts that was needed because kafka and flash do batched execution downstream
Unify on pkg/loader for syncing to mysql in drainer, like the other tools
Greatly simplify syncer.go
Add unit tests for the related changed code, bringing coverage of syncer.go, /sync and /translator above 75%.
  • Loading branch information
july2993 authored Apr 24, 2019
1 parent b23caeb commit 0780072
Show file tree
Hide file tree
Showing 39 changed files with 2,098 additions and 2,197 deletions.
6 changes: 1 addition & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,8 @@ else
grep -F '<option' "$(TEST_DIR)/all_cov.html"
endif


check-static:
gometalinter --disable-all --deadline 120s \
--enable megacheck \
--enable ineffassign \
$$($(PACKAGE_DIRECTORIES))
golangci-lint --disable errcheck run $$($(PACKAGE_DIRECTORIES))

update: update_vendor clean_vendor
update_vendor:
Expand Down
2 changes: 1 addition & 1 deletion drainer/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewCheckPoint(name string, cfg *Config) (CheckPoint, error) {
case "mysql", "tidb":
cp, err = newMysql(name, cfg)
case "pb":
cp, err = newPb(cfg)
cp, err = NewPb(cfg)
case "kafka":
cp, err = newKafka(cfg)
case "flash":
Expand Down
48 changes: 1 addition & 47 deletions drainer/checkpoint/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package checkpoint

import (
"math"
"sync"

"github.com/ngaut/log"
Expand All @@ -25,56 +24,18 @@ import (
type KafkaCheckpoint struct {
sync.Mutex
*PbCheckPoint

meta *KafkaMeta
}

// KafkaMeta maintains the safeTS which is synced to kafka
type KafkaMeta struct {
// set to the minimum commit ts of the binlogs when some binlogs have not yet been successfully sent to kafka,
// or set to `math.MaxInt64` otherwise
safeTS int64

lock sync.Mutex
cond *sync.Cond
}

// SetSafeTS sets the safe ts to ts and wakes up one goroutine blocked in
// KafkaCheckpoint.Save, which waits until the safe ts reaches its target ts.
func (m *KafkaMeta) SetSafeTS(ts int64) {
log.Debug("set safe ts: ", ts)

// Update safeTS under the lock and signal the condition variable so a
// waiter re-checks its predicate (safeTS < ts) in Save.
m.lock.Lock()
m.safeTS = ts
m.cond.Signal()
m.lock.Unlock()
}

// Package-level singleton KafkaMeta, lazily initialized exactly once.
var kafkaMeta *KafkaMeta
var kafkaMetaOnce sync.Once

// GetKafkaMeta returns the singleton instance of KafkaMeta, creating it on
// first use. The condition variable is bound to the meta's own lock so that
// SetSafeTS and the wait loop in Save coordinate on the same mutex.
func GetKafkaMeta() *KafkaMeta {
kafkaMetaOnce.Do(func() {
kafkaMeta = new(KafkaMeta)
kafkaMeta.cond = sync.NewCond(&kafkaMeta.lock)
})

return kafkaMeta
}

func newKafka(cfg *Config) (CheckPoint, error) {
pb, err := newPb(cfg)
pb, err := NewPb(cfg)
if err != nil {
return nil, errors.Trace(err)
}

cp := &KafkaCheckpoint{
PbCheckPoint: pb.(*PbCheckPoint),
meta: GetKafkaMeta(),
}

cp.meta.SetSafeTS(math.MaxInt64)

return cp, nil
}

Expand All @@ -92,12 +53,5 @@ func (cp *KafkaCheckpoint) Save(ts int64) error {
return nil
}

cp.meta.lock.Lock()
for cp.meta.safeTS < ts {
log.Debugf("wait for ts: %d safe_ts: %d", ts, cp.meta.safeTS)
cp.meta.cond.Wait()
}
cp.meta.lock.Unlock()

return cp.PbCheckPoint.Save(ts)
}
20 changes: 0 additions & 20 deletions drainer/checkpoint/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package checkpoint

import (
"path/filepath"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
Expand All @@ -36,25 +35,6 @@ func (t *testCheckPointSuite) TestKafka(c *C) {
ts := cp.TS()
c.Assert(ts, Equals, testTs)

// test for safeTs
cp2, ok := cp.(*KafkaCheckpoint)
c.Assert(ok, IsTrue)

var safeTs int64 = 100
newTs := safeTs + 1
cp2.meta.SetSafeTS(safeTs)

go func() {
time.Sleep(500 * time.Millisecond) // sleep for a while
cp2.meta.SetSafeTS(newTs)
}()

begin := time.Now()
err = cp.Save(newTs) // block until `newTs` be set
c.Assert(err, IsNil)
c.Assert(cp.TS(), Equals, newTs)
c.Assert(time.Since(begin).Seconds(), Greater, 0.49) // ~ 0.5

// close the checkpoint
err = cp.Close()
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion drainer/checkpoint/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type PbCheckPoint struct {
}

// NewPb creates a new Pb.
func newPb(cfg *Config) (CheckPoint, error) {
func NewPb(cfg *Config) (CheckPoint, error) {
pb := &PbCheckPoint{initialCommitTS: cfg.InitialCommitTS, name: cfg.CheckPointFile, saveTime: time.Now()}
err := pb.Load()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions drainer/checkpoint/pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (t *testCheckPointSuite) TestPb(c *C) {
notExistFileName := "test_not_exist"
cfg := new(Config)
cfg.CheckPointFile = fileName
meta, err := newPb(cfg)
meta, err := NewPb(cfg)
c.Assert(err, IsNil)
defer os.RemoveAll(fileName)

Expand All @@ -49,15 +49,15 @@ func (t *testCheckPointSuite) TestPb(c *C) {

// check not exist meta file
cfg.CheckPointFile = notExistFileName
meta, err = newPb(cfg)
meta, err = NewPb(cfg)
c.Assert(err, IsNil)
err = meta.Load()
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, int64(0))

// check not exist meta file, but with initialCommitTs
cfg.InitialCommitTS = 123
meta, err = newPb(cfg)
meta, err = NewPb(cfg)
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, cfg.InitialCommitTS)

Expand Down
6 changes: 3 additions & 3 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"

"github.com/pingcap/tidb-binlog/drainer/executor"
dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/security"
Expand Down Expand Up @@ -64,7 +64,7 @@ type SyncerConfig struct {
IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
To *executor.DBConfig `toml:"to" json:"to"`
To *dsync.DBConfig `toml:"to" json:"to"`
DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"`
DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"`
DestDBType string `toml:"db-type" json:"db-type"`
Expand Down Expand Up @@ -282,7 +282,7 @@ func (cfg *Config) adjustConfig() error {

// add default syncer.to configuration if need
if cfg.SyncerCfg.To == nil {
cfg.SyncerCfg.To = new(executor.DBConfig)
cfg.SyncerCfg.To = new(dsync.DBConfig)
}
if cfg.SyncerCfg.DestDBType == "kafka" {
// get KafkaAddrs from zookeeper if ZkAddrs is setted
Expand Down
70 changes: 0 additions & 70 deletions drainer/executor/executor.go

This file was deleted.

51 changes: 0 additions & 51 deletions drainer/executor/mysql.go

This file was deleted.

Loading

0 comments on commit 0780072

Please sign in to comment.