Skip to content

Commit

Permalink
lightning: make table in BaseKVEncoder private and provide some wra…
Browse files Browse the repository at this point in the history
…pped methods (#54673)

ref #54397
  • Loading branch information
lcwangchao authored Jul 18, 2024
1 parent e858f55 commit 26378cb
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 59 deletions.
6 changes: 3 additions & 3 deletions pkg/executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
sessionVars := en.SessionCtx.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
defer sessionVars.TxnCtxMu.Unlock()
return sessionVars.TxnCtx.TableDeltaMap[en.Table.Meta().ID].ColSize
return sessionVars.TxnCtx.TableDeltaMap[en.TableMeta().ID].ColSize
}

// todo merge with code in load_data.go
Expand Down Expand Up @@ -197,12 +197,12 @@ func (en *tableKVEncoder) fillRow(row []types.Datum, hasValue []bool, rowID int6
record = append(record, value)
}

if common.TableHasAutoRowID(en.Table.Meta()) {
if common.TableHasAutoRowID(en.TableMeta()) {
rowValue := rowID
newRowID := en.AutoIDFn(rowID)
value = types.NewIntDatum(newRowID)
record = append(record, value)
alloc := en.Table.Allocators(en.SessionCtx.GetTableCtx()).Get(autoid.RowIDAllocType)
alloc := en.TableAllocators().Get(autoid.RowIDAllocType)
if err := alloc.Rebase(context.Background(), rowValue, false); err != nil {
return nil, errors.Trace(err)
}
Expand Down
30 changes: 23 additions & 7 deletions pkg/lightning/backend/kv/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/log"
Expand Down Expand Up @@ -122,7 +123,7 @@ func (row RowArrayMarshaller) MarshalLogArray(encoder zapcore.ArrayEncoder) erro
type BaseKVEncoder struct {
GenCols []GeneratedCol
SessionCtx *Session
Table table.Table
table table.Table
Columns []*table.Column
AutoRandomColID int64
// convert auto id for shard rowid or auto random id base on row id generated by lightning
Expand Down Expand Up @@ -169,7 +170,7 @@ func NewBaseKVEncoder(config *encode.EncodingConfig) (*BaseKVEncoder, error) {
return &BaseKVEncoder{
GenCols: genCols,
SessionCtx: se,
Table: config.Table,
table: config.Table,
Columns: cols,
AutoRandomColID: autoRandomColID,
AutoIDFn: autoIDFn,
Expand All @@ -187,7 +188,7 @@ func (e *BaseKVEncoder) GetOrCreateRecord() []types.Datum {

// Record2KV converts a row into a KV pair.
func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64) (*Pairs, error) {
_, err := e.Table.AddRecord(e.SessionCtx.GetTableCtx(), record)
_, err := e.AddRecord(record)
if err != nil {
e.logger.Error("kv encode failed",
zap.Array("originalRow", RowArrayMarshaller(originalRow)),
Expand All @@ -205,6 +206,21 @@ func (e *BaseKVEncoder) Record2KV(record, originalRow []types.Datum, rowID int64
return kvPairs, nil
}

// AddRecord adds a record into encoder
func (e *BaseKVEncoder) AddRecord(record []types.Datum) (kv.Handle, error) {
return e.table.AddRecord(e.SessionCtx.GetTableCtx(), record)
}

// TableAllocators returns the allocators of the table
func (e *BaseKVEncoder) TableAllocators() autoid.Allocators {
return e.table.Allocators(e.SessionCtx.GetTableCtx())
}

// TableMeta returns the meta of the table
func (e *BaseKVEncoder) TableMeta() *model.TableInfo {
return e.table.Meta()
}

// ProcessColDatum processes the datum of a column.
func (e *BaseKVEncoder) ProcessColDatum(col *table.Column, rowID int64, inputDatum *types.Datum) (types.Datum, error) {
value, err := e.getActualDatum(col, rowID, inputDatum)
Expand All @@ -213,17 +229,17 @@ func (e *BaseKVEncoder) ProcessColDatum(col *table.Column, rowID int64, inputDat
}

if e.IsAutoRandomCol(col.ToInfo()) {
meta := e.Table.Meta()
meta := e.table.Meta()
shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits)
// this allocator is the same as the allocator in table importer, i.e. PanickingAllocators. below too.
alloc := e.Table.Allocators(e.SessionCtx.GetTableCtx()).Get(autoid.AutoRandomType)
alloc := e.TableAllocators().Get(autoid.AutoRandomType)
if err := alloc.Rebase(context.Background(), value.GetInt64()&shardFmt.IncrementalMask(), false); err != nil {
return value, errors.Trace(err)
}
}
if IsAutoIncCol(col.ToInfo()) {
// same as RowIDAllocType, since SepAutoInc is always false when initializing allocators of Table.
alloc := e.Table.Allocators(e.SessionCtx.GetTableCtx()).Get(autoid.AutoIncrementType)
alloc := e.TableAllocators().Get(autoid.AutoIncrementType)
if err := alloc.Rebase(context.Background(), GetAutoRecordID(value, &col.FieldType), false); err != nil {
return value, errors.Trace(err)
}
Expand Down Expand Up @@ -285,7 +301,7 @@ func (e *BaseKVEncoder) getActualDatum(col *table.Column, rowID int64, inputDatu

// IsAutoRandomCol checks if the column is auto random column.
func (e *BaseKVEncoder) IsAutoRandomCol(col *model.ColumnInfo) bool {
return e.Table.Meta().ContainsAutoRandomBits() && col.ID == e.AutoRandomColID
return e.table.Meta().ContainsAutoRandomBits() && col.ID == e.AutoRandomColID
}

// EvalGeneratedColumns evaluates the generated columns.
Expand Down
4 changes: 2 additions & 2 deletions pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (kvcodec *tableKVEncoder) Encode(row []types.Datum,
record = append(record, value)
}

if common.TableHasAutoRowID(kvcodec.Table.Meta()) {
if common.TableHasAutoRowID(kvcodec.table.Meta()) {
rowValue := rowID
j := columnPermutation[len(kvcodec.Columns)]
if j >= 0 && j < len(row) {
Expand All @@ -261,7 +261,7 @@ func (kvcodec *tableKVEncoder) Encode(row []types.Datum,
return nil, kvcodec.LogKVConvertFailed(row, j, ExtraHandleColumnInfo, err)
}
record = append(record, value)
alloc := kvcodec.Table.Allocators(kvcodec.SessionCtx.GetTableCtx()).Get(autoid.RowIDAllocType)
alloc := kvcodec.TableAllocators().Get(autoid.RowIDAllocType)
if err := alloc.Rebase(context.Background(), rowValue, false); err != nil {
return nil, errors.Trace(err)
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/lightning/backend/local/duplicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,11 @@ func buildTableForTestConvertToErrFoundConflictRecords(t *testing.T, node []ast.
types.NewStringDatum("3.csv"),
types.NewIntDatum(103),
}
tctx := encoder.SessionCtx.GetTableCtx()
_, err = encoder.Table.AddRecord(tctx, data1)
_, err = encoder.AddRecord(data1)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data2)
_, err = encoder.AddRecord(data2)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data3)
_, err = encoder.AddRecord(data3)
require.NoError(t, err)
return tbl, encoder.SessionCtx.TakeKvPairs()
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func (em *ErrorManager) ReplaceConflictKeys(
// for nonclustered PK, need to append handle to decodedData for AddRecord
decodedData = append(decodedData, types.NewIntDatum(overwrittenHandle.IntValue()))
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx.GetTableCtx(), decodedData)
_, err = encoder.AddRecord(decodedData)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -800,7 +800,7 @@ func (em *ErrorManager) ReplaceConflictKeys(
// for nonclustered PK, need to append handle to decodedData for AddRecord
decodedData = append(decodedData, types.NewIntDatum(handle.IntValue()))
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx.GetTableCtx(), decodedData)
_, err = encoder.AddRecord(decodedData)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -829,7 +829,7 @@ func (em *ErrorManager) ReplaceConflictKeys(
// for nonclustered PK, need to append handle to decodedData for AddRecord
decodedData = append(decodedData, types.NewIntDatum(handle.IntValue()))
}
_, err = encoder.Table.AddRecord(encoder.SessionCtx.GetTableCtx(), decodedData)
_, err = encoder.AddRecord(decodedData)
if err != nil {
return errors.Trace(err)
}
Expand Down
22 changes: 10 additions & 12 deletions pkg/lightning/errormanager/errormanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,15 @@ func TestReplaceConflictOneKey(t *testing.T) {
types.NewIntDatum(4),
types.NewStringDatum("5.csv"),
}
tctx := encoder.SessionCtx.GetTableCtx()
_, err = encoder.Table.AddRecord(tctx, data1)
_, err = encoder.AddRecord(data1)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data2)
_, err = encoder.AddRecord(data2)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data3)
_, err = encoder.AddRecord(data3)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data4)
_, err = encoder.AddRecord(data4)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data5)
_, err = encoder.AddRecord(data5)
require.NoError(t, err)
kvPairs := encoder.SessionCtx.TakeKvPairs()

Expand Down Expand Up @@ -448,16 +447,15 @@ func TestReplaceConflictOneUniqueKey(t *testing.T) {
types.NewIntDatum(4),
types.NewStringDatum("5.csv"),
}
tctx := encoder.SessionCtx.GetTableCtx()
_, err = encoder.Table.AddRecord(tctx, data1)
_, err = encoder.AddRecord(data1)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data2)
_, err = encoder.AddRecord(data2)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data3)
_, err = encoder.AddRecord(data3)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data4)
_, err = encoder.AddRecord(data4)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data5)
_, err = encoder.AddRecord(data5)
require.NoError(t, err)
kvPairs := encoder.SessionCtx.TakeKvPairs()

Expand Down
48 changes: 22 additions & 26 deletions pkg/lightning/errormanager/resolveconflict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,19 @@ func TestReplaceConflictMultipleKeysNonclusteredPk(t *testing.T) {
types.NewStringDatum("5.csv"),
types.NewIntDatum(7),
}
tctx := encoder.SessionCtx.GetTableCtx()
_, err = encoder.Table.AddRecord(tctx, data1)
_, err = encoder.AddRecord(data1)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data2)
_, err = encoder.AddRecord(data2)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data3)
_, err = encoder.AddRecord(data3)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data4)
_, err = encoder.AddRecord(data4)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data5)
_, err = encoder.AddRecord(data5)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data6)
_, err = encoder.AddRecord(data6)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data7)
_, err = encoder.AddRecord(data7)
require.NoError(t, err)
kvPairs := encoder.SessionCtx.TakeKvPairs()

