Skip to content

Commit

Permalink
puller,mounter,processor: always pull the old value internally (#2271) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jul 30, 2021
1 parent 32b34a0 commit 7bba9f7
Show file tree
Hide file tree
Showing 10 changed files with 385 additions and 240 deletions.
19 changes: 0 additions & 19 deletions cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,16 @@ import (
// Byte prefixes used to classify encoded keys.
var (
// tablePrefix is the single byte 't' that starts a table key.
tablePrefix = []byte{'t'}
// recordPrefix ("_r") marks a row record key.
recordPrefix = []byte("_r")
// indexPrefix ("_i") marks an index key.
indexPrefix = []byte("_i")
// metaPrefix ("m") marks a meta key.
metaPrefix = []byte("m")
)

// Lengths derived from the prefixes above; they are used as minimum-length
// checks and slice offsets when decoding keys.
var (
// intLen is the number of bytes occupied by an encoded integer ID
// (table ID, record ID, index ID).
intLen = 8
tablePrefixLen = len(tablePrefix)
recordPrefixLen = len(recordPrefix)
indexPrefixLen = len(indexPrefix)
metaPrefixLen = len(metaPrefix)
// prefixTableIDLen is the length of the table prefix followed by a table ID.
prefixTableIDLen = tablePrefixLen + intLen /*tableID*/
// prefixRecordIDLen is the length of the record prefix followed by a record ID.
prefixRecordIDLen = recordPrefixLen + intLen /*recordID*/
// prefixIndexLen is the length of the index prefix followed by an index ID.
prefixIndexLen = indexPrefixLen + intLen /*indexID*/
)

// MetaType is for data structure meta/data flag.
Expand Down Expand Up @@ -120,22 +117,6 @@ func decodeRecordID(key []byte) (rest []byte, recordID int64, err error) {
return
}

// decodeIndexKey parses an index key of the form
// indexPrefix | indexID | encoded index datums.
//
// It returns the index ID and the datums decoded from the remainder of the
// key. A key that is too short or lacks indexPrefix yields
// ErrInvalidRecordKey; a failure while decoding the ID or the datums is
// wrapped in ErrCodecDecode.
func decodeIndexKey(key []byte) (indexID int64, indexValue []types.Datum, err error) {
	if !bytes.HasPrefix(key, indexPrefix) || len(key) < prefixIndexLen {
		return 0, nil, cerror.ErrInvalidRecordKey.GenWithStackByArgs(key)
	}

	// Strip the prefix, then peel the index ID off the front.
	rest, id, decodeErr := codec.DecodeInt(key[indexPrefixLen:])
	if decodeErr != nil {
		return 0, nil, cerror.WrapError(cerror.ErrCodecDecode, decodeErr)
	}

	// Whatever follows the ID is the encoded index value.
	values, decodeErr := codec.Decode(rest, 2)
	if decodeErr != nil {
		return 0, nil, cerror.WrapError(cerror.ErrCodecDecode, decodeErr)
	}
	return id, values, nil
}

func decodeMetaKey(ek []byte) (meta, error) {
if !bytes.HasPrefix(ek, metaPrefix) {
return nil, cerror.ErrInvalidRecordKey.GenWithStackByArgs(ek)
Expand Down
211 changes: 27 additions & 184 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package entry
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"math"
Expand Down Expand Up @@ -64,66 +63,6 @@ type rowKVEntry struct {
PreRowExist bool
}

// indexKVEntry is a decoded index key-value entry: the fields shared through
// the embedded baseKVEntry plus the data decoded from the index key itself.
type indexKVEntry struct {
baseKVEntry
// IndexID is the index ID decoded from the index key.
IndexID int64
// IndexValue holds the datums decoded from the index key; unflatten
// rewrites them in place using the table's column types.
IndexValue []types.Datum
}

// unflatten rewrites idx.IndexValue in place, converting every raw datum to
// the field type of the table column that the index column refers to.
//
// tableInfo must describe the table idx belongs to, either directly (same ID)
// or through one of its partition definitions; otherwise ErrWrongTableInfo is
// returned. ErrIndexKeyTableNotFound is returned when the table has no index
// with idx.IndexID. For an index that isDistinct reports as non-distinct, the
// trailing handle datums (one for an int handle, RecordID.NumCols() otherwise)
// are dropped from IndexValue before conversion.
func (idx *indexKVEntry) unflatten(tableInfo *model.TableInfo, tz *time.Location) error {
	if idx.PhysicalTableID != tableInfo.ID {
		// Not the logical table itself: accept only if it is one of its partitions.
		matched := false
		if partInfo := tableInfo.GetPartitionInfo(); partInfo != nil {
			for i := range partInfo.Definitions {
				if partInfo.Definitions[i].ID == idx.PhysicalTableID {
					matched = true
					break
				}
			}
		}
		if !matched {
			return cerror.ErrWrongTableInfo.GenWithStackByArgs(tableInfo.ID, idx.PhysicalTableID)
		}
	}

	index, found := tableInfo.GetIndexInfo(idx.IndexID)
	if !found {
		return cerror.ErrIndexKeyTableNotFound.GenWithStackByArgs(idx.IndexID)
	}

	if !isDistinct(index, idx.IndexValue) {
		handle := idx.baseKVEntry.RecordID
		idx.RecordID = handle
		// Number of trailing datums that belong to the handle, not the index columns.
		trailing := 1
		if !handle.IsInt() {
			trailing = handle.NumCols()
		}
		idx.IndexValue = idx.IndexValue[:len(idx.IndexValue)-trailing]
	}

	values := idx.IndexValue
	for pos := range values {
		offset := index.Columns[pos].Offset
		converted, err := unflatten(values[pos], &tableInfo.Columns[offset].FieldType, tz)
		if err != nil {
			return errors.Trace(err)
		}
		values[pos] = converted
	}
	return nil
}

// isDistinct reports whether the given index value identifies at most one row:
// always for a primary index, and for a unique index only when none of the
// indexed datums is NULL. Any other index is never distinct.
func isDistinct(index *timodel.IndexInfo, indexValue []types.Datum) bool {
	switch {
	case index.Primary:
		return true
	case !index.Unique:
		return false
	}
	// Unique index: a single NULL datum makes the entry non-distinct.
	for i := range indexValue {
		if indexValue[i].IsNull() {
			return false
		}
	}
	return true
}

// Mounter is used to parse SQL events from KV events
type Mounter interface {
Run(ctx context.Context) error
Expand Down Expand Up @@ -258,8 +197,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
}
return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID)
}
switch {
case bytes.HasPrefix(key, recordPrefix):
if bytes.HasPrefix(key, recordPrefix) {
rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Key, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -268,15 +206,6 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
return nil, nil
}
return m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateSize())
case bytes.HasPrefix(key, indexPrefix):
indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
if indexKV == nil {
return nil, nil
}
return m.mountIndexKVEntry(tableInfo, indexKV, raw.ApproximateSize())
}
return nil, nil
}()
Expand Down Expand Up @@ -331,46 +260,6 @@ func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, rawKey []b
}, nil
}

