Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

puller,mounter,processor: always pull the old value internally #2271

Merged
merged 23 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions cdc/entry/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,16 @@ import (
var (
tablePrefix = []byte{'t'}
recordPrefix = []byte("_r")
indexPrefix = []byte("_i")
metaPrefix = []byte("m")
)

var (
intLen = 8
tablePrefixLen = len(tablePrefix)
recordPrefixLen = len(recordPrefix)
indexPrefixLen = len(indexPrefix)
metaPrefixLen = len(metaPrefix)
prefixTableIDLen = tablePrefixLen + intLen /*tableID*/
prefixRecordIDLen = recordPrefixLen + intLen /*recordID*/
prefixIndexLen = indexPrefixLen + intLen /*indexID*/
)

// MetaType is for data structure meta/data flag.
Expand Down Expand Up @@ -120,22 +117,6 @@ func decodeRecordID(key []byte) (rest []byte, recordID int64, err error) {
return
}

func decodeIndexKey(key []byte) (indexID int64, indexValue []types.Datum, err error) {
if len(key) < prefixIndexLen || !bytes.HasPrefix(key, indexPrefix) {
return 0, nil, cerror.ErrInvalidRecordKey.GenWithStackByArgs(key)
}
key = key[indexPrefixLen:]
key, indexID, err = codec.DecodeInt(key)
if err != nil {
return 0, nil, cerror.WrapError(cerror.ErrCodecDecode, err)
}
indexValue, err = codec.Decode(key, 2)
if err != nil {
return 0, nil, cerror.WrapError(cerror.ErrCodecDecode, err)
}
return
}

func decodeMetaKey(ek []byte) (meta, error) {
if !bytes.HasPrefix(ek, metaPrefix) {
return nil, cerror.ErrInvalidRecordKey.GenWithStackByArgs(ek)
Expand Down
211 changes: 27 additions & 184 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package entry
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"math"
Expand Down Expand Up @@ -64,66 +63,6 @@ type rowKVEntry struct {
PreRowExist bool
}

type indexKVEntry struct {
baseKVEntry
IndexID int64
IndexValue []types.Datum
}

func (idx *indexKVEntry) unflatten(tableInfo *model.TableInfo, tz *time.Location) error {
if tableInfo.ID != idx.PhysicalTableID {
isPartition := false
if pi := tableInfo.GetPartitionInfo(); pi != nil {
for _, p := range pi.Definitions {
if p.ID == idx.PhysicalTableID {
isPartition = true
break
}
}
}
if !isPartition {
return cerror.ErrWrongTableInfo.GenWithStackByArgs(tableInfo.ID, idx.PhysicalTableID)
}
}
index, exist := tableInfo.GetIndexInfo(idx.IndexID)
if !exist {
return cerror.ErrIndexKeyTableNotFound.GenWithStackByArgs(idx.IndexID)
}
if !isDistinct(index, idx.IndexValue) {
idx.RecordID = idx.baseKVEntry.RecordID
if idx.baseKVEntry.RecordID.IsInt() {
idx.IndexValue = idx.IndexValue[:len(idx.IndexValue)-1]
} else {
idx.IndexValue = idx.IndexValue[:len(idx.IndexValue)-idx.RecordID.NumCols()]
}
}
for i, v := range idx.IndexValue {
colOffset := index.Columns[i].Offset
fieldType := &tableInfo.Columns[colOffset].FieldType
datum, err := unflatten(v, fieldType, tz)
if err != nil {
return errors.Trace(err)
}
idx.IndexValue[i] = datum
}
return nil
}

func isDistinct(index *timodel.IndexInfo, indexValue []types.Datum) bool {
if index.Primary {
return true
}
if index.Unique {
for _, value := range indexValue {
if value.IsNull() {
return false
}
}
return true
}
return false
}

// Mounter is used to parse SQL events from KV events
type Mounter interface {
Run(ctx context.Context) error
Expand Down Expand Up @@ -258,8 +197,7 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
}
return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID)
}
switch {
case bytes.HasPrefix(key, recordPrefix):
if bytes.HasPrefix(key, recordPrefix) {
rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Key, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -268,15 +206,6 @@ func (m *mounterImpl) unmarshalAndMountRowChanged(ctx context.Context, raw *mode
return nil, nil
}
return m.mountRowKVEntry(tableInfo, rowKV, raw.ApproximateSize())
case bytes.HasPrefix(key, indexPrefix):
indexKV, err := m.unmarshalIndexKVEntry(key, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
if indexKV == nil {
return nil, nil
}
return m.mountIndexKVEntry(tableInfo, indexKV, raw.ApproximateSize())
}
return nil, nil
}()
Expand Down Expand Up @@ -331,46 +260,6 @@ func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, rawKey []b
}, nil
}

