Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sequence iterator and BinlogGenrator #538

Merged
merged 9 commits into from
Apr 16, 2019
62 changes: 0 additions & 62 deletions drainer/translator/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
)

const implicitColID = -1
Expand Down Expand Up @@ -598,64 +597,3 @@ func formatData(data types.Datum, ft types.FieldType) (types.Datum, error) {

return data, nil
}

// DecodeOldAndNewRow decodes a byte slice into datums with a existing row map.
// Row layout: colID1, value1, colID2, value2, .....
func DecodeOldAndNewRow(b []byte, cols map[int64]*types.FieldType, loc *time.Location) (map[int64]types.Datum, map[int64]types.Datum, error) {
if b == nil {
return nil, nil, nil
}
if b[0] == codec.NilFlag {
return nil, nil, nil
}

cnt := 0
var (
data []byte
err error
oldRow = make(map[int64]types.Datum, len(cols))
newRow = make(map[int64]types.Datum, len(cols))
)
for len(b) > 0 {
// Get col id.
data, b, err = codec.CutOne(b)
if err != nil {
return nil, nil, errors.Trace(err)
}
_, cid, err := codec.DecodeOne(data)
if err != nil {
return nil, nil, errors.Trace(err)
}
// Get col value.
data, b, err = codec.CutOne(b)
if err != nil {
return nil, nil, errors.Trace(err)
}
id := cid.GetInt64()
ft, ok := cols[id]
if ok {
v, err := tablecodec.DecodeColumnValue(data, ft, loc)
if err != nil {
return nil, nil, errors.Trace(err)
}

if _, ok := oldRow[id]; ok {
newRow[id] = v
} else {
oldRow[id] = v
}

cnt++
if cnt == len(cols)*2 {
// Get enough data.
break
}
}
}

if cnt != len(cols)*2 || len(newRow) != len(oldRow) {
return nil, nil, errors.Errorf(" row data is corruption %v", b)
}

return oldRow, newRow, nil
}
48 changes: 48 additions & 0 deletions drainer/translator/sequence_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package translator

import (
"io"

"github.com/pingcap/errors"
"github.com/pingcap/tipb/go-binlog"
)

// sequenceIterator is a helper to iterate row event by sequence
type sequenceIterator struct {
mutation *binlog.TableMutation
idx int
insertIdx int
deleteIdx int
updateIdx int
}

func newSequenceIterator(mutation *binlog.TableMutation) *sequenceIterator {
return &sequenceIterator{mutation: mutation}
}

func (si *sequenceIterator) next() (tp binlog.MutationType, row []byte, err error) {
if si.idx >= len(si.mutation.Sequence) {
err = io.EOF
return
}

tp = si.mutation.Sequence[si.idx]
si.idx++

switch tp {
case binlog.MutationType_Insert:
row = si.mutation.InsertedRows[si.insertIdx]
si.insertIdx++
case binlog.MutationType_Update:
row = si.mutation.UpdatedRows[si.updateIdx]
si.updateIdx++
case binlog.MutationType_DeleteRow:
row = si.mutation.DeletedRows[si.deleteIdx]
si.deleteIdx++
default:
err = errors.Errorf("unknown mutation type: %v", tp)
return
}

return
}
61 changes: 61 additions & 0 deletions drainer/translator/sequence_iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package translator

import (
"io"

. "github.com/pingcap/check"
ti "github.com/pingcap/tipb/go-binlog"
)

type testSequenceIteratorSuite struct{}

var _ = Suite(&testSequenceIteratorSuite{})

func (t *testSequenceIteratorSuite) TestIterator(c *C) {
mut := new(ti.TableMutation)
var tps []ti.MutationType
var rows [][]byte

// generate test data
for i := 0; i < 10; i++ {
row := []byte{byte(i)}
rows = append(rows, row)
switch i % 3 {
case 0:
mut.Sequence = append(mut.Sequence, ti.MutationType_Insert)
mut.InsertedRows = append(mut.InsertedRows, row)
tps = append(tps, ti.MutationType_Insert)
case 1:
mut.Sequence = append(mut.Sequence, ti.MutationType_Update)
mut.UpdatedRows = append(mut.UpdatedRows, row)
tps = append(tps, ti.MutationType_Update)
case 2:
mut.Sequence = append(mut.Sequence, ti.MutationType_DeleteRow)
mut.DeletedRows = append(mut.DeletedRows, row)
tps = append(tps, ti.MutationType_DeleteRow)
}
}

// get back by iterator
iter := newSequenceIterator(mut)
var getTps []ti.MutationType
var getRows [][]byte

for {
tp, row, err := iter.next()
if err == io.EOF {
break
}

c.Assert(err, IsNil)
c.Fail()
break
}

getTps = append(getTps, tp)
getRows = append(getRows, row)
}

c.Assert(getTps, DeepEquals, tps)
c.Assert(getRows, DeepEquals, rows)
}
9 changes: 9 additions & 0 deletions drainer/translator/table_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package translator

import "github.com/pingcap/parser/model"

// TableInfoGetter is used to get table info by table id of TiDB
type TableInfoGetter interface {
TableByID(id int64) (info *model.TableInfo, ok bool)
SchemaAndTableName(id int64) (string, string, bool)
}
Loading