From 6e022a5c31919ee473fac921bf994131a2da0ba4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Thu, 1 Feb 2024 17:01:54 +0800 Subject: [PATCH] table: introduce `MutateContext` and `AllocatorContext` for `table.Table` (#50862) close pingcap/tidb#50861 --- br/pkg/lightning/backend/kv/BUILD.bazel | 1 - br/pkg/lightning/backend/kv/base.go | 4 +- br/pkg/lightning/backend/kv/sql2kv.go | 2 +- br/pkg/lightning/backend/kv/sql2kv_test.go | 15 +-- pkg/ddl/ddl_api.go | 4 +- pkg/ddl/table.go | 2 +- pkg/errctx/context.go | 6 +- pkg/executor/executor.go | 2 +- .../importer/importer_testkit_test.go | 2 +- pkg/executor/importer/kv_encode.go | 2 +- pkg/executor/infoschema_reader.go | 2 +- pkg/executor/insert_common.go | 16 ++- pkg/executor/show.go | 2 +- pkg/executor/test/ddl/ddl_test.go | 2 +- pkg/executor/write.go | 6 +- pkg/expression/simple_rewriter.go | 9 +- pkg/infoschema/tables.go | 16 +-- pkg/planner/cardinality/trace.go | 3 +- pkg/planner/core/common_plans.go | 2 +- pkg/planner/core/find_best_task.go | 6 +- pkg/planner/core/physical_plans.go | 2 +- pkg/planner/core/rule_partition_processor.go | 16 ++- pkg/planner/util/path.go | 5 +- pkg/table/BUILD.bazel | 2 + pkg/table/column.go | 11 +- pkg/table/index.go | 5 +- pkg/table/table.go | 42 ++++-- pkg/table/tables/cache.go | 9 +- pkg/table/tables/index.go | 5 +- pkg/table/tables/partition.go | 84 ++++++------ pkg/table/tables/tables.go | 127 +++++++++--------- pkg/table/tables/tables_test.go | 2 +- pkg/util/ranger/types.go | 14 +- 33 files changed, 234 insertions(+), 194 deletions(-) diff --git a/br/pkg/lightning/backend/kv/BUILD.bazel b/br/pkg/lightning/backend/kv/BUILD.bazel index a15f31d90f2e8..c8d8a7eefb28f 100644 --- a/br/pkg/lightning/backend/kv/BUILD.bazel +++ b/br/pkg/lightning/backend/kv/BUILD.bazel @@ -70,7 +70,6 @@ go_test( "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/planner/core", - "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/table", "//pkg/table/tables", diff --git a/br/pkg/lightning/backend/kv/base.go b/br/pkg/lightning/backend/kv/base.go index 1dceebf90e568..5b2916261d45f 100644 --- a/br/pkg/lightning/backend/kv/base.go +++ b/br/pkg/lightning/backend/kv/base.go @@ -218,14 +218,14 @@ func (e *BaseKVEncoder) ProcessColDatum(col *table.Column, rowID int64, inputDat meta := e.Table.Meta() shardFmt := autoid.NewShardIDFormat(&col.FieldType, meta.AutoRandomBits, meta.AutoRandomRangeBits) // this allocator is the same as the allocator in table importer, i.e. PanickingAllocators. below too. - alloc := e.Table.Allocators(e.SessionCtx).Get(autoid.AutoRandomType) + alloc := e.Table.Allocators(e.SessionCtx.GetSessionVars()).Get(autoid.AutoRandomType) if err := alloc.Rebase(context.Background(), value.GetInt64()&shardFmt.IncrementalMask(), false); err != nil { return value, errors.Trace(err) } } if IsAutoIncCol(col.ToInfo()) { // same as RowIDAllocType, since SepAutoInc is always false when initializing allocators of Table. - alloc := e.Table.Allocators(e.SessionCtx).Get(autoid.AutoIncrementType) + alloc := e.Table.Allocators(e.SessionCtx.GetSessionVars()).Get(autoid.AutoIncrementType) if err := alloc.Rebase(context.Background(), GetAutoRecordID(value, &col.FieldType), false); err != nil { return value, errors.Trace(err) } diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 1d3ef8b1d2038..afe49e7ce37ba 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -227,7 +227,7 @@ func (kvcodec *tableKVEncoder) Encode(row []types.Datum, return nil, kvcodec.LogKVConvertFailed(row, j, ExtraHandleColumnInfo, err) } record = append(record, value) - alloc := kvcodec.Table.Allocators(kvcodec.SessionCtx).Get(autoid.RowIDAllocType) + alloc := kvcodec.Table.Allocators(kvcodec.SessionCtx.GetSessionVars()).Get(autoid.RowIDAllocType) if err := alloc.Rebase(context.Background(), rowValue, false); err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/lightning/backend/kv/sql2kv_test.go b/br/pkg/lightning/backend/kv/sql2kv_test.go index 8bca01293d719..24bdf82c4f12d 100644 --- a/br/pkg/lightning/backend/kv/sql2kv_test.go +++ b/br/pkg/lightning/backend/kv/sql2kv_test.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" _ "github.com/pingcap/tidb/pkg/planner/core" // to setup expression.EvalAstExpr. Otherwise we cannot parse the default value - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/tablecodec" @@ -72,7 +71,7 @@ type mockTable struct { table.Table } -func (mockTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { +func (mockTable) AddRecord(ctx table.MutateContext, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { return kv.IntHandle(-1), errors.New("mock error") } @@ -385,7 +384,7 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) { require.NoError(t, err) require.Equal(t, pairsExpect, pairs) - require.Equal(t, tbl.Allocators(lkv.GetEncoderSe(encoder)).Get(autoid.AutoIncrementType).Base(), int64(70)) + require.Equal(t, tbl.Allocators(lkv.GetEncoderSe(encoder).GetSessionVars()).Get(autoid.AutoIncrementType).Base(), int64(70)) } func TestEncodeMissingAutoValue(t *testing.T) { @@ -446,13 +445,13 @@ func TestEncodeMissingAutoValue(t *testing.T) { }, rowID, []int{0}, 1234) require.NoError(t, err) require.Equalf(t, pairsExpect, pairs, "test table info: %+v", testTblInfo) - require.Equalf(t, rowID, tbl.Allocators(lkv.GetEncoderSe(encoder)).Get(testTblInfo.AllocType).Base(), "test table info: %+v", testTblInfo) + require.Equalf(t, rowID, tbl.Allocators(lkv.GetEncoderSe(encoder).GetSessionVars()).Get(testTblInfo.AllocType).Base(), "test table info: %+v", testTblInfo) // test insert a row without specifying the auto_xxxx column pairs, err = encoder.Encode([]types.Datum{}, rowID, []int{0}, 1234) require.NoError(t, err) require.Equalf(t, pairsExpect, pairs, "test table info: %+v", testTblInfo) - require.Equalf(t, rowID, tbl.Allocators(lkv.GetEncoderSe(encoder)).Get(testTblInfo.AllocType).Base(), "test table info: %+v", testTblInfo) + require.Equalf(t, rowID, tbl.Allocators(lkv.GetEncoderSe(encoder).GetSessionVars()).Get(testTblInfo.AllocType).Base(), "test table info: %+v", testTblInfo) } } @@ -525,7 +524,7 @@ func TestDefaultAutoRandoms(t *testing.T) { RowID: common.EncodeIntRowID(70), }, })) - require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.AutoRandomType).Base(), int64(70)) + require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder).GetSessionVars()).Get(autoid.AutoRandomType).Base(), int64(70)) pairs, err = encoder.Encode([]types.Datum{types.NewStringDatum("")}, 71, []int{-1, 0}, 1234) require.NoError(t, err) @@ -536,7 +535,7 @@ func TestDefaultAutoRandoms(t *testing.T) { RowID: common.EncodeIntRowID(71), }, })) - require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.AutoRandomType).Base(), int64(71)) + require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder).GetSessionVars()).Get(autoid.AutoRandomType).Base(), int64(71)) } func TestShardRowId(t *testing.T) { @@ -567,7 +566,7 @@ func TestShardRowId(t *testing.T) { keyMap[rowID>>60] = struct{}{} } require.Len(t, keyMap, 8) - require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder)).Get(autoid.RowIDAllocType).Base(), int64(32)) + require.Equal(t, tbl.Allocators(lkv.GetSession4test(encoder).GetSessionVars()).Get(autoid.RowIDAllocType).Base(), int64(32)) } func TestSplitIntoChunks(t *testing.T) { diff --git a/pkg/ddl/ddl_api.go b/pkg/ddl/ddl_api.go index e4bd784b0e6a3..7a78ba17cc259 100644 --- a/pkg/ddl/ddl_api.go +++ b/pkg/ddl/ddl_api.go @@ -3994,7 +3994,7 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6 } if !force { - newBaseTemp, err := adjustNewBaseToNextGlobalID(ctx, t, tp, newBase) + newBaseTemp, err := adjustNewBaseToNextGlobalID(ctx.GetSessionVars(), t, tp, newBase) if err != nil { return err } @@ -4021,7 +4021,7 @@ func (d *ddl) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase int6 return errors.Trace(err) } -func adjustNewBaseToNextGlobalID(ctx sessionctx.Context, t table.Table, tp autoid.AllocatorType, newBase int64) (int64, error) { +func adjustNewBaseToNextGlobalID(ctx table.AllocatorContext, t table.Table, tp autoid.AllocatorType, newBase int64) (int64, error) { alloc := t.Allocators(ctx).Get(tp) if alloc == nil { return newBase, nil diff --git a/pkg/ddl/table.go b/pkg/ddl/table.go index ec2bf40392c3f..cbb508d3728d7 100644 --- a/pkg/ddl/table.go +++ b/pkg/ddl/table.go @@ -1007,7 +1007,7 @@ func verifyNoOverflowShardBits(s *sess.Pool, tbl table.Table, shardRowIDBits uin } defer s.Put(ctx) // Check next global max auto ID first. - autoIncID, err := tbl.Allocators(ctx).Get(autoid.RowIDAllocType).NextGlobalAutoID() + autoIncID, err := tbl.Allocators(ctx.GetSessionVars()).Get(autoid.RowIDAllocType).NextGlobalAutoID() if err != nil { return errors.Trace(err) } diff --git a/pkg/errctx/context.go b/pkg/errctx/context.go index 7be4c20c59ec0..35104a7de5a09 100644 --- a/pkg/errctx/context.go +++ b/pkg/errctx/context.go @@ -80,8 +80,8 @@ func (ctx *Context) WithErrGroupLevels(levels LevelMap) Context { } } -// appendWarning appends the error to warning. If the inner `warnHandler` is nil, do nothing. -func (ctx *Context) appendWarning(err error) { +// AppendWarning appends the error to warning. If the inner `warnHandler` is nil, do nothing. +func (ctx *Context) AppendWarning(err error) { intest.Assert(ctx.warnHandler != nil) if w := ctx.warnHandler; w != nil { // warnHandler should always not be nil, check fn != nil here to just make code safe. @@ -148,7 +148,7 @@ func (ctx *Context) HandleErrorWithAlias(internalErr error, err error, warnErr e case LevelError: return err case LevelWarn: - ctx.appendWarning(warnErr) + ctx.AppendWarning(warnErr) case LevelIgnore: } diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 1d181e269fa08..351d4f1b97b32 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -298,7 +298,7 @@ func (e *ShowNextRowIDExec) Next(_ context.Context, req *chunk.Chunk) error { } tblMeta := tbl.Meta() - allocators := tbl.Allocators(e.Ctx()) + allocators := tbl.Allocators(e.Ctx().GetSessionVars()) for _, alloc := range allocators.Allocs { nextGlobalID, err := alloc.NextGlobalAutoID() if err != nil { diff --git a/pkg/executor/importer/importer_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go index ac46a322ef5fe..1495b8334cbcc 100644 --- a/pkg/executor/importer/importer_testkit_test.go +++ b/pkg/executor/importer/importer_testkit_test.go @@ -256,7 +256,7 @@ func TestPostProcess(t *testing.T) { require.NoError(t, importer.PostProcess(ctx, tk.Session(), map[autoid.AllocatorType]int64{ autoid.RowIDAllocType: 123, }, plan, localChecksum, logger)) - allocators := table.Allocators(tk.Session()) + allocators := table.Allocators(tk.Session().GetSessionVars()) nextGlobalAutoID, err := allocators.Get(autoid.RowIDAllocType).NextGlobalAutoID() require.NoError(t, err) require.Equal(t, int64(124), nextGlobalAutoID) diff --git a/pkg/executor/importer/kv_encode.go b/pkg/executor/importer/kv_encode.go index f39e9c69a179d..df4d31de05c5b 100644 --- a/pkg/executor/importer/kv_encode.go +++ b/pkg/executor/importer/kv_encode.go @@ -202,7 +202,7 @@ func (en *tableKVEncoder) fillRow(row []types.Datum, hasValue []bool, rowID int6 newRowID := en.AutoIDFn(rowID) value = types.NewIntDatum(newRowID) record = append(record, value) - alloc := en.Table.Allocators(en.SessionCtx).Get(autoid.RowIDAllocType) + alloc := en.Table.Allocators(en.SessionCtx.GetSessionVars()).Get(autoid.RowIDAllocType) if err := alloc.Rebase(context.Background(), rowValue, false); err != nil { return nil, errors.Trace(err) } diff --git a/pkg/executor/infoschema_reader.go b/pkg/executor/infoschema_reader.go index 9fc2595ea77e0..585aa291f0803 100644 --- a/pkg/executor/infoschema_reader.go +++ b/pkg/executor/infoschema_reader.go @@ -229,7 +229,7 @@ func getAutoIncrementID(ctx sessionctx.Context, schema *model.DBInfo, tblInfo *m if err != nil { return 0, err } - return tbl.Allocators(ctx).Get(autoid.AutoIncrementType).Base() + 1, nil + return tbl.Allocators(ctx.GetSessionVars()).Get(autoid.AutoIncrementType).Base() + 1, nil } func hasPriv(ctx sessionctx.Context, priv mysql.PrivilegeType) bool { diff --git a/pkg/executor/insert_common.go b/pkg/executor/insert_common.go index ff6cb313afed5..447d990747e67 100644 --- a/pkg/executor/insert_common.go +++ b/pkg/executor/insert_common.go @@ -820,7 +820,8 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] if !found { return rows, nil } - retryInfo := e.Ctx().GetSessionVars().RetryInfo + sessVars := e.Ctx().GetSessionVars() + retryInfo := sessVars.RetryInfo rowCount := len(rows) for processedIdx := 0; processedIdx < rowCount; processedIdx++ { autoDatum := rows[processedIdx][idx] @@ -835,7 +836,7 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] } // Use the value if it's not null and not 0. if recordID != 0 { - alloc := e.Table.Allocators(e.Ctx()).Get(autoid.AutoIncrementType) + alloc := e.Table.Allocators(sessVars).Get(autoid.AutoIncrementType) err = alloc.Rebase(ctx, recordID, true) if err != nil { return nil, err @@ -906,7 +907,8 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows [] func (e *InsertValues) adjustAutoIncrementDatum( ctx context.Context, d types.Datum, hasValue bool, c *table.Column, ) (types.Datum, error) { - retryInfo := e.Ctx().GetSessionVars().RetryInfo + sessVars := e.Ctx().GetSessionVars() + retryInfo := sessVars.RetryInfo if retryInfo.Retrying { id, ok := retryInfo.GetCurrAutoIncrementID() if ok { @@ -931,7 +933,7 @@ func (e *InsertValues) adjustAutoIncrementDatum( } // Use the value if it's not null and not 0. if recordID != 0 { - err = e.Table.Allocators(e.Ctx()).Get(autoid.AutoIncrementType).Rebase(ctx, recordID, true) + err = e.Table.Allocators(sessVars).Get(autoid.AutoIncrementType).Rebase(ctx, recordID, true) if err != nil { return types.Datum{}, err } @@ -1049,7 +1051,7 @@ func (e *InsertValues) adjustAutoRandomDatum( // allocAutoRandomID allocates a random id for primary key column. It assumes tableInfo.AutoRandomBits > 0. func (e *InsertValues) allocAutoRandomID(ctx context.Context, fieldType *types.FieldType) (int64, error) { - alloc := e.Table.Allocators(e.Ctx()).Get(autoid.AutoRandomType) + alloc := e.Table.Allocators(e.Ctx().GetSessionVars()).Get(autoid.AutoRandomType) tableInfo := e.Table.Meta() increment := e.Ctx().GetSessionVars().AutoIncrementIncrement offset := e.Ctx().GetSessionVars().AutoIncrementOffset @@ -1073,7 +1075,7 @@ func (e *InsertValues) rebaseAutoRandomID(ctx context.Context, recordID int64, f if recordID < 0 { return nil } - alloc := e.Table.Allocators(e.Ctx()).Get(autoid.AutoRandomType) + alloc := e.Table.Allocators(e.Ctx().GetSessionVars()).Get(autoid.AutoRandomType) tableInfo := e.Table.Meta() shardFmt := autoid.NewShardIDFormat(fieldType, tableInfo.AutoRandomBits, tableInfo.AutoRandomRangeBits) @@ -1129,7 +1131,7 @@ func (e *InsertValues) rebaseImplicitRowID(ctx context.Context, recordID int64) if recordID < 0 { return nil } - alloc := e.Table.Allocators(e.Ctx()).Get(autoid.RowIDAllocType) + alloc := e.Table.Allocators(e.Ctx().GetSessionVars()).Get(autoid.RowIDAllocType) tableInfo := e.Table.Meta() shardFmt := autoid.NewShardIDFormat( diff --git a/pkg/executor/show.go b/pkg/executor/show.go index caee27fc3b7c4..afc0b36ee7daa 100644 --- a/pkg/executor/show.go +++ b/pkg/executor/show.go @@ -1424,7 +1424,7 @@ func (e *ShowExec) fetchShowCreateTable() error { tableInfo := tb.Meta() var buf bytes.Buffer // TODO: let the result more like MySQL. - if err = constructResultOfShowCreateTable(e.Ctx(), &e.DBName, tableInfo, tb.Allocators(e.Ctx()), &buf); err != nil { + if err = constructResultOfShowCreateTable(e.Ctx(), &e.DBName, tableInfo, tb.Allocators(e.Ctx().GetSessionVars()), &buf); err != nil { return err } if tableInfo.IsView() { diff --git a/pkg/executor/test/ddl/ddl_test.go b/pkg/executor/test/ddl/ddl_test.go index 5f799eebf6092..e721b7a0514de 100644 --- a/pkg/executor/test/ddl/ddl_test.go +++ b/pkg/executor/test/ddl/ddl_test.go @@ -580,7 +580,7 @@ func TestShardRowIDBits(t *testing.T) { tbl, err = dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t1")) require.NoError(t, err) maxID := 1<<(64-15-1) - 1 - alloc := tbl.Allocators(tk.Session()).Get(autoid.RowIDAllocType) + alloc := tbl.Allocators(tk.Session().GetSessionVars()).Get(autoid.RowIDAllocType) err = alloc.Rebase(context.Background(), int64(maxID)-1, false) require.NoError(t, err) tk.MustExec("insert into t1 values(1)") diff --git a/pkg/executor/write.go b/pkg/executor/write.go index e535245e8318b..1c841d662c854 100644 --- a/pkg/executor/write.go +++ b/pkg/executor/write.go @@ -103,7 +103,7 @@ func updateRecord( if err != nil { return false, err } - if err = t.Allocators(sctx).Get(autoid.AutoIncrementType).Rebase(ctx, recordID, true); err != nil { + if err = t.Allocators(sctx.GetSessionVars()).Get(autoid.AutoIncrementType).Rebase(ctx, recordID, true); err != nil { return false, err } } @@ -306,7 +306,7 @@ func rebaseAutoRandomValue( shardFmt := autoid.NewShardIDFormat(&col.FieldType, tableInfo.AutoRandomBits, tableInfo.AutoRandomRangeBits) // Set bits except incremental_bits to zero. recordID = recordID & shardFmt.IncrementalMask() - return t.Allocators(sctx).Get(autoid.AutoRandomType).Rebase(ctx, recordID, true) + return t.Allocators(sctx.GetSessionVars()).Get(autoid.AutoRandomType).Rebase(ctx, recordID, true) } // resetErrDataTooLong reset ErrDataTooLong error msg. @@ -341,7 +341,7 @@ func checkRowForExchangePartition(sctx sessionctx.Context, row []types.Datum, tb } if variable.EnableCheckConstraint.Load() { type CheckConstraintTable interface { - CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error + CheckRowConstraint(ctx expression.EvalContext, rowToCheck []types.Datum) error } cc, ok := pt.(CheckConstraintTable) if !ok { diff --git a/pkg/expression/simple_rewriter.go b/pkg/expression/simple_rewriter.go index ce4b6dcd0fcfa..4eab100217588 100644 --- a/pkg/expression/simple_rewriter.go +++ b/pkg/expression/simple_rewriter.go @@ -24,11 +24,12 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/sqlexec" ) // ParseSimpleExprWithTableInfo parses simple expression string to Expression. // The expression string must only reference the column in table Info. -func ParseSimpleExprWithTableInfo(ctx sessionctx.Context, exprStr string, tableInfo *model.TableInfo) (Expression, error) { +func ParseSimpleExprWithTableInfo(ctx BuildContext, exprStr string, tableInfo *model.TableInfo) (Expression, error) { if len(exprStr) == 0 { return nil, nil } @@ -36,9 +37,7 @@ func ParseSimpleExprWithTableInfo(ctx sessionctx.Context, exprStr string, tableI var stmts []ast.StmtNode var err error var warns []error - if p, ok := ctx.(interface { - ParseSQL(context.Context, string, ...parser.ParseParam) ([]ast.StmtNode, []error, error) - }); ok { + if p, ok := ctx.(sqlexec.SQLParser); ok { stmts, warns, err = p.ParseSQL(context.Background(), exprStr) } else { stmts, warns, err = parser.New().ParseSQL(exprStr) @@ -51,7 +50,7 @@ func ParseSimpleExprWithTableInfo(ctx sessionctx.Context, exprStr string, tableI return nil, errors.Trace(err) } expr := stmts[0].(*ast.SelectStmt).Fields.Fields[0].Expr - return RewriteSimpleExprWithTableInfo(ctx, tableInfo, expr, false) + return BuildExprWithAst(ctx, expr, WithSourceTable(tableInfo)) } // ParseSimpleExprCastWithTableInfo parses simple expression string to Expression. diff --git a/pkg/infoschema/tables.go b/pkg/infoschema/tables.go index b8dacddb9f76f..5d314654b282f 100644 --- a/pkg/infoschema/tables.go +++ b/pkg/infoschema/tables.go @@ -2281,22 +2281,22 @@ func (it *infoschemaTable) IndexPrefix() kv.Key { } // AddRecord implements table.Table AddRecord interface. -func (it *infoschemaTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { +func (it *infoschemaTable) AddRecord(ctx table.MutateContext, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { return nil, table.ErrUnsupportedOp } // RemoveRecord implements table.Table RemoveRecord interface. -func (it *infoschemaTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { +func (it *infoschemaTable) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []types.Datum) error { return table.ErrUnsupportedOp } // UpdateRecord implements table.Table UpdateRecord interface. -func (it *infoschemaTable) UpdateRecord(gctx context.Context, ctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { +func (it *infoschemaTable) UpdateRecord(gctx context.Context, ctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { return table.ErrUnsupportedOp } // Allocators implements table.Table Allocators interface. -func (it *infoschemaTable) Allocators(_ sessionctx.Context) autoid.Allocators { +func (it *infoschemaTable) Allocators(_ table.AllocatorContext) autoid.Allocators { return autoid.Allocators{} } @@ -2369,22 +2369,22 @@ func (vt *VirtualTable) IndexPrefix() kv.Key { } // AddRecord implements table.Table AddRecord interface. -func (vt *VirtualTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { +func (vt *VirtualTable) AddRecord(ctx table.MutateContext, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { return nil, table.ErrUnsupportedOp } // RemoveRecord implements table.Table RemoveRecord interface. -func (vt *VirtualTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { +func (vt *VirtualTable) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []types.Datum) error { return table.ErrUnsupportedOp } // UpdateRecord implements table.Table UpdateRecord interface. -func (vt *VirtualTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { +func (vt *VirtualTable) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { return table.ErrUnsupportedOp } // Allocators implements table.Table Allocators interface. -func (vt *VirtualTable) Allocators(_ sessionctx.Context) autoid.Allocators { +func (vt *VirtualTable) Allocators(_ table.AllocatorContext) autoid.Allocators { return autoid.Allocators{} } diff --git a/pkg/planner/cardinality/trace.go b/pkg/planner/cardinality/trace.go index 6ab3a69266c55..ca99ddd89a896 100644 --- a/pkg/planner/cardinality/trace.go +++ b/pkg/planner/cardinality/trace.go @@ -207,9 +207,10 @@ func recordUsedItemStatsStatus(sctx sessionctx.Context, stats any, tableID, id i // ceTraceRange appends a list of ranges and related information into CE trace func ceTraceRange(sctx sessionctx.Context, tableID int64, colNames []string, ranges []*ranger.Range, tp string, rowCount uint64) { sc := sctx.GetSessionVars().StmtCtx + tc := sc.TypeCtx() allPoint := true for _, ran := range ranges { - if !ran.IsPointNullable(sctx) { + if !ran.IsPointNullable(tc) { allPoint = false break } diff --git a/pkg/planner/core/common_plans.go b/pkg/planner/core/common_plans.go index 7f69168be0886..4bf6682dc831e 100644 --- a/pkg/planner/core/common_plans.go +++ b/pkg/planner/core/common_plans.go @@ -1411,7 +1411,7 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p Plan) (bo if !ok { return false, nil } - isPointRange := len(tableScan.Ranges) == 1 && tableScan.Ranges[0].IsPointNonNullable(ctx) + isPointRange := len(tableScan.Ranges) == 1 && tableScan.Ranges[0].IsPointNonNullable(ctx.GetSessionVars().StmtCtx.TypeCtx()) if !isPointRange { return false, nil } diff --git a/pkg/planner/core/find_best_task.go b/pkg/planner/core/find_best_task.go index 4ffb101c4108a..35ae462b304fc 100644 --- a/pkg/planner/core/find_best_task.go +++ b/pkg/planner/core/find_best_task.go @@ -1223,8 +1223,9 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter } if canConvertPointGet { allRangeIsPoint := true + tc := ds.SCtx().GetSessionVars().StmtCtx.TypeCtx() for _, ran := range path.Ranges { - if !ran.IsPointNonNullable(ds.SCtx()) { + if !ran.IsPointNonNullable(tc) { // unique indexes can have duplicated NULL rows so we cannot use PointGet if there is NULL allRangeIsPoint = false break @@ -2107,8 +2108,9 @@ func (ds *DataSource) isPointGetPath(path *util.AccessPath) bool { } } } + tc := ds.SCtx().GetSessionVars().StmtCtx.TypeCtx() for _, ran := range path.Ranges { - if !ran.IsPointNonNullable(ds.SCtx()) { + if !ran.IsPointNonNullable(tc) { return false } } diff --git a/pkg/planner/core/physical_plans.go b/pkg/planner/core/physical_plans.go index 50574fe567218..c4ecd60502101 100644 --- a/pkg/planner/core/physical_plans.go +++ b/pkg/planner/core/physical_plans.go @@ -2146,7 +2146,7 @@ func (p *PhysicalIndexScan) IsPointGetByUniqueKey(sctx sessionctx.Context) bool return len(p.Ranges) == 1 && p.Index.Unique && len(p.Ranges[0].LowVal) == len(p.Index.Columns) && - p.Ranges[0].IsPointNonNullable(sctx) + p.Ranges[0].IsPointNonNullable(sctx.GetSessionVars().StmtCtx.TypeCtx()) } // PhysicalSelection represents a filter. diff --git a/pkg/planner/core/rule_partition_processor.go b/pkg/planner/core/rule_partition_processor.go index bb3fd35a43d5b..dbe7c3d328d20 100644 --- a/pkg/planner/core/rule_partition_processor.go +++ b/pkg/planner/core/rule_partition_processor.go @@ -153,8 +153,9 @@ func (s *partitionProcessor) getUsedHashPartitions(ctx sessionctx.Context, } ranges := detachedResult.Ranges used := make([]int, 0, len(ranges)) + tc := ctx.GetSessionVars().StmtCtx.TypeCtx() for _, r := range ranges { - if !r.IsPointNullable(ctx) { + if !r.IsPointNullable(tc) { // processing hash partition pruning. eg: // create table t2 (a int, b bigint, index (a), index (b)) partition by hash(a) partitions 10; // desc select * from t2 where t2.a between 10 and 15; @@ -269,8 +270,9 @@ func (s *partitionProcessor) getUsedKeyPartitions(ctx sessionctx.Context, ranges := detachedResult.Ranges used := make([]int, 0, len(ranges)) + tc := ctx.GetSessionVars().StmtCtx.TypeCtx() for _, r := range ranges { - if !r.IsPointNullable(ctx) { + if !r.IsPointNullable(tc) { if len(partCols) == 1 && partCols[0].RetType.EvalType() == types.ETInt { col := partCols[0] posHigh, highIsNull, err := col.EvalInt(ctx, chunk.MutRowFromDatums(r.HighVal).ToRow()) @@ -642,14 +644,15 @@ func (l *listPartitionPruner) locateColumnPartitionsByCondition(cond expression. } sc := l.ctx.GetSessionVars().StmtCtx + tc, ec := sc.TypeCtx(), sc.ErrCtx() helper := tables.NewListPartitionLocationHelper() for _, r := range ranges { if len(r.LowVal) != 1 || len(r.HighVal) != 1 { return nil, true, nil } var locations []tables.ListPartitionLocation - if r.IsPointNullable(l.ctx) { - location, err := colPrune.LocatePartition(sc, r.HighVal[0]) + if r.IsPointNullable(tc) { + location, err := colPrune.LocatePartition(tc, ec, r.HighVal[0]) if types.ErrOverflow.Equal(err) { return nil, true, nil // return full-scan if over-flow } @@ -671,7 +674,7 @@ func (l *listPartitionPruner) locateColumnPartitionsByCondition(cond expression. } locations = append(locations, location) } else { - locations, err = colPrune.LocateRanges(sc, r, l.listPrune.GetDefaultIdx()) + locations, err = colPrune.LocateRanges(tc, ec, r, l.listPrune.GetDefaultIdx()) if types.ErrOverflow.Equal(err) { return nil, true, nil // return full-scan if over-flow } @@ -748,8 +751,9 @@ func (l *listPartitionPruner) findUsedListPartitions(conds []expression.Expressi return nil, err } used := make(map[int]struct{}, len(ranges)) + tc := l.ctx.GetSessionVars().StmtCtx.TypeCtx() for _, r := range ranges { - if !r.IsPointNullable(l.ctx) { + if !r.IsPointNullable(tc) { return l.fullRange, nil } if len(r.HighVal) != len(exprCols) { diff --git a/pkg/planner/util/path.go b/pkg/planner/util/path.go index 52a4011c584b7..4fe34ad2f799a 100644 --- a/pkg/planner/util/path.go +++ b/pkg/planner/util/path.go @@ -218,9 +218,10 @@ func isColEqExpr(expr expression.Expression, col *expression.Column, checkFn fun // OnlyPointRange checks whether each range is a point(no interval range exists). func (path *AccessPath) OnlyPointRange(sctx sessionctx.Context) bool { + tc := sctx.GetSessionVars().StmtCtx.TypeCtx() if path.IsIntHandlePath { for _, ran := range path.Ranges { - if !ran.IsPointNullable(sctx) { + if !ran.IsPointNullable(tc) { return false } } @@ -228,7 +229,7 @@ func (path *AccessPath) OnlyPointRange(sctx sessionctx.Context) bool { } for _, ran := range path.Ranges { // Not point or the not full matched. - if !ran.IsPointNonNullable(sctx) || len(ran.HighVal) != len(path.Index.Columns) { + if !ran.IsPointNonNullable(tc) || len(ran.HighVal) != len(path.Index.Columns) { return false } } diff --git a/pkg/table/BUILD.bazel b/pkg/table/BUILD.bazel index 307a18c222389..a040739231b16 100644 --- a/pkg/table/BUILD.bazel +++ b/pkg/table/BUILD.bazel @@ -32,9 +32,11 @@ go_library( "//pkg/util/logutil", "//pkg/util/mock", "//pkg/util/sqlexec", + "//pkg/util/tableutil", "//pkg/util/timeutil", "//pkg/util/tracing", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_tipb//go-binlog", "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/table/column.go b/pkg/table/column.go index 22031273c1117..a818e739e4680 100644 --- a/pkg/table/column.go +++ b/pkg/table/column.go @@ -326,10 +326,11 @@ func handleZeroDatetime(sessVars *variable.SessionVars, col *model.ColumnInfo, c // If the handle of err is changed latter, the behavior of forceIgnoreTruncate also need to change. // TODO: change the third arg to TypeField. Not pass ColumnInfo. func CastValue(sctx sessionctx.Context, val types.Datum, col *model.ColumnInfo, returnErr, forceIgnoreTruncate bool) (casted types.Datum, err error) { - return castValue(sctx.GetSessionVars(), val, col, returnErr, forceIgnoreTruncate) + return CastColumnValue(sctx.GetSessionVars(), val, col, returnErr, forceIgnoreTruncate) } -func castValue(vars *variable.SessionVars, val types.Datum, col *model.ColumnInfo, returnErr, forceIgnoreTruncate bool) (casted types.Datum, err error) { +// CastColumnValue casts a value based on column type. +func CastColumnValue(vars *variable.SessionVars, val types.Datum, col *model.ColumnInfo, returnErr, forceIgnoreTruncate bool) (casted types.Datum, err error) { sc := vars.StmtCtx casted, err = val.ConvertTo(sc.TypeCtx(), &col.FieldType) // TODO: make sure all truncate errors are handled by ConvertTo. @@ -524,7 +525,7 @@ type getColOriginDefaultValue struct { } // GetColOriginDefaultValue gets default value of the column from original default value. -func GetColOriginDefaultValue(ctx sessionctx.Context, col *model.ColumnInfo) (types.Datum, error) { +func GetColOriginDefaultValue(ctx expression.BuildContext, col *model.ColumnInfo) (types.Datum, error) { return getColDefaultValue(ctx, col, col.GetOriginDefaultValue(), nil) } @@ -570,7 +571,7 @@ func getColDefaultExprValue(ctx expression.BuildContext, col *model.ColumnInfo, return types.Datum{}, err } // Check the evaluated data type by cast. - value, err := castValue(ctx.GetSessionVars(), d, col, false, false) + value, err := CastColumnValue(ctx.GetSessionVars(), d, col, false, false) if err != nil { return types.Datum{}, err } @@ -585,7 +586,7 @@ func getColDefaultValue(ctx expression.BuildContext, col *model.ColumnInfo, defa switch col.GetType() { case mysql.TypeTimestamp, mysql.TypeDate, mysql.TypeDatetime: default: - value, err := castValue(ctx.GetSessionVars(), types.NewDatum(defaultVal), col, false, false) + value, err := CastColumnValue(ctx.GetSessionVars(), types.NewDatum(defaultVal), col, false, false) if err != nil { return types.Datum{}, err } diff --git a/pkg/table/index.go b/pkg/table/index.go index ed556a20e3191..d451a22db1eca 100644 --- a/pkg/table/index.go +++ b/pkg/table/index.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/types" ) @@ -77,9 +76,9 @@ type Index interface { // TableMeta returns TableInfo TableMeta() *model.TableInfo // Create supports insert into statement. - Create(ctx sessionctx.Context, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error) + Create(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...CreateIdxOptFunc) (kv.Handle, error) // Delete supports delete from statement. - Delete(ctx sessionctx.Context, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error + Delete(ctx MutateContext, txn kv.Transaction, indexedValues []types.Datum, h kv.Handle) error // GenIndexKVIter generate index key and value for multi-valued index, use iterator to reduce the memory allocation. GenIndexKVIter(ec errctx.Context, loc *time.Location, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum) IndexKVGenerator // Exist supports check index exists or not. diff --git a/pkg/table/table.go b/pkg/table/table.go index e5bee0d5f14d9..2c9c656d67880 100644 --- a/pkg/table/table.go +++ b/pkg/table/table.go @@ -23,14 +23,18 @@ import ( "time" mysql "github.com/pingcap/tidb/pkg/errno" + "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta/autoid" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/sqlexec" + "github.com/pingcap/tidb/pkg/util/tableutil" "github.com/pingcap/tidb/pkg/util/tracing" + "github.com/pingcap/tipb/go-binlog" ) // Type is used to distinguish between different tables that store data in different ways. @@ -171,6 +175,28 @@ type columnAPI interface { FullHiddenColsAndVisibleCols() []*Column } +// MutateContext is used to when mutating a table. +type MutateContext interface { + expression.BuildContext + // GetSessionVars returns the session variables. + GetSessionVars() *variable.SessionVars + // Txn returns the current transaction which is created before executing a statement. + // The returned kv.Transaction is not nil, but it maybe pending or invalid. + // If the active parameter is true, call this function will wait for the pending txn + // to become valid. + Txn(active bool) (kv.Transaction, error) + // StmtGetMutation gets the binlog mutation for current statement. + StmtGetMutation(int64) *binlog.TableMutation + // GetDomainInfoSchema returns the latest information schema in domain + GetDomainInfoSchema() sessionctx.InfoschemaMetaVersion +} + +// AllocatorContext is used to provide context for method `table.Allocators`. +type AllocatorContext interface { + // GetTemporaryTable returns some runtime information for temporary tables to allocate IDs. + GetTemporaryTable(tbl *model.TableInfo) tableutil.TempTable +} + // Table is used to retrieve and modify rows in table. type Table interface { columnAPI @@ -185,16 +211,16 @@ type Table interface { IndexPrefix() kv.Key // AddRecord inserts a row which should contain only public columns - AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...AddRecordOption) (recordID kv.Handle, err error) + AddRecord(ctx MutateContext, r []types.Datum, opts ...AddRecordOption) (recordID kv.Handle, err error) // UpdateRecord updates a row which should contain only writable columns. - UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, currData, newData []types.Datum, touched []bool) error + UpdateRecord(gctx context.Context, ctx MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool) error // RemoveRecord removes a row in the table. - RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error + RemoveRecord(ctx MutateContext, h kv.Handle, r []types.Datum) error // Allocators returns all allocators. - Allocators(ctx sessionctx.Context) autoid.Allocators + Allocators(ctx AllocatorContext) autoid.Allocators // Meta returns TableInfo. Meta() *model.TableInfo @@ -212,7 +238,7 @@ func AllocAutoIncrementValue(ctx context.Context, t Table, sctx sessionctx.Conte defer r.End() increment := sctx.GetSessionVars().AutoIncrementIncrement offset := sctx.GetSessionVars().AutoIncrementOffset - alloc := t.Allocators(sctx).Get(autoid.AutoIncrementType) + alloc := t.Allocators(sctx.GetSessionVars()).Get(autoid.AutoIncrementType) _, max, err := alloc.Alloc(ctx, uint64(1), int64(increment), int64(offset)) if err != nil { return 0, err @@ -225,7 +251,7 @@ func AllocAutoIncrementValue(ctx context.Context, t Table, sctx sessionctx.Conte func AllocBatchAutoIncrementValue(ctx context.Context, t Table, sctx sessionctx.Context, N int) (firstID int64, increment int64, err error) { increment = int64(sctx.GetSessionVars().AutoIncrementIncrement) offset := int64(sctx.GetSessionVars().AutoIncrementOffset) - alloc := t.Allocators(sctx).Get(autoid.AutoIncrementType) + alloc := t.Allocators(sctx.GetSessionVars()).Get(autoid.AutoIncrementType) min, max, err := alloc.Alloc(ctx, uint64(N), increment, offset) if err != nil { return min, max, err @@ -249,11 +275,11 @@ type PhysicalTable interface { type PartitionedTable interface { Table GetPartition(physicalID int64) PhysicalTable - GetPartitionByRow(sessionctx.Context, []types.Datum) (PhysicalTable, error) + GetPartitionByRow(expression.BuildContext, []types.Datum) (PhysicalTable, error) GetAllPartitionIDs() []int64 GetPartitionColumnIDs() []int64 GetPartitionColumnNames() []model.CIStr - CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, partID, ntID int64) error + CheckForExchangePartition(ctx expression.BuildContext, pi *model.PartitionInfo, r []types.Datum, partID, ntID int64) error } // TableFromMeta builds a table.Table from *model.TableInfo. diff --git a/pkg/table/tables/cache.go b/pkg/table/tables/cache.go index 2dfc8ba730d5f..e77b06834cc78 100644 --- a/pkg/table/tables/cache.go +++ b/pkg/table/tables/cache.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" @@ -241,7 +240,7 @@ func (c *cachedTable) updateLockForRead(ctx context.Context, handle StateRemote, const cachedTableSizeLimit = 64 * (1 << 20) // AddRecord implements the AddRecord method for the table.Table interface. -func (c *cachedTable) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { +func (c *cachedTable) AddRecord(sctx table.MutateContext, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { if atomic.LoadInt64(&c.totalSize) > cachedTableSizeLimit { return nil, table.ErrOptOnCacheTable.GenWithStackByArgs("table too large") } @@ -249,7 +248,7 @@ func (c *cachedTable) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . return c.TableCommon.AddRecord(sctx, r, opts...) } -func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle *cachedTable) { +func txnCtxAddCachedTable(sctx table.MutateContext, tid int64, handle *cachedTable) { txnCtx := sctx.GetSessionVars().TxnCtx if txnCtx.CachedTables == nil { txnCtx.CachedTables = make(map[int64]any) @@ -260,7 +259,7 @@ func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle *cachedTabl } // UpdateRecord implements table.Table -func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { +func (c *cachedTable) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { // Prevent furthur writing when the table is already too large. if atomic.LoadInt64(&c.totalSize) > cachedTableSizeLimit { return table.ErrOptOnCacheTable.GenWithStackByArgs("table too large") @@ -270,7 +269,7 @@ func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, } // RemoveRecord implements table.Table RemoveRecord interface. -func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []types.Datum) error { +func (c *cachedTable) RemoveRecord(sctx table.MutateContext, h kv.Handle, r []types.Datum) error { txnCtxAddCachedTable(sctx, c.Meta().ID, c) return c.TableCommon.RemoveRecord(sctx, h, r) } diff --git a/pkg/table/tables/index.go b/pkg/table/tables/index.go index beb995dce894a..c90eb900b21af 100644 --- a/pkg/table/tables/index.go +++ b/pkg/table/tables/index.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" @@ -154,7 +153,7 @@ out: // Create creates a new entry in the kvIndex data. // If the index is unique and there is an existing entry with the same key, // Create will return the existing entry's handle as the first return value, ErrKeyExists as the second return value. -func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...table.CreateIdxOptFunc) (kv.Handle, error) { +func (c *index) Create(sctx table.MutateContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle, handleRestoreData []types.Datum, opts ...table.CreateIdxOptFunc) (kv.Handle, error) { if c.Meta().Unique { txn.CacheTableInfo(c.phyTblID, c.tblInfo) } @@ -390,7 +389,7 @@ func needPresumeKeyNotExistsFlag(ctx context.Context, txn kv.Transaction, key, t } // Delete removes the entry for handle h and indexedValues from KV index. -func (c *index) Delete(ctx sessionctx.Context, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) error { +func (c *index) Delete(ctx table.MutateContext, txn kv.Transaction, indexedValue []types.Datum, h kv.Handle) error { indexedValues := c.getIndexedValue(indexedValue) sc := ctx.GetSessionVars().StmtCtx for _, value := range indexedValues { diff --git a/pkg/table/tables/partition.go b/pkg/table/tables/partition.go index fb52dc1ee6605..964c9d9621a41 100644 --- a/pkg/table/tables/partition.go +++ b/pkg/table/tables/partition.go @@ -27,6 +27,7 @@ import ( "github.com/google/btree" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser" @@ -34,7 +35,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/tablecodec" @@ -378,12 +378,12 @@ func dataForRangeColumnsPruning(ctx sessionctx.Context, defs []model.PartitionDe // parseSimpleExprWithNames parses simple expression string to Expression. // The expression string must only reference the column in the given NameSlice. -func parseSimpleExprWithNames(p *parser.Parser, ctx sessionctx.Context, exprStr string, schema *expression.Schema, names types.NameSlice) (expression.Expression, error) { +func parseSimpleExprWithNames(p *parser.Parser, ctx expression.BuildContext, exprStr string, schema *expression.Schema, names types.NameSlice) (expression.Expression, error) { exprNode, err := parseExpr(p, exprStr) if err != nil { return nil, errors.Trace(err) } - return expression.RewriteSimpleExprWithNames(ctx, exprNode, schema, names) + return expression.BuildExprWithAst(ctx, exprNode, expression.WithInputSchemaAndNames(schema, names)) } // ForKeyPruning is used for key partition pruning. @@ -446,7 +446,7 @@ type ForListColumnPruning struct { // To deal with the location partition failure caused by inconsistent NewCollationEnabled values(see issue #32416). // The following fields are used to delay building valueMap. - ctx sessionctx.Context + ctx expression.BuildContext tblInfo *model.TableInfo schema *expression.Schema names types.NameSlice @@ -977,7 +977,7 @@ func (lp *ForListPruning) LocatePartition(value int64, isNull bool) int { return partitionIdx } -func (lp *ForListPruning) locateListPartitionByRow(ctx sessionctx.Context, r []types.Datum) (int, error) { +func (lp *ForListPruning) locateListPartitionByRow(ctx expression.EvalContext, r []types.Datum) (int, error) { value, isNull, err := lp.LocateExpr.EvalInt(ctx, chunk.MutRowFromDatums(r).ToRow()) if err != nil { return -1, errors.Trace(err) @@ -999,11 +999,10 @@ func (lp *ForListPruning) locateListPartitionByRow(ctx sessionctx.Context, r []t return -1, table.ErrNoPartitionForGivenValue.GenWithStackByArgs(valueMsg) } -func (lp *ForListPruning) locateListColumnsPartitionByRow(ctx sessionctx.Context, r []types.Datum) (int, error) { +func (lp *ForListPruning) locateListColumnsPartitionByRow(tc types.Context, ec errctx.Context, r []types.Datum) (int, error) { helper := NewListPartitionLocationHelper() - sc := ctx.GetSessionVars().StmtCtx for _, colPrune := range lp.ColPrunes { - location, err := colPrune.LocatePartition(sc, r[colPrune.ExprCol.Index]) + location, err := colPrune.LocatePartition(tc, ec, r[colPrune.ExprCol.Index]) if err != nil { return -1, errors.Trace(err) } @@ -1053,7 +1052,6 @@ func (lp *ForListColumnPruning) RebuildPartitionValueMapAndSorted(p *parser.Pars } func (lp *ForListColumnPruning) buildListPartitionValueMapAndSorted(p *parser.Parser, defs []model.PartitionDefinition) error { - sc := lp.ctx.GetSessionVars().StmtCtx DEFS: for partitionIdx, def := range defs { for groupIdx, vs := range def.InValues { @@ -1061,7 +1059,7 @@ DEFS: lp.defaultPartID = def.ID continue DEFS } - keyBytes, err := lp.genConstExprKey(lp.ctx, sc, vs[lp.colIdx], lp.schema, lp.names, p) + keyBytes, err := lp.genConstExprKey(lp.ctx, vs[lp.colIdx], lp.schema, lp.names, p) if err != nil { return errors.Trace(err) } @@ -1085,7 +1083,7 @@ DEFS: return nil } -func (lp *ForListColumnPruning) genConstExprKey(ctx sessionctx.Context, sc *stmtctx.StatementContext, exprStr string, +func (lp *ForListColumnPruning) genConstExprKey(ctx expression.BuildContext, exprStr string, schema *expression.Schema, names types.NameSlice, p *parser.Parser) ([]byte, error) { expr, err := parseSimpleExprWithNames(p, ctx, exprStr, schema, names) if err != nil { @@ -1095,26 +1093,28 @@ func (lp *ForListColumnPruning) genConstExprKey(ctx sessionctx.Context, sc *stmt if err != nil { return nil, errors.Trace(err) } - key, err := lp.genKey(sc, v) + sc := ctx.GetSessionVars().StmtCtx + tc, ec := sc.TypeCtx(), sc.ErrCtx() + key, err := lp.genKey(tc, ec, v) if err != nil { return nil, errors.Trace(err) } return key, nil } -func (lp *ForListColumnPruning) genKey(sc *stmtctx.StatementContext, v types.Datum) ([]byte, error) { - v, err := v.ConvertTo(sc.TypeCtx(), lp.valueTp) +func (lp *ForListColumnPruning) genKey(tc types.Context, ec errctx.Context, v types.Datum) ([]byte, error) { + v, err := v.ConvertTo(tc, lp.valueTp) if err != nil { return nil, errors.Trace(err) } - valByte, err := codec.EncodeKey(sc.TimeZone(), nil, v) - err = sc.HandleError(err) + valByte, err := codec.EncodeKey(tc.Location(), nil, v) + err = ec.HandleError(err) return valByte, err } // LocatePartition locates partition by the column value -func (lp *ForListColumnPruning) LocatePartition(sc *stmtctx.StatementContext, v types.Datum) (ListPartitionLocation, error) { - key, err := lp.genKey(sc, v) +func (lp *ForListColumnPruning) LocatePartition(tc types.Context, ec errctx.Context, v types.Datum) (ListPartitionLocation, error) { + key, err := lp.genKey(tc, ec, v) if err != nil { return nil, errors.Trace(err) } @@ -1126,7 +1126,7 @@ func (lp *ForListColumnPruning) LocatePartition(sc *stmtctx.StatementContext, v } // LocateRanges locates partition ranges by the column range -func (lp *ForListColumnPruning) LocateRanges(sc *stmtctx.StatementContext, r *ranger.Range, defaultPartIdx int) ([]ListPartitionLocation, error) { +func (lp *ForListColumnPruning) LocateRanges(tc types.Context, ec errctx.Context, r *ranger.Range, defaultPartIdx int) ([]ListPartitionLocation, error) { var lowKey, highKey []byte var err error lowVal := r.LowVal[0] @@ -1143,7 +1143,7 @@ func (lp *ForListColumnPruning) LocateRanges(sc *stmtctx.StatementContext, r *ra if lp.ExprCol.GetType().EvalType() == types.ETString && r.LowVal[0].Kind() == types.KindMinNotNull { lowKey = (&lowVal).GetBytes() } else { - lowKey, err = lp.genKey(sc, lowVal) + lowKey, err = lp.genKey(tc, ec, lowVal) if err != nil { return nil, errors.Trace(err) } @@ -1152,7 +1152,7 @@ func (lp *ForListColumnPruning) LocateRanges(sc *stmtctx.StatementContext, r *ra if lp.ExprCol.GetType().EvalType() == types.ETString && r.HighVal[0].Kind() == types.KindMaxValue { highKey = (&highVal).GetBytes() } else { - highKey, err = lp.genKey(sc, highVal) + highKey, err = lp.genKey(tc, ec, highVal) if err != nil { return nil, errors.Trace(err) } @@ -1270,7 +1270,7 @@ func PartitionRecordKey(pid int64, handle int64) kv.Key { return tablecodec.EncodeRecordKey(recordPrefix, kv.IntHandle(handle)) } -func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi *model.PartitionInfo, r []types.Datum, partID, ntID int64) error { +func (t *partitionedTable) CheckForExchangePartition(ctx expression.BuildContext, pi *model.PartitionInfo, r []types.Datum, partID, ntID int64) error { defID, err := t.locatePartition(ctx, r) if err != nil { return err @@ -1282,7 +1282,7 @@ func (t *partitionedTable) CheckForExchangePartition(ctx sessionctx.Context, pi } // locatePartitionCommon returns the partition idx of the input record. -func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, tp model.PartitionType, partitionExpr *PartitionExpr, num uint64, columnsPartitioned bool, r []types.Datum) (int, error) { +func (t *partitionedTable) locatePartitionCommon(ctx expression.BuildContext, tp model.PartitionType, partitionExpr *PartitionExpr, num uint64, columnsPartitioned bool, r []types.Datum) (int, error) { var err error var idx int switch tp { @@ -1308,7 +1308,7 @@ func (t *partitionedTable) locatePartitionCommon(ctx sessionctx.Context, tp mode return idx, nil } -func (t *partitionedTable) locatePartition(ctx sessionctx.Context, r []types.Datum) (int64, error) { +func (t *partitionedTable) locatePartition(ctx expression.BuildContext, r []types.Datum) (int64, error) { pi := t.Meta().GetPartitionInfo() columnsSet := len(t.meta.Partition.Columns) > 0 idx, err := t.locatePartitionCommon(ctx, pi.Type, t.partitionExpr, pi.Num, columnsSet, r) @@ -1318,7 +1318,7 @@ func (t *partitionedTable) locatePartition(ctx sessionctx.Context, r []types.Dat return pi.Definitions[idx].ID, nil } -func (t *partitionedTable) locateReorgPartition(ctx sessionctx.Context, r []types.Datum) (int64, error) { +func (t *partitionedTable) locateReorgPartition(ctx expression.BuildContext, r []types.Datum) (int64, error) { pi := t.Meta().GetPartitionInfo() columnsSet := len(pi.DDLColumns) > 0 // Note that for KEY/HASH partitioning, since we do not support LINEAR, @@ -1339,7 +1339,7 @@ func (t *partitionedTable) locateReorgPartition(ctx sessionctx.Context, r []type return pi.AddingDefinitions[idx].ID, nil } -func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { +func (t *partitionedTable) locateRangeColumnPartition(ctx expression.BuildContext, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { upperBounds := partitionExpr.UpperBounds var lastError error evalBuffer := t.evalBufferPool.Get().(*chunk.MutRow) @@ -1381,15 +1381,17 @@ func (t *partitionedTable) locateRangeColumnPartition(ctx sessionctx.Context, pa return idx, nil } -func (pe *PartitionExpr) locateListPartition(ctx sessionctx.Context, r []types.Datum) (int, error) { +func (pe *PartitionExpr) locateListPartition(ctx expression.EvalContext, r []types.Datum) (int, error) { lp := pe.ForListPruning if len(lp.ColPrunes) == 0 { return lp.locateListPartitionByRow(ctx, r) } - return lp.locateListColumnsPartitionByRow(ctx, r) + sc := ctx.GetSessionVars().StmtCtx + tc, ec := sc.TypeCtx(), sc.ErrCtx() + return lp.locateListColumnsPartitionByRow(tc, ec, r) } -func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { +func (t *partitionedTable) locateRangePartition(ctx expression.BuildContext, partitionExpr *PartitionExpr, r []types.Datum) (int, error) { var ( ret int64 val int64 @@ -1449,7 +1451,7 @@ func (t *partitionedTable) locateRangePartition(ctx sessionctx.Context, partitio } // TODO: supports linear hashing -func (t *partitionedTable) locateHashPartition(ctx sessionctx.Context, partExpr *PartitionExpr, numParts uint64, r []types.Datum) (int, error) { +func (t *partitionedTable) locateHashPartition(ctx expression.EvalContext, partExpr *PartitionExpr, numParts uint64, r []types.Datum) (int, error) { if col, ok := partExpr.Expr.(*expression.Column); ok { var data types.Datum switch r[col.Index].Kind() { @@ -1533,7 +1535,7 @@ func GetReorganizedPartitionedTable(t table.Table) (table.PartitionedTable, erro } // GetPartitionByRow returns a Table, which is actually a Partition. -func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) { +func (t *partitionedTable) GetPartitionByRow(ctx expression.BuildContext, r []types.Datum) (table.PhysicalTable, error) { pid, err := t.locatePartition(ctx, r) if err != nil { return nil, errors.Trace(err) @@ -1542,7 +1544,7 @@ func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.D } // GetPartitionByRow returns a Table, which is actually a Partition. -func (t *partitionTableWithGivenSets) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) { +func (t *partitionTableWithGivenSets) GetPartitionByRow(ctx expression.BuildContext, r []types.Datum) (table.PhysicalTable, error) { pid, err := t.locatePartition(ctx, r) if err != nil { return nil, errors.Trace(err) @@ -1555,7 +1557,7 @@ func (t *partitionTableWithGivenSets) GetPartitionByRow(ctx sessionctx.Context, // checkConstraintForExchangePartition is only used for ExchangePartition by partitionTable during write only state. // It check if rowData inserted or updated violate checkConstraints of non-partitionTable. -func checkConstraintForExchangePartition(sctx sessionctx.Context, row []types.Datum, partID, ntID int64) error { +func checkConstraintForExchangePartition(sctx table.MutateContext, row []types.Datum, partID, ntID int64) error { type InfoSchema interface { TableByID(id int64) (val table.Table, ok bool) } @@ -1572,7 +1574,7 @@ func checkConstraintForExchangePartition(sctx sessionctx.Context, row []types.Da } } type CheckConstraintTable interface { - CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error + CheckRowConstraint(ctx expression.EvalContext, rowToCheck []types.Datum) error } cc, ok := nt.(CheckConstraintTable) if !ok { @@ -1587,11 +1589,11 @@ func checkConstraintForExchangePartition(sctx sessionctx.Context, row []types.Da } // AddRecord implements the AddRecord method for the table.Table interface. -func (t *partitionedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { +func (t *partitionedTable) AddRecord(ctx table.MutateContext, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { return partitionedTableAddRecord(ctx, t, r, nil, opts) } -func partitionedTableAddRecord(ctx sessionctx.Context, t *partitionedTable, r []types.Datum, partitionSelection map[int64]struct{}, opts []table.AddRecordOption) (recordID kv.Handle, err error) { +func partitionedTableAddRecord(ctx table.MutateContext, t *partitionedTable, r []types.Datum, partitionSelection map[int64]struct{}, opts []table.AddRecordOption) (recordID kv.Handle, err error) { pid, err := t.locatePartition(ctx, r) if err != nil { return nil, errors.Trace(err) @@ -1656,7 +1658,7 @@ func NewPartitionTableWithGivenSets(tbl table.PartitionedTable, partitions map[i } // AddRecord implements the AddRecord method for the table.Table interface. -func (t *partitionTableWithGivenSets) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { +func (t *partitionTableWithGivenSets) AddRecord(ctx table.MutateContext, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { return partitionedTableAddRecord(ctx, t.partitionedTable, r, t.givenSetPartitions, opts) } @@ -1669,7 +1671,7 @@ func (t *partitionTableWithGivenSets) GetAllPartitionIDs() []int64 { } // RemoveRecord implements table.Table RemoveRecord interface. -func (t *partitionedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { +func (t *partitionedTable) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []types.Datum) error { pid, err := t.locatePartition(ctx, r) if err != nil { return errors.Trace(err) @@ -1709,15 +1711,15 @@ func (t *partitionedTable) GetAllPartitionIDs() []int64 { // UpdateRecord implements table.Table UpdateRecord interface. // `touched` means which columns are really modified, used for secondary indices. // Length of `oldData` and `newData` equals to length of `t.WritableCols()`. -func (t *partitionedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, currData, newData []types.Datum, touched []bool) error { +func (t *partitionedTable) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool) error { return partitionedTableUpdateRecord(ctx, sctx, t, h, currData, newData, touched, nil) } -func (t *partitionTableWithGivenSets) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, currData, newData []types.Datum, touched []bool) error { +func (t *partitionTableWithGivenSets) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, currData, newData []types.Datum, touched []bool) error { return partitionedTableUpdateRecord(ctx, sctx, t.partitionedTable, h, currData, newData, touched, t.givenSetPartitions) } -func partitionedTableUpdateRecord(gctx context.Context, ctx sessionctx.Context, t *partitionedTable, h kv.Handle, currData, newData []types.Datum, touched []bool, partitionSelection map[int64]struct{}) error { +func partitionedTableUpdateRecord(gctx context.Context, ctx table.MutateContext, t *partitionedTable, h kv.Handle, currData, newData []types.Datum, touched []bool, partitionSelection map[int64]struct{}) error { from, err := t.locatePartition(ctx, currData) if err != nil { return errors.Trace(err) diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index e864ef1ce353a..946d23d766041 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/meta/autoid" @@ -363,9 +364,9 @@ func (t *TableCommon) WritableConstraint() []*table.Constraint { } // CheckRowConstraint verify row check constraints. -func (t *TableCommon) CheckRowConstraint(sctx sessionctx.Context, rowToCheck []types.Datum) error { +func (t *TableCommon) CheckRowConstraint(ctx expression.EvalContext, rowToCheck []types.Datum) error { for _, constraint := range t.WritableConstraint() { - ok, isNull, err := constraint.ConstraintExpr.EvalInt(sctx, chunk.MutRowFromDatums(rowToCheck).ToRow()) + ok, isNull, err := constraint.ConstraintExpr.EvalInt(ctx, chunk.MutRowFromDatums(rowToCheck).ToRow()) if err != nil { return err } @@ -408,11 +409,11 @@ func (t *TableCommon) RecordKey(h kv.Handle) kv.Key { // shouldAssert checks if the partition should be in consistent // state and can have assertion. -func (t *TableCommon) shouldAssert(sctx sessionctx.Context) bool { +func (t *TableCommon) shouldAssert(level variable.AssertionLevel) bool { p := t.Meta().Partition if p != nil { // This disables asserting during Reorganize Partition. - switch sctx.GetSessionVars().AssertionLevel { + switch level { case variable.AssertionLevelFast: // Fast option, just skip assertion for all partitions. if p.DDLState != model.StateNone && p.DDLState != model.StatePublic { @@ -434,7 +435,7 @@ func (t *TableCommon) shouldAssert(sctx sessionctx.Context) bool { // UpdateRecord implements table.Table UpdateRecord interface. // `touched` means which columns are really modified, used for secondary indices. // Length of `oldData` and `newData` equals to length of `t.WritableCols()`. -func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { +func (t *TableCommon) UpdateRecord(ctx context.Context, sctx table.MutateContext, h kv.Handle, oldData, newData []types.Datum, touched []bool) error { txn, err := sctx.Txn(true) if err != nil { return err @@ -459,7 +460,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, numColsCap := len(newData) + 1 // +1 for the extra handle column that we may need to append. colIDs = make([]int64, 0, numColsCap) row = make([]types.Datum, 0, numColsCap) - if shouldWriteBinlog(sctx, t.meta) { + if shouldWriteBinlog(sctx.GetSessionVars(), t.meta) { binlogColIDs = make([]int64, 0, numColsCap) binlogOldRow = make([]types.Datum, 0, numColsCap) binlogNewRow = make([]types.Datum, 0, numColsCap) @@ -473,7 +474,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, if col.State == model.StateDeleteOnly || col.State == model.StateDeleteReorganization { if col.ChangeStateInfo != nil { // TODO: Check overflow or ignoreTruncate. - value, err = table.CastValue(sctx, oldData[col.DependencyColumnOffset], col.ColumnInfo, false, false) + value, err = table.CastColumnValue(sctx.GetSessionVars(), oldData[col.DependencyColumnOffset], col.ColumnInfo, false, false) if err != nil { logutil.BgLogger().Info("update record cast value failed", zap.Any("col", col), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("handle", h.String()), zap.Any("val", oldData[col.DependencyColumnOffset]), zap.Error(err)) @@ -485,7 +486,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, if needChecksum { if col.ChangeStateInfo != nil { // TODO: Check overflow or ignoreTruncate. - v, err := table.CastValue(sctx, newData[col.DependencyColumnOffset], col.ColumnInfo, false, false) + v, err := table.CastColumnValue(sctx.GetSessionVars(), newData[col.DependencyColumnOffset], col.ColumnInfo, false, false) if err != nil { return err } @@ -507,7 +508,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, value = oldData[col.Offset] if col.ChangeStateInfo != nil { // TODO: Check overflow or ignoreTruncate. - value, err = table.CastValue(sctx, newData[col.DependencyColumnOffset], col.ColumnInfo, false, false) + value, err = table.CastColumnValue(sctx.GetSessionVars(), newData[col.DependencyColumnOffset], col.ColumnInfo, false, false) if err != nil { return err } @@ -526,7 +527,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, row = append(row, value) } rowToCheck = append(rowToCheck, value) - if shouldWriteBinlog(sctx, t.meta) && !t.canSkipUpdateBinlog(col, value) { + if shouldWriteBinlog(sctx.GetSessionVars(), t.meta) && !t.canSkipUpdateBinlog(col, value) { binlogColIDs = append(binlogColIDs, col.ID) binlogOldRow = append(binlogOldRow, oldData[col.Offset]) binlogNewRow = append(binlogNewRow, value) @@ -582,7 +583,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, } }) - if t.shouldAssert(sctx) { + if t.shouldAssert(sessVars.AssertionLevel) { err = txn.SetAssertion(key, kv.SetAssertExist) } else { err = txn.SetAssertion(key, kv.SetAssertUnknown) @@ -601,7 +602,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, } memBuffer.Release(sh) - if shouldWriteBinlog(sctx, t.meta) { + if shouldWriteBinlog(sctx.GetSessionVars(), t.meta) { if !t.meta.PKIsHandle && !t.meta.IsCommonHandle { binlogColIDs = append(binlogColIDs, model.ExtraHandleID) binlogOldRow = append(binlogOldRow, types.NewIntDatum(h.IntValue())) @@ -630,7 +631,7 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, return nil } -func (t *TableCommon) rebuildIndices(ctx sessionctx.Context, txn kv.Transaction, h kv.Handle, touched []bool, oldData []types.Datum, newData []types.Datum, opts ...table.CreateIdxOptFunc) error { +func (t *TableCommon) rebuildIndices(ctx table.MutateContext, txn kv.Transaction, h kv.Handle, touched []bool, oldData []types.Datum, newData []types.Datum, opts ...table.CreateIdxOptFunc) error { for _, idx := range t.deletableIndices() { if t.meta.IsCommonHandle && idx.Meta().Primary { continue @@ -786,7 +787,7 @@ func TryGetCommonPkColumns(tbl table.Table) []*table.Column { return pkCols } -func addTemporaryTable(sctx sessionctx.Context, tblInfo *model.TableInfo) tableutil.TempTable { +func addTemporaryTable(sctx table.MutateContext, tblInfo *model.TableInfo) tableutil.TempTable { tempTable := sctx.GetSessionVars().GetTemporaryTable(tblInfo) tempTable.SetModified(true) return tempTable @@ -802,7 +803,7 @@ func handleTempTableSize(t tableutil.TempTable, txnSizeBefore int, txn kv.Transa t.SetSize(newSize) } -func checkTempTableSize(ctx sessionctx.Context, tmpTable tableutil.TempTable, tblInfo *model.TableInfo) error { +func checkTempTableSize(ctx table.MutateContext, tmpTable tableutil.TempTable, tblInfo *model.TableInfo) error { tmpTableSize := tmpTable.GetSize() if tempTableData := ctx.GetSessionVars().TemporaryTableData; tempTableData != nil { tmpTableSize += tempTableData.GetTableSize(tblInfo.ID) @@ -816,7 +817,7 @@ func checkTempTableSize(ctx sessionctx.Context, tmpTable tableutil.TempTable, tb } // AddRecord implements table.Table AddRecord interface. -func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { +func (t *TableCommon) AddRecord(sctx table.MutateContext, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) { txn, err := sctx.Txn(true) if err != nil { return nil, err @@ -886,8 +887,9 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . // The reserved ID could be used in the future within this statement, by the // following AddRecord() operation. // Make the IDs continuous benefit for the performance of TiKV. - stmtCtx := sctx.GetSessionVars().StmtCtx - stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = allocHandleIDs(ctx, sctx, t, uint64(opt.ReserveAutoID)) + sessVars := sctx.GetSessionVars() + stmtCtx := sessVars.StmtCtx + stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = allocHandleIDs(ctx, sessVars, sctx, t, uint64(opt.ReserveAutoID)) if err != nil { return nil, err } @@ -923,7 +925,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . if needChecksum { if col.ChangeStateInfo != nil { // TODO: Check overflow or ignoreTruncate. - v, err := table.CastValue(sctx, r[col.DependencyColumnOffset], col.ColumnInfo, false, false) + v, err := table.CastColumnValue(sctx.GetSessionVars(), r[col.DependencyColumnOffset], col.ColumnInfo, false, false) if err != nil { return nil, err } @@ -942,7 +944,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . // for the new insert statement, we should use the casted value of relative column to insert. if col.ChangeStateInfo != nil && col.State != model.StatePublic { // TODO: Check overflow or ignoreTruncate. - value, err = table.CastValue(sctx, r[col.DependencyColumnOffset], col.ColumnInfo, false, false) + value, err = table.CastColumnValue(sctx.GetSessionVars(), r[col.DependencyColumnOffset], col.ColumnInfo, false, false) if err != nil { return nil, err } @@ -1093,7 +1095,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . memBuffer.Release(sh) - if shouldWriteBinlog(sctx, t.meta) { + if shouldWriteBinlog(sctx.GetSessionVars(), t.meta) { // For insert, TiDB and Binlog can use same row and schema. binlogRow = row binlogColIDs = colIDs @@ -1142,7 +1144,7 @@ func genIndexKeyStr(colVals []types.Datum) (string, error) { } // addIndices adds data into indices. If any key is duplicated, returns the original handle. -func (t *TableCommon) addIndices(sctx sessionctx.Context, recordID kv.Handle, r []types.Datum, txn kv.Transaction, opts []table.CreateIdxOptFunc) (kv.Handle, error) { +func (t *TableCommon) addIndices(sctx table.MutateContext, recordID kv.Handle, r []types.Datum, txn kv.Transaction, opts []table.CreateIdxOptFunc) (kv.Handle, error) { writeBufs := sctx.GetSessionVars().GetWriteStmtBufs() indexVals := writeBufs.IndexValsBuf skipCheck := sctx.GetSessionVars().StmtCtx.BatchCheck @@ -1308,7 +1310,7 @@ func GetChangingColVal(ctx sessionctx.Context, cols []*table.Column, col *table. } // RemoveRecord implements table.Table RemoveRecord interface. -func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error { +func (t *TableCommon) RemoveRecord(ctx table.MutateContext, h kv.Handle, r []types.Datum) error { txn, err := ctx.Txn(true) if err != nil { return err @@ -1339,7 +1341,7 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type // The changing column datum derived from related column should be casted here. // Otherwise, the existed changing indexes will not be deleted. relatedColDatum := r[t.Columns[len(r)].ChangeStateInfo.DependencyColumnOffset] - value, err := table.CastValue(ctx, relatedColDatum, t.Columns[len(r)].ColumnInfo, false, false) + value, err := table.CastColumnValue(ctx.GetSessionVars(), relatedColDatum, t.Columns[len(r)].ColumnInfo, false, false) if err != nil { logutil.BgLogger().Info("remove record cast value failed", zap.Any("col", t.Columns[len(r)]), zap.String("handle", h.String()), zap.Any("val", relatedColDatum), zap.Error(err)) @@ -1364,7 +1366,7 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type } memBuffer.Release(sh) - if shouldWriteBinlog(ctx, t.meta) { + if shouldWriteBinlog(ctx.GetSessionVars(), t.meta) { cols := t.Cols() colIDs := make([]int64, 0, len(cols)+1) for _, col := range cols { @@ -1400,7 +1402,7 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return err } -func (t *TableCommon) addInsertBinlog(ctx sessionctx.Context, h kv.Handle, row []types.Datum, colIDs []int64) error { +func (t *TableCommon) addInsertBinlog(ctx table.MutateContext, h kv.Handle, row []types.Datum, colIDs []int64) error { mutation := t.getMutation(ctx) handleData, err := h.Data() if err != nil { @@ -1422,7 +1424,7 @@ func (t *TableCommon) addInsertBinlog(ctx sessionctx.Context, h kv.Handle, row [ return nil } -func (t *TableCommon) addUpdateBinlog(ctx sessionctx.Context, oldRow, newRow []types.Datum, colIDs []int64) error { +func (t *TableCommon) addUpdateBinlog(ctx table.MutateContext, oldRow, newRow []types.Datum, colIDs []int64) error { old, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), oldRow, colIDs, nil, nil) err = ctx.GetSessionVars().StmtCtx.HandleError(err) if err != nil { @@ -1440,7 +1442,7 @@ func (t *TableCommon) addUpdateBinlog(ctx sessionctx.Context, oldRow, newRow []t return nil } -func (t *TableCommon) addDeleteBinlog(ctx sessionctx.Context, r []types.Datum, colIDs []int64) error { +func (t *TableCommon) addDeleteBinlog(ctx table.MutateContext, r []types.Datum, colIDs []int64) error { data, err := tablecodec.EncodeOldRow(ctx.GetSessionVars().StmtCtx.TimeZone(), r, colIDs, nil, nil) err = ctx.GetSessionVars().StmtCtx.HandleError(err) if err != nil { @@ -1476,7 +1478,7 @@ func writeSequenceUpdateValueBinlog(sctx sessionctx.Context, db, sequence string return err } -func (t *TableCommon) removeRowData(ctx sessionctx.Context, h kv.Handle) error { +func (t *TableCommon) removeRowData(ctx table.MutateContext, h kv.Handle) error { // Remove row data. txn, err := ctx.Txn(true) if err != nil { @@ -1495,7 +1497,7 @@ func (t *TableCommon) removeRowData(ctx sessionctx.Context, h kv.Handle) error { } } }) - if t.shouldAssert(ctx) { + if t.shouldAssert(ctx.GetSessionVars().AssertionLevel) { err = txn.SetAssertion(key, kv.SetAssertExist) } else { err = txn.SetAssertion(key, kv.SetAssertUnknown) @@ -1507,7 +1509,7 @@ func (t *TableCommon) removeRowData(ctx sessionctx.Context, h kv.Handle) error { } // removeRowIndices removes all the indices of a row. -func (t *TableCommon) removeRowIndices(ctx sessionctx.Context, h kv.Handle, rec []types.Datum) error { +func (t *TableCommon) removeRowIndices(ctx table.MutateContext, h kv.Handle, rec []types.Datum) error { txn, err := ctx.Txn(true) if err != nil { return err @@ -1535,12 +1537,12 @@ func (t *TableCommon) removeRowIndices(ctx sessionctx.Context, h kv.Handle, rec } // removeRowIndex implements table.Table RemoveRowIndex interface. -func (t *TableCommon) removeRowIndex(ctx sessionctx.Context, h kv.Handle, vals []types.Datum, idx table.Index, txn kv.Transaction) error { +func (t *TableCommon) removeRowIndex(ctx table.MutateContext, h kv.Handle, vals []types.Datum, idx table.Index, txn kv.Transaction) error { return idx.Delete(ctx, txn, vals, h) } // buildIndexForRow implements table.Table BuildIndexForRow interface. -func (t *TableCommon) buildIndexForRow(ctx sessionctx.Context, h kv.Handle, vals []types.Datum, newData []types.Datum, idx table.Index, txn kv.Transaction, untouched bool, popts ...table.CreateIdxOptFunc) error { +func (t *TableCommon) buildIndexForRow(ctx table.MutateContext, h kv.Handle, vals []types.Datum, newData []types.Datum, idx table.Index, txn kv.Transaction, untouched bool, popts ...table.CreateIdxOptFunc) error { var opts []table.CreateIdxOptFunc opts = append(opts, popts...) if untouched { @@ -1683,9 +1685,11 @@ func GetColDefaultValue(ctx sessionctx.Context, col *table.Column, defaultVals [ // AllocHandle allocate a new handle. // A statement could reserve some ID in the statement context, try those ones first. -func AllocHandle(ctx context.Context, sctx sessionctx.Context, t table.Table) (kv.Handle, error) { - if sctx != nil { - if stmtCtx := sctx.GetSessionVars().StmtCtx; stmtCtx != nil { +func AllocHandle(ctx context.Context, mctx table.MutateContext, t table.Table) (kv.Handle, error) { + var actx table.AllocatorContext + if mctx != nil { + actx = mctx.GetSessionVars() + if stmtCtx := mctx.GetSessionVars().StmtCtx; stmtCtx != nil { // First try to alloc if the statement has reserved auto ID. if stmtCtx.BaseRowID < stmtCtx.MaxRowID { stmtCtx.BaseRowID++ @@ -1694,13 +1698,13 @@ func AllocHandle(ctx context.Context, sctx sessionctx.Context, t table.Table) (k } } - _, rowID, err := allocHandleIDs(ctx, sctx, t, 1) + _, rowID, err := allocHandleIDs(ctx, actx, mctx, t, 1) return kv.IntHandle(rowID), err } -func allocHandleIDs(ctx context.Context, sctx sessionctx.Context, t table.Table, n uint64) (int64, int64, error) { +func allocHandleIDs(ctx context.Context, actx table.AllocatorContext, mctx table.MutateContext, t table.Table, n uint64) (int64, int64, error) { meta := t.Meta() - base, maxID, err := t.Allocators(sctx).Get(autoid.RowIDAllocType).Alloc(ctx, n, 1, 1) + base, maxID, err := t.Allocators(actx).Get(autoid.RowIDAllocType).Alloc(ctx, n, 1, 1) if err != nil { return 0, 0, err } @@ -1717,7 +1721,7 @@ func allocHandleIDs(ctx context.Context, sctx sessionctx.Context, t table.Table, // shard = 0010000000000000000000000000000000000000000000000000000000000000 return 0, 0, autoid.ErrAutoincReadFailed } - shard := sctx.GetSessionVars().GetCurrentShard(int(n)) + shard := mctx.GetSessionVars().GetCurrentShard(int(n)) base = shardFmt.Compose(shard, base) maxID = shardFmt.Compose(shard, maxID) } @@ -1735,15 +1739,17 @@ func OverflowShardBits(recordID int64, shardRowIDBits uint64, typeBitsLength uin } // Allocators implements table.Table Allocators interface. -func (t *TableCommon) Allocators(ctx sessionctx.Context) autoid.Allocators { +func (t *TableCommon) Allocators(ctx table.AllocatorContext) autoid.Allocators { if ctx == nil { return t.allocs } // Use an independent allocator for global temporary tables. if t.meta.TempTableType == model.TempTableGlobal { - if alloc := ctx.GetSessionVars().GetTemporaryTable(t.meta).GetAutoIDAllocator(); alloc != nil { - return autoid.NewAllocators(false, alloc) + if tbl := ctx.GetTemporaryTable(t.meta); tbl != nil { + if alloc := tbl.GetAutoIDAllocator(); alloc != nil { + return autoid.NewAllocators(false, alloc) + } } // If the session is not in a txn, for example, in "show create table", use the original allocator. // Otherwise the would be a nil pointer dereference. @@ -1756,38 +1762,38 @@ func (t *TableCommon) Type() table.Type { return table.NormalTable } -func shouldWriteBinlog(ctx sessionctx.Context, tblInfo *model.TableInfo) bool { +func shouldWriteBinlog(vars *variable.SessionVars, tblInfo *model.TableInfo) bool { failpoint.Inject("forceWriteBinlog", func() { // Just to cover binlog related code in this package, since the `BinlogClient` is // still nil, mutations won't be written to pump on commit. failpoint.Return(true) }) - if ctx.GetSessionVars().BinlogClient == nil { + if vars.BinlogClient == nil { return false } if tblInfo.TempTableType != model.TempTableNone { return false } - return !ctx.GetSessionVars().InRestrictedSQL + return !vars.InRestrictedSQL } func shouldIncreaseTTLMetricCount(tblInfo *model.TableInfo) bool { return tblInfo.TTLInfo != nil } -func (t *TableCommon) getMutation(ctx sessionctx.Context) *binlog.TableMutation { +func (t *TableCommon) getMutation(ctx table.MutateContext) *binlog.TableMutation { return ctx.StmtGetMutation(t.tableID) } // initChecksumData allocates data for checksum calculation, returns nil if checksum is disabled or unavailable. The // length of returned data can be considered as the number of checksums we need to write. -func (t *TableCommon) initChecksumData(sctx sessionctx.Context, h kv.Handle) [][]rowcodec.ColData { +func (t *TableCommon) initChecksumData(sctx table.MutateContext, h kv.Handle) [][]rowcodec.ColData { if !sctx.GetSessionVars().IsRowLevelChecksumEnabled() { return nil } numNonPubCols := len(t.Columns) - len(t.Cols()) if numNonPubCols > 1 { - logWithContext(sctx, logutil.BgLogger().Warn, + logWithContext(sctx.GetSessionVars(), logutil.BgLogger().Warn, "skip checksum since the number of non-public columns is greater than 1", zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID), zap.Any("cols", t.meta.Columns)) return nil @@ -1799,7 +1805,7 @@ func (t *TableCommon) initChecksumData(sctx sessionctx.Context, h kv.Handle) [][ // and it will be reset for each col, so do NOT pass a buf that contains data you may use later. If the capacity of // `buf` is enough, it gets returned directly, otherwise a new bytes with larger capacity will be returned, and you can // hold the returned buf for later use (to avoid memory allocation). -func (t *TableCommon) calcChecksums(sctx sessionctx.Context, h kv.Handle, data [][]rowcodec.ColData, buf []byte) ([]uint32, []byte) { +func (t *TableCommon) calcChecksums(sctx table.MutateContext, h kv.Handle, data [][]rowcodec.ColData, buf []byte) ([]uint32, []byte) { if len(data) == 0 { return nil, buf } @@ -1812,7 +1818,7 @@ func (t *TableCommon) calcChecksums(sctx sessionctx.Context, h kv.Handle, data [ checksum, err := row.Checksum() buf = row.Data if err != nil { - logWithContext(sctx, logutil.BgLogger().Error, + logWithContext(sctx.GetSessionVars(), logutil.BgLogger().Error, "skip checksum due to encode error", zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID), zap.Error(err)) return nil, buf @@ -1825,13 +1831,13 @@ func (t *TableCommon) calcChecksums(sctx sessionctx.Context, h kv.Handle, data [ // appendPublicColForChecksum appends a public column data for checksum. If the column is in changing, that is, it's the // old column of an on-going modify-column ddl, then skip it since it will be handle by `appendInChangeColForChecksum`. func (t *TableCommon) appendPublicColForChecksum( - sctx sessionctx.Context, h kv.Handle, data [][]rowcodec.ColData, c *model.ColumnInfo, d *types.Datum, + sctx table.MutateContext, h kv.Handle, data [][]rowcodec.ColData, c *model.ColumnInfo, d *types.Datum, ) [][]rowcodec.ColData { if len(data) == 0 { // no need for checksum return nil } if c.State != model.StatePublic { // assert col is public - logWithContext(sctx, logutil.BgLogger().Error, + logWithContext(sctx.GetSessionVars(), logutil.BgLogger().Error, "skip checksum due to inconsistent column state", zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID), zap.Any("col", c)) return nil @@ -1856,18 +1862,18 @@ func (t *TableCommon) appendPublicColForChecksum( // value of this column. The extra checksum shall be calculated without this non-public column, thus nothing to do with // data[1]. func (t *TableCommon) appendNonPublicColForChecksum( - sctx sessionctx.Context, h kv.Handle, data [][]rowcodec.ColData, c *model.ColumnInfo, d *types.Datum, + sctx table.MutateContext, h kv.Handle, data [][]rowcodec.ColData, c *model.ColumnInfo, d *types.Datum, ) [][]rowcodec.ColData { if size := len(data); size == 0 { // no need for checksum return nil } else if size == 1 { // assert that 2 checksums are required - logWithContext(sctx, logutil.BgLogger().Error, + logWithContext(sctx.GetSessionVars(), logutil.BgLogger().Error, "skip checksum due to inconsistent length of column data", zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID)) return nil } if c.State == model.StatePublic || c.ChangeStateInfo != nil { // assert col is not public and is not in changing - logWithContext(sctx, logutil.BgLogger().Error, + logWithContext(sctx.GetSessionVars(), logutil.BgLogger().Error, "skip checksum due to inconsistent column state", zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID), zap.Any("col", c)) return nil @@ -1881,18 +1887,18 @@ func (t *TableCommon) appendNonPublicColForChecksum( // there is a non-public column. The first checksum should be calculate with the old version of this column and the extra // checksum should be calculated with the new version of column. func (t *TableCommon) appendInChangeColForChecksum( - sctx sessionctx.Context, h kv.Handle, data [][]rowcodec.ColData, c *model.ColumnInfo, oldVal *types.Datum, newVal *types.Datum, + sctx table.MutateContext, h kv.Handle, data [][]rowcodec.ColData, c *model.ColumnInfo, oldVal *types.Datum, newVal *types.Datum, ) [][]rowcodec.ColData { if size := len(data); size == 0 { // no need for checksum return nil } else if size == 1 { // assert that 2 checksums are required - logWithContext(sctx, logutil.BgLogger().Error, + logWithContext(sctx.GetSessionVars(), logutil.BgLogger().Error, "skip checksum due to inconsistent length of column data", zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID)) return nil } if c.State == model.StatePublic || c.ChangeStateInfo == nil { // assert col is not public and is in changing - logWithContext(sctx, logutil.BgLogger().Error, + logWithContext(sctx.GetSessionVars(), logutil.BgLogger().Error, "skip checksum due to inconsistent column state", zap.Stringer("key", t.RecordKey(h)), zap.Int64("tblID", t.meta.ID), zap.Any("col", c)) return nil @@ -1915,8 +1921,7 @@ func appendColForChecksum(dst []rowcodec.ColData, t *TableCommon, c *model.Colum return append(dst, rowcodec.ColData{ColumnInfo: c, Datum: d}) } -func logWithContext(sctx sessionctx.Context, log func(msg string, fields ...zap.Field), msg string, fields ...zap.Field) { - sessVars := sctx.GetSessionVars() +func logWithContext(sessVars *variable.SessionVars, log func(msg string, fields ...zap.Field), msg string, fields ...zap.Field) { ctxFields := make([]zap.Field, 0, len(fields)+2) ctxFields = append(ctxFields, zap.Uint64("conn", sessVars.ConnectionID)) if sessVars.TxnCtx != nil { diff --git a/pkg/table/tables/tables_test.go b/pkg/table/tables/tables_test.go index 43d8dfe92d53a..12e939830a502 100644 --- a/pkg/table/tables/tables_test.go +++ b/pkg/table/tables/tables_test.go @@ -404,7 +404,7 @@ func TestTableFromMeta(t *testing.T) { require.NoError(t, err) maxID := 1<<(64-15-1) - 1 - err = tb.Allocators(tk.Session()).Get(autoid.RowIDAllocType).Rebase(context.Background(), int64(maxID), false) + err = tb.Allocators(tk.Session().GetSessionVars()).Get(autoid.RowIDAllocType).Rebase(context.Background(), int64(maxID), false) require.NoError(t, err) _, err = tables.AllocHandle(context.Background(), tk.Session(), tb) diff --git a/pkg/util/ranger/types.go b/pkg/util/ranger/types.go index f6eee7abdf774..d55e14c04d2b2 100644 --- a/pkg/util/ranger/types.go +++ b/pkg/util/ranger/types.go @@ -96,10 +96,10 @@ func (ran *Range) Clone() *Range { // IsPoint returns if the range is a point. func (ran *Range) IsPoint(sctx sessionctx.Context) bool { - return ran.isPoint(sctx.GetSessionVars().StmtCtx, sctx.GetSessionVars().RegardNULLAsPoint) + return ran.isPoint(sctx.GetSessionVars().StmtCtx.TypeCtx(), sctx.GetSessionVars().RegardNULLAsPoint) } -func (ran *Range) isPoint(stmtCtx *stmtctx.StatementContext, regardNullAsPoint bool) bool { +func (ran *Range) isPoint(tc types.Context, regardNullAsPoint bool) bool { if len(ran.LowVal) != len(ran.HighVal) { return false } @@ -109,7 +109,7 @@ func (ran *Range) isPoint(stmtCtx *stmtctx.StatementContext, regardNullAsPoint b if a.Kind() == types.KindMinNotNull || b.Kind() == types.KindMaxValue { return false } - cmp, err := a.Compare(stmtCtx.TypeCtx(), &b, ran.Collators[i]) + cmp, err := a.Compare(tc, &b, ran.Collators[i]) if err != nil { return false } @@ -127,14 +127,14 @@ func (ran *Range) isPoint(stmtCtx *stmtctx.StatementContext, regardNullAsPoint b } // IsPointNonNullable returns if the range is a point without NULL. -func (ran *Range) IsPointNonNullable(sctx sessionctx.Context) bool { - return ran.isPoint(sctx.GetSessionVars().StmtCtx, false) +func (ran *Range) IsPointNonNullable(tc types.Context) bool { + return ran.isPoint(tc, false) } // IsPointNullable returns if the range is a point. // TODO: unify the parameter type with IsPointNullable and IsPoint -func (ran *Range) IsPointNullable(sctx sessionctx.Context) bool { - return ran.isPoint(sctx.GetSessionVars().StmtCtx, true) +func (ran *Range) IsPointNullable(tc types.Context) bool { + return ran.isPoint(tc, true) } // IsFullRange check if the range is full scan range