func (m *mounterImpl) unmarshalIndexKVEntry(restKey []byte, rawValue []byte, rawOldValue []byte, base baseKVEntry) (*indexKVEntry, error) {
// Skip set index KV.
// By default we cannot get the old value of a deleted row, then we must get the value of unique key
// or primary key for seeking the deleted row through its index key.
// After the old value was enabled, we can skip the index key.
if !base.Delete || m.enableOldValue {
return nil, nil
}

indexID, indexValue, err := decodeIndexKey(restKey)
if err != nil {
return nil, errors.Trace(err)
}
var handle kv.Handle

if len(rawValue) == 8 {
// primary key or unique index
var recordID int64
buf := bytes.NewBuffer(rawValue)
err = binary.Read(buf, binary.BigEndian, &recordID)
if err != nil {
return nil, errors.Trace(err)
}
handle = kv.IntHandle(recordID)
} else if len(rawValue) > 0 && rawValue[0] == tablecodec.CommonHandleFlag {
handleLen := uint16(rawValue[1])<<8 + uint16(rawValue[2])
handleEndOff := 3 + handleLen
handle, err = kv.NewCommonHandle(rawValue[3:handleEndOff])
if err != nil {
return nil, errors.Trace(err)
}
}
base.RecordID = handle
return &indexKVEntry{
baseKVEntry: base,
IndexID: indexID,
IndexValue: indexValue,
}, nil
}