Expand Down Expand Up @@ -321,16 +320,15 @@ func TestReplaceConflictOneKeyNonclusteredPk(t *testing.T) {
types.NewStringDatum("5.csv"),
types.NewIntDatum(5),
}
tctx := encoder.SessionCtx.GetTableCtx()
_, err = encoder.Table.AddRecord(tctx, data1)
_, err = encoder.AddRecord(data1)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data2)
_, err = encoder.AddRecord(data2)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data3)
_, err = encoder.AddRecord(data3)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data4)
_, err = encoder.AddRecord(data4)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data5)
_, err = encoder.AddRecord(data5)
require.NoError(t, err)
kvPairs := encoder.SessionCtx.TakeKvPairs()

Expand Down Expand Up @@ -490,16 +488,15 @@ func TestReplaceConflictOneUniqueKeyNonclusteredPk(t *testing.T) {
types.NewStringDatum("5.csv"),
types.NewIntDatum(5),
}
tctx := encoder.SessionCtx.GetTableCtx()
_, err = encoder.Table.AddRecord(tctx, data1)
_, err = encoder.AddRecord(data1)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data2)
_, err = encoder.AddRecord(data2)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data3)
_, err = encoder.AddRecord(data3)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data4)
_, err = encoder.AddRecord(data4)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data5)
_, err = encoder.AddRecord(data5)
require.NoError(t, err)
kvPairs := encoder.SessionCtx.TakeKvPairs()