// unmarshalIndexKVEntry decodes an index KV pair into an indexKVEntry.
//
// restKey is the index key starting at indexPrefix, rawValue is the index
// value, and base carries the fields shared by all KV entries. rawOldValue is
// accepted for signature symmetry with the row path and is not read here.
//
// It returns (nil, nil), meaning "skip this entry", unless the entry is a
// delete and the old value feature is disabled. The record handle is taken
// from rawValue: an 8-byte value is read as a big-endian int64 handle, and a
// value starting with tablecodec.CommonHandleFlag is read as a 2-byte
// big-endian length followed by the encoded common handle. Any other value
// leaves the handle nil. A malformed common-handle value yields an error
// instead of an out-of-range panic.
func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*indexKVEntry, error) {
	// Skip set index KV.
	// By default we cannot get the old value of a deleted row, then we must get the value of unique key
	// or primary key for seeking the deleted row through its index key.
	// After the old value was enabled, we can skip the index key.
	if !base.Delete || m.enableOldValue {
		return nil, nil
	}

	indexID, indexValue, err := decodeIndexKey(restKey)
	if err != nil {
		return nil, errors.Trace(err)
	}

	var handle kv.Handle
	switch {
	case len(rawValue) == 8:
		// primary key or unique index
		handle = kv.IntHandle(int64(binary.BigEndian.Uint64(rawValue)))
	case len(rawValue) > 0 && rawValue[0] == tablecodec.CommonHandleFlag:
		// Layout: flag (1 byte) | handle length (2 bytes, big-endian) | handle.
		const headerLen = 3
		if len(rawValue) < headerLen {
			return nil, errors.Errorf("invalid common handle index value: length %d is shorter than the %d-byte header", len(rawValue), headerLen)
		}
		// Compute in int so that headerLen+handleLen cannot wrap around.
		handleLen := int(rawValue[1])<<8 | int(rawValue[2])
		handleEnd := headerLen + handleLen
		if handleEnd > len(rawValue) {
			return nil, errors.Errorf("invalid common handle index value: handle length %d exceeds value length %d", handleLen, len(rawValue))
		}
		handle, err = kv.NewCommonHandle(rawValue[headerLen:handleEnd])
		if err != nil {
			return nil, errors.Trace(err)
		}
	}

	base.RecordID = handle
	return &indexKVEntry{
		baseKVEntry: base,
		IndexID:     indexID,
		IndexValue:  indexValue,
	}, nil
}

const (
ddlJobListKey = "DDLJobList"
ddlAddIndexJobListKey = "DDLJobAddIdxList"
Expand Down Expand Up @@ -442,25 +331,32 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
}

