Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor drainer syncer #532

Merged
merged 22 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,8 @@ else
grep -F '<option' "$(TEST_DIR)/all_cov.html"
endif


check-static:
gometalinter --disable-all --deadline 120s \
--enable megacheck \
--enable ineffassign \
$$($(PACKAGE_DIRECTORIES))
golangci-lint --disable errcheck run $$($(PACKAGE_DIRECTORIES))
kennytm marked this conversation as resolved.
Show resolved Hide resolved

update: update_vendor clean_vendor
update_vendor:
Expand Down
2 changes: 1 addition & 1 deletion drainer/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewCheckPoint(name string, cfg *Config) (CheckPoint, error) {
case "mysql", "tidb":
cp, err = newMysql(name, cfg)
case "pb":
cp, err = newPb(cfg)
cp, err = NewPb(cfg)
case "kafka":
cp, err = newKafka(cfg)
case "flash":
Expand Down
48 changes: 1 addition & 47 deletions drainer/checkpoint/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package checkpoint

import (
"math"
"sync"

"github.com/ngaut/log"
Expand All @@ -25,56 +24,18 @@ import (
type KafkaCheckpoint struct {
sync.Mutex
*PbCheckPoint

meta *KafkaMeta
}

// KafkaMeta maintains the safeTS which is synced to kafka
type KafkaMeta struct {
// set to be the minimize commit ts of binlog if we have some binlog not having successful sent to kafka
// or set to be `math.MaxInt64`
safeTS int64

lock sync.Mutex
cond *sync.Cond
}

// SetSafeTS set the safe ts to be ts
func (m *KafkaMeta) SetSafeTS(ts int64) {
log.Debug("set safe ts: ", ts)

m.lock.Lock()
m.safeTS = ts
m.cond.Signal()
m.lock.Unlock()
}

var kafkaMeta *KafkaMeta
var kafkaMetaOnce sync.Once

// GetKafkaMeta return the singleton instance of KafkaMeta
func GetKafkaMeta() *KafkaMeta {
kafkaMetaOnce.Do(func() {
kafkaMeta = new(KafkaMeta)
kafkaMeta.cond = sync.NewCond(&kafkaMeta.lock)
})

return kafkaMeta
}

func newKafka(cfg *Config) (CheckPoint, error) {
pb, err := newPb(cfg)
pb, err := NewPb(cfg)
if err != nil {
return nil, errors.Trace(err)
}

cp := &KafkaCheckpoint{
PbCheckPoint: pb.(*PbCheckPoint),
meta: GetKafkaMeta(),
}

cp.meta.SetSafeTS(math.MaxInt64)

return cp, nil
}

Expand All @@ -92,12 +53,5 @@ func (cp *KafkaCheckpoint) Save(ts int64) error {
return nil
}

cp.meta.lock.Lock()
for cp.meta.safeTS < ts {
log.Debugf("wait for ts: %d safe_ts: %d", ts, cp.meta.safeTS)
cp.meta.cond.Wait()
}
cp.meta.lock.Unlock()

return cp.PbCheckPoint.Save(ts)
}
20 changes: 0 additions & 20 deletions drainer/checkpoint/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package checkpoint

import (
"path/filepath"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
Expand All @@ -36,25 +35,6 @@ func (t *testCheckPointSuite) TestKafka(c *C) {
ts := cp.TS()
c.Assert(ts, Equals, testTs)

// test for safeTs
cp2, ok := cp.(*KafkaCheckpoint)
c.Assert(ok, IsTrue)

var safeTs int64 = 100
newTs := safeTs + 1
cp2.meta.SetSafeTS(safeTs)

go func() {
time.Sleep(500 * time.Millisecond) // sleep for a while
cp2.meta.SetSafeTS(newTs)
}()

begin := time.Now()
err = cp.Save(newTs) // block until `newTs` be set
c.Assert(err, IsNil)
c.Assert(cp.TS(), Equals, newTs)
c.Assert(time.Since(begin).Seconds(), Greater, 0.49) // ~ 0.5

// close the checkpoint
err = cp.Close()
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion drainer/checkpoint/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type PbCheckPoint struct {
}

// NewPb creates a new Pb.
func newPb(cfg *Config) (CheckPoint, error) {
func NewPb(cfg *Config) (CheckPoint, error) {
pb := &PbCheckPoint{initialCommitTS: cfg.InitialCommitTS, name: cfg.CheckPointFile, saveTime: time.Now()}
err := pb.Load()
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions drainer/checkpoint/pb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (t *testCheckPointSuite) TestPb(c *C) {
notExistFileName := "test_not_exist"
cfg := new(Config)
cfg.CheckPointFile = fileName
meta, err := newPb(cfg)
meta, err := NewPb(cfg)
c.Assert(err, IsNil)
defer os.RemoveAll(fileName)

Expand All @@ -49,15 +49,15 @@ func (t *testCheckPointSuite) TestPb(c *C) {

// check not exist meta file
cfg.CheckPointFile = notExistFileName
meta, err = newPb(cfg)
meta, err = NewPb(cfg)
c.Assert(err, IsNil)
err = meta.Load()
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, int64(0))

// check not exist meta file, but with initialCommitTs
cfg.InitialCommitTS = 123
meta, err = newPb(cfg)
meta, err = NewPb(cfg)
c.Assert(err, IsNil)
c.Assert(meta.TS(), Equals, cfg.InitialCommitTS)

Expand Down
6 changes: 3 additions & 3 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"

"github.com/pingcap/tidb-binlog/drainer/executor"
dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/flags"
"github.com/pingcap/tidb-binlog/pkg/security"
Expand Down Expand Up @@ -64,7 +64,7 @@ type SyncerConfig struct {
IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
To *executor.DBConfig `toml:"to" json:"to"`
To *dsync.DBConfig `toml:"to" json:"to"`
DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"`
DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"`
DestDBType string `toml:"db-type" json:"db-type"`
Expand Down Expand Up @@ -282,7 +282,7 @@ func (cfg *Config) adjustConfig() error {

// add default syncer.to configuration if need
if cfg.SyncerCfg.To == nil {
cfg.SyncerCfg.To = new(executor.DBConfig)
cfg.SyncerCfg.To = new(dsync.DBConfig)
}
if cfg.SyncerCfg.DestDBType == "kafka" {
// get KafkaAddrs from zookeeper if ZkAddrs is setted
Expand Down
70 changes: 0 additions & 70 deletions drainer/executor/executor.go

This file was deleted.

51 changes: 0 additions & 51 deletions drainer/executor/mysql.go

This file was deleted.

Loading