Skip to content

Commit

Permalink
update black/white list to block/allow list && update master/slave to…
Browse files Browse the repository at this point in the history
… primary/secondary (#983)
  • Loading branch information
ti-srebot authored Jun 28, 2020
1 parent 53e4181 commit 1d976dd
Show file tree
Hide file tree
Showing 20 changed files with 61 additions and 61 deletions.
2 changes: 1 addition & 1 deletion arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func syncBinlogs(ctx context.Context, source <-chan *reader.Message, ld loader.L
}
receivedTs = msg.Binlog.CommitTs

txn, err := loader.SlaveBinlogToTxn(msg.Binlog)
txn, err := loader.SecondaryBinlogToTxn(msg.Binlog)
if err != nil {
log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err))
return err
Expand Down
2 changes: 1 addition & 1 deletion drainer/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type CheckPoint interface {
Load() error

// Save saves checkpoint information.
Save(commitTS int64, slaveTS int64, consistent bool) error
Save(commitTS int64, secondaryTS int64, consistent bool) error

// TS gets checkpoint commit timestamp.
TS() int64
Expand Down
2 changes: 1 addition & 1 deletion drainer/checkpoint/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (sp *FileCheckPoint) Load() error {
}

// Save implements CheckPoint.Save interface
func (sp *FileCheckPoint) Save(ts, slaveTS int64, consistent bool) error {
func (sp *FileCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {
sp.Lock()
defer sp.Unlock()

Expand Down
8 changes: 4 additions & 4 deletions drainer/checkpoint/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (sp *MysqlCheckPoint) Load() error {
}

// Save implements checkpoint.Save interface
func (sp *MysqlCheckPoint) Save(ts, slaveTS int64, consistent bool) error {
func (sp *MysqlCheckPoint) Save(ts, secondaryTS int64, consistent bool) error {
sp.Lock()
defer sp.Unlock()

Expand All @@ -133,9 +133,9 @@ func (sp *MysqlCheckPoint) Save(ts, slaveTS int64, consistent bool) error {
sp.CommitTS = ts
sp.ConsistentSaved = consistent

if slaveTS > 0 {
sp.TsMap["master-ts"] = ts
sp.TsMap["slave-ts"] = slaveTS
if secondaryTS > 0 {
sp.TsMap["primary-ts"] = ts
sp.TsMap["secondary-ts"] = secondaryTS
}

b, err := json.Marshal(sp)
Expand Down
10 changes: 5 additions & 5 deletions drainer/checkpoint/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func (s *saveSuite) TestShouldUpdateTsMap(c *C) {
}
err = cp.Save(65536, 3333, false)
c.Assert(err, IsNil)
c.Assert(cp.TsMap["master-ts"], Equals, int64(65536))
c.Assert(cp.TsMap["slave-ts"], Equals, int64(3333))
c.Assert(cp.TsMap["primary-ts"], Equals, int64(65536))
c.Assert(cp.TsMap["secondary-ts"], Equals, int64(3333))
}

type loadSuite struct{}
Expand All @@ -83,15 +83,15 @@ func (s *loadSuite) TestShouldLoadFromDB(c *C) {
TsMap: make(map[string]int64),
}
rows := sqlmock.NewRows([]string{"checkPoint"}).
AddRow(`{"commitTS": 1024, "consistent": true, "ts-map": {"master-ts": 2000, "slave-ts": 1999}}`)
AddRow(`{"commitTS": 1024, "consistent": true, "ts-map": {"primary-ts": 2000, "secondary-ts": 1999}}`)
mock.ExpectQuery("select checkPoint from db.tbl.*").WillReturnRows(rows)

err = cp.Load()
c.Assert(err, IsNil)
c.Assert(cp.CommitTS, Equals, int64(1024))
c.Assert(cp.ConsistentSaved, Equals, true)
c.Assert(cp.TsMap["master-ts"], Equals, int64(2000))
c.Assert(cp.TsMap["slave-ts"], Equals, int64(1999))
c.Assert(cp.TsMap["primary-ts"], Equals, int64(2000))
c.Assert(cp.TsMap["secondary-ts"], Equals, int64(1999))
}

func (s *loadSuite) TestShouldUseInitialCommitTs(c *C) {
Expand Down
4 changes: 2 additions & 2 deletions drainer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint)
continue
}

txn, err := loader.SlaveBinlogToTxn(sbinlog)
txn, err := loader.SecondaryBinlogToTxn(sbinlog)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint)
return errors.Trace(readerErr)
}

err := cp.Save(lastSuccessTS, 0 /* slaveTS */, true /*consistent*/)
err := cp.Save(lastSuccessTS, 0 /* secondaryTS */, true /*consistent*/)
if err != nil {
return errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions drainer/relay/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,16 @@ func (r *reader) Run() context.CancelFunc {
break
}

slaveBinlog := new(obinlog.Binlog)
if err = slaveBinlog.Unmarshal(blg.Payload); err != nil {
secondaryBinlog := new(obinlog.Binlog)
if err = secondaryBinlog.Unmarshal(blg.Payload); err != nil {
break
}

select {
case <-ctx.Done():
err = ctx.Err()
log.Warn("Producing transaction is interrupted")
case r.binlogs <- slaveBinlog:
case r.binlogs <- secondaryBinlog:
}
}
// If binlogger is not done, notify it to stop.
Expand Down
2 changes: 1 addition & 1 deletion drainer/relay/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *testReaderSuite) readBinlogAndCheck(c *C, dir string, expectedNumber in
number := 0
for txn := range relayReader.Binlogs() {
number++
loaderTxn, err := loader.SlaveBinlogToTxn(txn)
loaderTxn, err := loader.SecondaryBinlogToTxn(txn)
c.Assert(err, IsNil)
lastTxn = loaderTxn
}
Expand Down
2 changes: 1 addition & 1 deletion drainer/relay/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewRelayer(dir string, maxFileSize int64, tableInfoGetter translator.TableI
// WriteBinlog writes binlog to relay log.
func (r *relayer) WriteBinlog(schema string, table string, tiBinlog *tb.Binlog, pv *tb.PrewriteValue) (tb.Pos, error) {
pos := tb.Pos{}
binlog, err := translator.TiBinlogToSlaveBinlog(r.tableInfoGetter, schema, table, tiBinlog, pv)
binlog, err := translator.TiBinlogToSecondaryBinlog(r.tableInfoGetter, schema, table, tiBinlog, pv)
if err != nil {
return pos, errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions drainer/sync/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ func (p *KafkaSyncer) SetSafeMode(mode bool) bool {

// Sync implements Syncer interface
func (p *KafkaSyncer) Sync(item *Item) error {
slaveBinlog, err := translator.TiBinlogToSlaveBinlog(p.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue)
secondaryBinlog, err := translator.TiBinlogToSecondaryBinlog(p.tableInfoGetter, item.Schema, item.Table, item.Binlog, item.PrewriteValue)
if err != nil {
return errors.Trace(err)
}

err = p.saveBinlog(slaveBinlog, item)
err = p.saveBinlog(secondaryBinlog, item)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,13 @@ func (s *Syncer) handleSuccess(fakeBinlog chan *pb.Binlog, lastTS *int64) {
log.Info("handleSuccess quit")
}

func (s *Syncer) savePoint(ts, slaveTS int64) {
func (s *Syncer) savePoint(ts, secondaryTS int64) {
if ts < s.cp.TS() {
log.Error("save ts is less than checkpoint ts %d", zap.Int64("save ts", ts), zap.Int64("checkpoint ts", s.cp.TS()))
}

log.Info("write save point", zap.Int64("ts", ts))
err := s.cp.Save(ts, slaveTS, false)
err := s.cp.Save(ts, secondaryTS, false)
if err != nil {
log.Fatal("save checkpoint failed", zap.Int64("ts", ts), zap.Error(err))
}
Expand Down
14 changes: 7 additions & 7 deletions drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ import (
"go.uber.org/zap"
)

// TiBinlogToSlaveBinlog translates the format to slave binlog
func TiBinlogToSlaveBinlog(
// TiBinlogToSecondaryBinlog translates the format to secondary binlog
func TiBinlogToSecondaryBinlog(
infoGetter TableInfoGetter,
schema string,
table string,
tiBinlog *pb.Binlog,
pv *pb.PrewriteValue,
) (*obinlog.Binlog, error) {
if tiBinlog.DdlJobId > 0 { // DDL
slaveBinlog := &obinlog.Binlog{
secondaryBinlog := &obinlog.Binlog{
Type: obinlog.BinlogType_DDL,
CommitTs: tiBinlog.GetCommitTs(),
DdlData: &obinlog.DDLData{
Expand All @@ -50,10 +50,10 @@ func TiBinlogToSlaveBinlog(
DdlQuery: tiBinlog.GetDdlQuery(),
},
}
return slaveBinlog, nil
return secondaryBinlog, nil
}

slaveBinlog := &obinlog.Binlog{
secondaryBinlog := &obinlog.Binlog{
Type: obinlog.BinlogType_DML,
CommitTs: tiBinlog.GetCommitTs(),
DmlData: new(obinlog.DMLData),
Expand All @@ -73,7 +73,7 @@ func TiBinlogToSlaveBinlog(

iter := newSequenceIterator(&mut)
table := genTable(schema, info)
slaveBinlog.DmlData.Tables = append(slaveBinlog.DmlData.Tables, table)
secondaryBinlog.DmlData.Tables = append(secondaryBinlog.DmlData.Tables, table)

for {
tableMutation, err := nextRow(schema, info, isTblDroppingCol, iter)
Expand All @@ -86,7 +86,7 @@ func TiBinlogToSlaveBinlog(
table.Mutations = append(table.Mutations, tableMutation)
}
}
return slaveBinlog, nil
return secondaryBinlog, nil
}

func genTable(schema string, tableInfo *model.TableInfo) (table *obinlog.Table) {
Expand Down
22 changes: 11 additions & 11 deletions drainer/translator/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ var _ = check.Suite(&testKafkaSuite{})
func (t *testKafkaSuite) TestDDL(c *check.C) {
t.SetDDL()

slaveBinog, err := TiBinlogToSlaveBinlog(t, t.Schema, t.Table, t.TiBinlog, nil)
secondaryBinlog, err := TiBinlogToSecondaryBinlog(t, t.Schema, t.Table, t.TiBinlog, nil)
c.Assert(err, check.IsNil)

c.Assert(slaveBinog, check.DeepEquals, &obinlog.Binlog{
c.Assert(secondaryBinlog, check.DeepEquals, &obinlog.Binlog{
Type: obinlog.BinlogType_DDL,
CommitTs: t.TiBinlog.GetCommitTs(),
DdlData: &obinlog.DDLData{
Expand All @@ -48,13 +48,13 @@ func (t *testKafkaSuite) TestDDL(c *check.C) {
}

func (t *testKafkaSuite) testDML(c *check.C, tp obinlog.MutationType) {
slaveBinog, err := TiBinlogToSlaveBinlog(t, t.Schema, t.Table, t.TiBinlog, t.PV)
secondaryBinlog, err := TiBinlogToSecondaryBinlog(t, t.Schema, t.Table, t.TiBinlog, t.PV)
c.Assert(err, check.IsNil)

c.Assert(slaveBinog.GetCommitTs(), check.Equals, t.TiBinlog.GetCommitTs())
c.Assert(slaveBinog.Type, check.Equals, obinlog.BinlogType_DML)
c.Assert(secondaryBinlog.GetCommitTs(), check.Equals, t.TiBinlog.GetCommitTs())
c.Assert(secondaryBinlog.Type, check.Equals, obinlog.BinlogType_DML)

table := slaveBinog.DmlData.Tables[0]
table := secondaryBinlog.DmlData.Tables[0]
tableMut := table.Mutations[0]
c.Assert(tableMut.GetType(), check.Equals, tp)

Expand All @@ -67,13 +67,13 @@ func (t *testKafkaSuite) testDML(c *check.C, tp obinlog.MutationType) {
func (t *testKafkaSuite) TestAllDML(c *check.C) {
t.SetAllDML(c)

slaveBinog, err := TiBinlogToSlaveBinlog(t, t.Schema, t.Table, t.TiBinlog, t.PV)
secondaryBinlog, err := TiBinlogToSecondaryBinlog(t, t.Schema, t.Table, t.TiBinlog, t.PV)
c.Assert(err, check.IsNil)

c.Assert(slaveBinog.Type, check.Equals, obinlog.BinlogType_DML)
c.Assert(slaveBinog.GetCommitTs(), check.Equals, t.TiBinlog.GetCommitTs())
c.Assert(secondaryBinlog.Type, check.Equals, obinlog.BinlogType_DML)
c.Assert(secondaryBinlog.GetCommitTs(), check.Equals, t.TiBinlog.GetCommitTs())

table := slaveBinog.DmlData.Tables[0]
table := secondaryBinlog.DmlData.Tables[0]

insertMut := table.Mutations[0]
updateMut := table.Mutations[1]
Expand Down Expand Up @@ -137,7 +137,7 @@ func checkColumn(c *check.C, info *obinlog.ColumnInfo, col *obinlog.Column, datu

datumV := fmt.Sprintf("%v", datum.GetValue())
if info.GetMysqlType() == "enum" {
// we set uint64 as the index for slave proto but not the name
// we set uint64 as the index for secondary proto but not the name
datumV = fmt.Sprintf("%v", datum.GetInt64())
}

Expand Down
14 changes: 7 additions & 7 deletions pkg/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ func (s *Filter) genRegexMap() {
}
}

// whiteFilter whitelist filtering
func (s *Filter) whiteFilter(stbs []TableName) []TableName {
// allowFilter allowlist filtering
func (s *Filter) allowFilter(stbs []TableName) []TableName {
var tbs []TableName
if len(s.doTables) == 0 && len(s.doDBs) == 0 {
return stbs
}
for _, tb := range stbs {
// if the white list contains "schema_s.table_t" and "schema_s",
// if the allow list contains "schema_s.table_t" and "schema_s",
// all tables in that schema_s will pass the Filter.
if s.matchTable(s.doTables, tb) {
tbs = append(tbs, tb)
Expand All @@ -96,8 +96,8 @@ func (s *Filter) whiteFilter(stbs []TableName) []TableName {
return tbs
}

// blackFilter return TableName which is not in the blacklist
func (s *Filter) blackFilter(stbs []TableName) []TableName {
// blockFilter return TableName which is not in the blocklist
func (s *Filter) blockFilter(stbs []TableName) []TableName {
var tbs []TableName
if len(s.ignoreTables) == 0 && len(s.ignoreDBs) == 0 {
return stbs
Expand All @@ -119,8 +119,8 @@ func (s *Filter) blackFilter(stbs []TableName) []TableName {
func (s *Filter) SkipSchemaAndTable(schema string, table string) bool {
tbs := []TableName{{Schema: strings.ToLower(schema), Table: strings.ToLower(table)}}

tbs = s.whiteFilter(tbs)
tbs = s.blackFilter(tbs)
tbs = s.allowFilter(tbs)
tbs = s.blockFilter(tbs)
return len(tbs) == 0
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/loader/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ A package to load data into MySQL in real-time, aimed to be used by *reparo*, *d
### Getting started
- Example is available via [example_loader_test.go](./example_loader_test.go)

You need to write a translator to use *Loader* like *SlaveBinlogToTxn* in [translate.go](./translate.go) to translate upstream data format (e.g. binlog) into `Txn` objects.
You need to write a translator to use *Loader* like *SecondaryBinlogToTxn* in [translate.go](./translate.go) to translate upstream data format (e.g. binlog) into `Txn` objects.


## Overview
Expand Down
2 changes: 1 addition & 1 deletion pkg/loader/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,7 @@ func getAppliedTS(db *gosql.DB) int64 {
errCode, ok := pkgsql.GetSQLErrCode(err)
// if tidb dont't support `show master status`, will return 1105 ErrUnknown error
if !ok || int(errCode) != tmysql.ErrUnknown {
log.Warn("get ts from slave cluster failed", zap.Error(err))
log.Warn("get ts from secondary cluster failed", zap.Error(err))
}
return 0
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/loader/translate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"github.com/pingcap/tidb/types"
)

// SlaveBinlogToTxn translate the Binlog format into Txn
func SlaveBinlogToTxn(binlog *pb.Binlog) (*Txn, error) {
// SecondaryBinlogToTxn translate the Binlog format into Txn
func SecondaryBinlogToTxn(binlog *pb.Binlog) (*Txn, error) {
txn := new(Txn)
var err error
switch binlog.Type {
Expand Down
14 changes: 7 additions & 7 deletions pkg/loader/translate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
. "github.com/pingcap/check"
)

type slaveBinlogToTxnSuite struct{}
type secondaryBinlogToTxnSuite struct{}

var _ = Suite(&slaveBinlogToTxnSuite{})
var _ = Suite(&secondaryBinlogToTxnSuite{})

func (s *slaveBinlogToTxnSuite) TestTranslateDDL(c *C) {
func (s *secondaryBinlogToTxnSuite) TestTranslateDDL(c *C) {
db, table := "test", "hello"
sql := "CREATE TABLE hello (id INT AUTO_INCREMENT) PRIMARY KEY(id);"
binlog := pb.Binlog{
Expand All @@ -34,14 +34,14 @@ func (s *slaveBinlogToTxnSuite) TestTranslateDDL(c *C) {
DdlQuery: []byte(sql),
},
}
txn, err := SlaveBinlogToTxn(&binlog)
txn, err := SecondaryBinlogToTxn(&binlog)
c.Assert(err, IsNil)
c.Assert(txn.DDL.Database, Equals, db)
c.Assert(txn.DDL.Table, Equals, table)
c.Assert(txn.DDL.SQL, Equals, sql)
}

func (s *slaveBinlogToTxnSuite) TestTranslateDML(c *C) {
func (s *secondaryBinlogToTxnSuite) TestTranslateDML(c *C) {
db, table := "test", "hello"
var oldVal, newVal int64 = 41, 42
dml := pb.DMLData{
Expand Down Expand Up @@ -81,7 +81,7 @@ func (s *slaveBinlogToTxnSuite) TestTranslateDML(c *C) {
binlog := pb.Binlog{
DmlData: &dml,
}
txn, err := SlaveBinlogToTxn(&binlog)
txn, err := SecondaryBinlogToTxn(&binlog)
c.Assert(err, IsNil)
c.Assert(txn.DMLs, HasLen, 2)
for _, dml := range txn.DMLs {
Expand All @@ -99,7 +99,7 @@ func (s *slaveBinlogToTxnSuite) TestTranslateDML(c *C) {
c.Assert(insert.Values["uid"], Equals, newVal)
}

func (s *slaveBinlogToTxnSuite) TestGetDMLType(c *C) {
func (s *secondaryBinlogToTxnSuite) TestGetDMLType(c *C) {
mut := pb.TableMutation{}
mut.Type = pb.MutationType(404).Enum()
c.Assert(getDMLType(&mut), Equals, UnknownDMLType)
Expand Down
Loading

0 comments on commit 1d976dd

Please sign in to comment.