func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, error) {
// if m.enableOldValue == true, go into this function
// if m.enableOldValue == false and row.Delete == false, go into this function
// if m.enableOldValue == false and row.Delete == true and use explicit row id, go into this function
// only if m.enableOldValue == false and row.Delete == true and use implicit row id (_tidb_rowid), skip this function
useImplicitTiDBRowID := !tableInfo.PKIsHandle && !tableInfo.IsCommonHandle
if !m.enableOldValue && row.Delete && useImplicitTiDBRowID {
return nil, nil
}

var err error
// Decode previous columns.
var preCols []*model.Column
// Since we now always use old value internally,
// we need to control the output (sink will use the PreColumns field to determine whether to output old value).
// Normally old value is output only when enableOldValue is on,
// but for the Delete event, when the old value feature is off,
// the HandleKey column needs to be included as well. So we need to do the following filtering.
if row.PreRowExist {
// FIXME(leoppro): use the pre table info to mount the pre column datum;
// the pre column and current column in one event may use different table info
preCols, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
if err != nil {
return nil, errors.Trace(err)
}

// NOTICE: When the old Value feature is off,
// the Delete event only needs to keep the handle key column.
if row.Delete && !m.enableOldValue {
for i := range preCols {
col := preCols[i]
if col != nil && !col.Flag.IsHandleKey() {
preCols[i] = nil
}
}
}
}

var cols []*model.Column
Expand All @@ -477,11 +373,20 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
if row.RecordID.IsInt() {
intRowID = row.RecordID.IntValue()
}

var tableInfoVersion uint64
// Align with the old format if old value disabled.
if row.Delete && !m.enableOldValue {
tableInfoVersion = 0
} else {
tableInfoVersion = tableInfo.TableInfoVersion
}

return &model.RowChangedEvent{
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: intRowID,
TableInfoVersion: tableInfo.TableInfoVersion,
TableInfoVersion: tableInfoVersion,
Table: &model.TableName{
Schema: schemaName,
Table: tableName,
Expand All @@ -495,68 +400,6 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
}, nil
}

// mountIndexKVEntry converts a deleted handle-index entry into a
// RowChangedEvent whose PreColumns contain only the indexed columns.
//
// It returns (nil, nil) when the entry should be ignored: it is not a delete,
// the old value feature is enabled, the index is not the table's handle
// index, the index info cannot be found (a warning is logged), or the index
// is not unique. dataSize is passed through as the event's ApproximateSize.
func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKVEntry, dataSize int64) (*model.RowChangedEvent, error) {
	// Skip set index KV, and any index that is not the handle.
	if !idx.Delete || m.enableOldValue || idx.IndexID != tableInfo.HandleIndexID {
		return nil, nil
	}

	indexInfo, found := tableInfo.GetIndexInfo(idx.IndexID)
	if !found {
		log.Warn("index info not found", zap.Int64("indexID", idx.IndexID))
		return nil, nil
	}
	if !tableInfo.IsIndexUnique(indexInfo) {
		return nil, nil
	}

	if err := idx.unflatten(tableInfo, m.tz); err != nil {
		return nil, errors.Trace(err)
	}

	// One slot per row column; only the slots of indexed columns are filled.
	preCols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
	for pos := range indexInfo.Columns {
		column := tableInfo.Columns[indexInfo.Columns[pos].Offset]
		formatted, warning, err := formatColVal(idx.IndexValue[pos], column.Tp)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if warning != "" {
			log.Warn(warning, zap.String("table", tableInfo.TableName.String()), zap.String("column", column.Name.String()))
		}
		slot := tableInfo.RowColumnsOffset[column.ID]
		preCols[slot] = &model.Column{
			Name:  column.Name.O,
			Type:  column.Tp,
			Value: formatted,
			Flag:  tableInfo.ColumnsFlag[column.ID],
		}
	}

	// RowID stays zero unless the entry carries an integer handle.
	var intRowID int64
	if handle := idx.RecordID; handle != nil && handle.IsInt() {
		intRowID = handle.IntValue()
	}

	event := &model.RowChangedEvent{
		StartTs:  idx.StartTs,
		CommitTs: idx.CRTs,
		RowID:    intRowID,
		Table: &model.TableName{
			Schema:      tableInfo.TableName.Schema,
			Table:       tableInfo.TableName.Table,
			TableID:     idx.PhysicalTableID,
			IsPartition: tableInfo.GetPartitionInfo() != nil,
		},
		PreColumns:      preCols,
		IndexColumns:    tableInfo.IndexColumnsOffset,
		ApproximateSize: dataSize,
	}
	return event, nil
}

var emptyBytes = make([]byte, 0)

func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func walkTableSpanInStore(c *check.C, store tidbkv.Storage, tableID int64, f fun
txn, err := store.Begin()
c.Assert(err, check.IsNil)
defer txn.Rollback() //nolint:errcheck
tableSpan := regionspan.GetTableSpan(tableID, false)
tableSpan := regionspan.GetTableSpan(tableID)
kvIter, err := txn.Iter(tableSpan.Start, tableSpan.End)
c.Assert(err, check.IsNil)
defer kvIter.Close()
Expand Down
2 changes: 2 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
return ti
}

// TODO(hi-rustin): Once we no longer need to subscribe to index updates,
// findHandleIndex may no longer be necessary.
func (ti *TableInfo) findHandleIndex() {
if ti.HandleIndexID == HandleIndexPKIsHandle {
// pk is handle
Expand Down
Loading

0 comments on commit 7bba9f7

Please sign in to comment.