Expand Down Expand Up @@ -697,16 +694,15 @@ func TestReplaceConflictOneUniqueKeyNonclusteredVarcharPk(t *testing.T) {
types.NewStringDatum("5.csv"),
types.NewIntDatum(5),
}
tctx := encoder.SessionCtx.GetTableCtx()
_, err = encoder.Table.AddRecord(tctx, data1)
_, err = encoder.AddRecord(data1)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data2)
_, err = encoder.AddRecord(data2)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data3)
_, err = encoder.AddRecord(data3)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data4)
_, err = encoder.AddRecord(data4)
require.NoError(t, err)
_, err = encoder.Table.AddRecord(tctx, data5)
_, err = encoder.AddRecord(data5)
require.NoError(t, err)
kvPairs := encoder.SessionCtx.TakeKvPairs()

Expand Down
3 changes: 1 addition & 2 deletions pkg/table/tables/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ func TestGenIndexValueFromIndex(t *testing.T) {
types.NewIntDatum(23),
types.NewStringDatum("4.csv"),
}
tctx := encoder.SessionCtx.GetTableCtx()
_, err = encoder.Table.AddRecord(tctx, data1)
_, err = encoder.AddRecord(data1)
require.NoError(t, err)
kvPairs := encoder.SessionCtx.TakeKvPairs()

Expand Down

0 comments on commit 26378cb

Please sign in to comment.