const (
ddlJobListKey = "DDLJobList"
ddlAddIndexJobListKey = "DDLJobAddIdxList"
Expand Down Expand Up @@ -442,25 +331,32 @@ func datum2Column(tableInfo *model.TableInfo, datums map[int64]types.Datum, fill
}

func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntry, dataSize int64) (*model.RowChangedEvent, error) {
// if m.enableOldValue == true, go into this function
// if m.enableNewValue == false and row.Delete == false, go into this function
// if m.enableNewValue == false and row.Delete == true and use explict row id, go into this function
// only if m.enableNewValue == false and row.Delete == true and use implicit row id(_tidb_rowid), skip this function
useImplicitTiDBRowID := !tableInfo.PKIsHandle && !tableInfo.IsCommonHandle
if !m.enableOldValue && row.Delete && useImplicitTiDBRowID {
return nil, nil
}

var err error
// Decode previous columns.
var preCols []*model.Column
// Since we now always use old value internally,
// we need to control the output(sink will use the PreColumns field to determine whether to output old value).
// Normally old value is output when only enableOldValue is on,
// but for the Delete event, when the old value feature is off,
// the HandleKey column needs to be included as well. So we need to do the following filtering.
if row.PreRowExist {
// FIXME(leoppro): using pre table info to mounter pre column datum
// the pre column and current column in one event may using different table info
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
preCols, err = datum2Column(tableInfo, row.PreRow, m.enableOldValue)
if err != nil {
return nil, errors.Trace(err)
}

// NOTICE: When the old Value feature is off,
// the Delete event only needs to keep the handle key column.
if row.Delete && !m.enableOldValue {
for i := range preCols {
col := preCols[i]
if col != nil && !col.Flag.IsHandleKey() {
preCols[i] = nil
}
}
}
}

var cols []*model.Column
Expand All @@ -477,11 +373,20 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
if row.RecordID.IsInt() {
intRowID = row.RecordID.IntValue()
}

var tableInfoVersion uint64
// Align with the old format if old value disabled.
if row.Delete && !m.enableOldValue {
tableInfoVersion = 0
} else {
tableInfoVersion = tableInfo.TableInfoVersion
}

return &model.RowChangedEvent{
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: intRowID,
TableInfoVersion: tableInfo.TableInfoVersion,
TableInfoVersion: tableInfoVersion,
Table: &model.TableName{
Schema: schemaName,
Table: tableName,
Expand All @@ -495,68 +400,6 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
}, nil
}

func (m *mounterImpl) mountIndexKVEntry(tableInfo *model.TableInfo, idx *indexKVEntry, dataSize int64) (*model.RowChangedEvent, error) {
// skip set index KV
if !idx.Delete || m.enableOldValue {
return nil, nil
}
// skip any index that is not the handle
if idx.IndexID != tableInfo.HandleIndexID {
return nil, nil
}

indexInfo, exist := tableInfo.GetIndexInfo(idx.IndexID)
if !exist {
log.Warn("index info not found", zap.Int64("indexID", idx.IndexID))
return nil, nil
}

if !tableInfo.IsIndexUnique(indexInfo) {
return nil, nil
}

err := idx.unflatten(tableInfo, m.tz)
if err != nil {
return nil, errors.Trace(err)
}

preCols := make([]*model.Column, len(tableInfo.RowColumnsOffset))
for i, idxCol := range indexInfo.Columns {
colInfo := tableInfo.Columns[idxCol.Offset]
value, warn, err := formatColVal(idx.IndexValue[i], colInfo.Tp)
if err != nil {
return nil, errors.Trace(err)
}
if warn != "" {
log.Warn(warn, zap.String("table", tableInfo.TableName.String()), zap.String("column", colInfo.Name.String()))
}
preCols[tableInfo.RowColumnsOffset[colInfo.ID]] = &model.Column{
Name: colInfo.Name.O,
Type: colInfo.Tp,
Value: value,
Flag: tableInfo.ColumnsFlag[colInfo.ID],
}
}
var intRowID int64
if idx.RecordID != nil && idx.RecordID.IsInt() {
intRowID = idx.RecordID.IntValue()
}
return &model.RowChangedEvent{
StartTs: idx.StartTs,
CommitTs: idx.CRTs,
RowID: intRowID,
Table: &model.TableName{
Schema: tableInfo.TableName.Schema,
Table: tableInfo.TableName.Table,
TableID: idx.PhysicalTableID,
IsPartition: tableInfo.GetPartitionInfo() != nil,
},
PreColumns: preCols,
IndexColumns: tableInfo.IndexColumnsOffset,
ApproximateSize: dataSize,
}, nil
}

var emptyBytes = make([]byte, 0)

func formatColVal(datum types.Datum, tp byte) (value interface{}, warn string, err error) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func walkTableSpanInStore(c *check.C, store tidbkv.Storage, tableID int64, f fun
txn, err := store.Begin()
c.Assert(err, check.IsNil)
defer txn.Rollback() //nolint:errcheck
tableSpan := regionspan.GetTableSpan(tableID, false)
tableSpan := regionspan.GetTableSpan(tableID)
kvIter, err := txn.Iter(tableSpan.Start, tableSpan.End)
c.Assert(err, check.IsNil)
defer kvIter.Close()
Expand Down
2 changes: 2 additions & 0 deletions cdc/model/schema_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func WrapTableInfo(schemaID int64, schemaName string, version uint64, info *mode
return ti
}

// TODO(hi-rustin): After we don't need to subscribe index update,
// findHandleIndex may be not necessary any more.
func (ti *TableInfo) findHandleIndex() {
if ti.HandleIndexID == HandleIndexPKIsHandle {
// pk is handle
Expand Down
7 changes: 4 additions & 3 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,16 +811,17 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo

startPuller := func(tableID model.TableID, pResolvedTs *uint64, pCheckpointTs *uint64) sink.Sink {
// start table puller
enableOldValue := p.changefeed.Config.EnableOldValue
span := regionspan.GetTableSpan(tableID, enableOldValue)
span := regionspan.GetTableSpan(tableID)
kvStorage, err := util.KVStorageFromCtx(ctx)
if err != nil {
p.sendError(err)
return nil
}
// NOTICE: always pull the old value internally
// See also: TODO(hi-rustin): add issue link here.
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
plr := puller.NewPuller(ctx, p.pdCli, p.credential, kvStorage,
replicaInfo.StartTs, []regionspan.Span{span}, p.limitter,
enableOldValue)
true)
go func() {
err := plr.Run(ctx)
if errors.Cause(err) != context.Canceled {
Expand Down
9 changes: 5 additions & 4 deletions cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,24 @@ func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span {
// start table puller
config := ctx.ChangefeedVars().Info.Config
spans := make([]regionspan.Span, 0, 4)
spans = append(spans, regionspan.GetTableSpan(n.tableID, config.EnableOldValue))
spans = append(spans, regionspan.GetTableSpan(n.tableID))

if config.Cyclic.IsEnabled() && n.replicaInfo.MarkTableID != 0 {
spans = append(spans, regionspan.GetTableSpan(n.replicaInfo.MarkTableID, config.EnableOldValue))
spans = append(spans, regionspan.GetTableSpan(n.replicaInfo.MarkTableID))
}
return spans
}

func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr, n.tableName)
globalConfig := config.GetGlobalServerConfig()
config := ctx.ChangefeedVars().Info.Config
ctxC, cancel := context.WithCancel(ctx)
ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName)
ctxC = util.PutChangefeedIDInCtx(ctxC, ctx.ChangefeedVars().ID)
// NOTICE: always pull the old value internally
// See also: TODO(hi-rustin): add issue link here.
plr := puller.NewPuller(ctxC, ctx.GlobalVars().PDClient, globalConfig.Security, ctx.GlobalVars().KVStorage,
n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, config.EnableOldValue)
n.replicaInfo.StartTs, n.tableSpan(ctx), n.limitter, true)
Rustin170506 marked this conversation as resolved.
Show resolved Hide resolved
n.wg.Go(func() error {
ctx.Throw(errors.Trace(plr.Run(ctxC)))
return nil
Expand Down
Loading