From 93a49ca0612220d34446c6af69a48aef5e96465f Mon Sep 17 00:00:00 2001 From: djshow832 <873581766@qq.com> Date: Sun, 9 May 2021 21:48:02 +0800 Subject: [PATCH] in memory allocator --- executor/insert_test.go | 117 +++++++++++++++++++++++++++++++++ sessionctx/variable/session.go | 24 ++++++- table/tables/tables.go | 73 ++++++++++++++++---- util/tableutil/tableutil.go | 40 +++++++++++ 4 files changed, 240 insertions(+), 14 deletions(-) create mode 100644 util/tableutil/tableutil.go diff --git a/executor/insert_test.go b/executor/insert_test.go index bee38e51c0fea..ffcfdc214bdb9 100644 --- a/executor/insert_test.go +++ b/executor/insert_test.go @@ -1590,3 +1590,120 @@ func (s *testSuite10) TestBinaryLiteralInsertToSet(c *C) { tk.MustExec("insert into bintest(h) values(0x61)") tk.MustQuery("select * from bintest").Check(testkit.Rows("a")) } + +var _ = SerialSuites(&testSuite13{&baseTestSuite{}}) + +type testSuite13 struct { + *baseTestSuite +} + +func (s *testSuite13) TestGlobalTempTableAutoInc(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec(`use test`) + tk.MustExec("drop table if exists temp_test") + tk.MustExec("create global temporary table temp_test(id int primary key auto_increment) on commit delete rows") + defer tk.MustExec("drop table if exists temp_test") + + // Data is cleared after transaction auto commits. + tk.MustExec("insert into temp_test(id) values(0)") + tk.MustQuery("select * from temp_test").Check(testkit.Rows()) + + // Data is not cleared inside a transaction. + tk.MustExec("begin") + tk.MustExec("insert into temp_test(id) values(0)") + tk.MustQuery("select * from temp_test").Check(testkit.Rows("1")) + tk.MustExec("commit") + + // AutoID allocator is cleared. + tk.MustExec("begin") + tk.MustExec("insert into temp_test(id) values(0)") + tk.MustQuery("select * from temp_test").Check(testkit.Rows("1")) + // Test whether auto-inc is incremental + tk.MustExec("insert into temp_test(id) values(0)") + tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("1", "2")) + tk.MustExec("commit") + + // multi-value insert + tk.MustExec("begin") + tk.MustExec("insert into temp_test(id) values(0), (0)") + tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("1", "2")) + tk.MustExec("insert into temp_test(id) values(0), (0)") + tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("1", "2", "3", "4")) + tk.MustExec("commit") + + // rebase + tk.MustExec("begin") + tk.MustExec("insert into temp_test(id) values(10)") + tk.MustExec("insert into temp_test(id) values(0)") + tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("10", "11")) + tk.MustExec("insert into temp_test(id) values(20), (30)") + tk.MustExec("insert into temp_test(id) values(0), (0)") + tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("10", "11", "20", "30", "31", "32")) + tk.MustExec("commit") +} + +func (s *testSuite13) TestGlobalTempTableRowID(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec(`use test`) + tk.MustExec("drop table if exists temp_test") + tk.MustExec("create global temporary table temp_test(id int) on commit delete rows") + defer tk.MustExec("drop table if exists temp_test") + + // Data is cleared after transaction auto commits. + tk.MustExec("insert into temp_test(id) values(0)") + tk.MustQuery("select _tidb_rowid from temp_test").Check(testkit.Rows()) + + // Data is not cleared inside a transaction. + tk.MustExec("begin") + tk.MustExec("insert into temp_test(id) values(0)") + tk.MustQuery("select _tidb_rowid from temp_test").Check(testkit.Rows("1")) + tk.MustExec("commit") + + // AutoID allocator is cleared. + tk.MustExec("begin") + tk.MustExec("insert into temp_test(id) values(0)") + tk.MustQuery("select _tidb_rowid from temp_test").Check(testkit.Rows("1")) + // Test whether row id is incremental + tk.MustExec("insert into temp_test(id) values(0)") + tk.MustQuery("select _tidb_rowid from temp_test order by _tidb_rowid").Check(testkit.Rows("1", "2")) + tk.MustExec("commit") + + // multi-value insert + tk.MustExec("begin") + tk.MustExec("insert into temp_test(id) values(0), (0)") + tk.MustQuery("select _tidb_rowid from temp_test order by _tidb_rowid").Check(testkit.Rows("1", "2")) + tk.MustExec("insert into temp_test(id) values(0), (0)") + tk.MustQuery("select _tidb_rowid from temp_test order by _tidb_rowid").Check(testkit.Rows("1", "2", "3", "4")) + tk.MustExec("commit") +} + +func (s *testSuite13) TestGlobalTempTableParallel(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec(`use test`) + tk.MustExec("drop table if exists temp_test") + tk.MustExec("create global temporary table temp_test(id int primary key auto_increment) on commit delete rows") + defer tk.MustExec("drop table if exists temp_test") + + threads := 8 + loops := 1 + wg := sync.WaitGroup{} + wg.Add(threads) + + insertFunc := func() { + defer wg.Done() + newTk := testkit.NewTestKitWithInit(c, s.store) + newTk.MustExec("begin") + for i := 0; i < loops; i++ { + newTk.MustExec("insert temp_test value(0)") + newTk.MustExec("insert temp_test value(0), (0)") + } + maxID := strconv.Itoa(loops * 3) + newTk.MustQuery("select max(id) from temp_test").Check(testkit.Rows(maxID)) + newTk.MustExec("commit") + } + + for i := 0; i < threads; i++ { + go insertFunc() + } + wg.Wait() +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 618120b5da6e6..34819491b7081 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/auth" + "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client" @@ -49,6 +50,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/rowcodec" "github.com/pingcap/tidb/util/stringutil" + "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tidb/util/timeutil" "github.com/twmb/murmur3" atomic2 "go.uber.org/atomic" @@ -174,7 +176,9 @@ type TransactionContext struct { // TableDeltaMap lock to prevent potential data race tdmLock sync.Mutex - GlobalTemporaryTables map[int64]struct{} + // GlobalTemporaryTables is used to store transaction-specific information for global temporary tables. + // It can also be stored in sessionCtx with local temporary tables, but it's easier to clean this data after transaction ends. + GlobalTemporaryTables map[int64]tableutil.TempTable } // GetShard returns the shard prefix for the next `count` rowids. @@ -1445,6 +1449,24 @@ func (s *SessionVars) LazyCheckKeyNotExists() bool { return s.PresumeKeyNotExists || (s.TxnCtx.IsPessimistic && !s.StmtCtx.DupKeyAsWarning) } +// GetTemporaryTable returns a TempTable by tableInfo. +func (s *SessionVars) GetTemporaryTable(tblInfo *model.TableInfo) tableutil.TempTable { + if tblInfo.TempTableType == model.TempTableGlobal { + if s.TxnCtx.GlobalTemporaryTables == nil { + s.TxnCtx.GlobalTemporaryTables = make(map[int64]tableutil.TempTable) + } + globalTempTables := s.TxnCtx.GlobalTemporaryTables + globalTempTable, ok := globalTempTables[tblInfo.ID] + if !ok { + globalTempTable = tableutil.TempTableFromMeta(tblInfo) + globalTempTables[tblInfo.ID] = globalTempTable + } + return globalTempTable + } + // TODO: check local temporary tables + return nil +} + // special session variables. const ( SQLModeVar = "sql_mode" diff --git a/table/tables/tables.go b/table/tables/tables.go index 8fd3cca9e2657..74fd2d82f3ef9 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/statistics" tikvstore "github.com/pingcap/tidb/store/tikv/kv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" @@ -46,6 +47,7 @@ import ( "github.com/pingcap/tidb/util/generatedexpr" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/stringutil" + "github.com/pingcap/tidb/util/tableutil" "github.com/pingcap/tipb/go-binlog" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" @@ -322,8 +324,8 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context, sh := memBuffer.Staging() defer memBuffer.Cleanup(sh) - if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal { - addTemporaryTableID(sctx, meta.ID) + if m := t.Meta(); m.TempTableType == model.TempTableGlobal { + addTemporaryTable(sctx, m) } var colIDs, binlogColIDs []int64 @@ -588,12 +590,9 @@ func TryGetCommonPkColumns(tbl table.Table) []*table.Column { return pkCols } -func addTemporaryTableID(sctx sessionctx.Context, id int64) { - txnCtx := sctx.GetSessionVars().TxnCtx - if txnCtx.GlobalTemporaryTables == nil { - txnCtx.GlobalTemporaryTables = make(map[int64]struct{}) - } - txnCtx.GlobalTemporaryTables[id] = struct{}{} +func addTemporaryTable(sctx sessionctx.Context, tblInfo *model.TableInfo) { + tempTable := sctx.GetSessionVars().GetTemporaryTable(tblInfo) + tempTable.SetModified(true) } // AddRecord implements table.Table AddRecord interface. @@ -608,8 +607,8 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . fn.ApplyOn(&opt) } - if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal { - addTemporaryTableID(sctx, meta.ID) + if m := t.Meta(); m.TempTableType == model.TempTableGlobal { + addTemporaryTable(sctx, m) } var ctx context.Context @@ -1010,8 +1009,8 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type return err } - if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal { - addTemporaryTableID(ctx, meta.ID) + if m := t.Meta(); m.TempTableType == model.TempTableGlobal { + addTemporaryTable(ctx, m) } // The table has non-public column and this column is doing the operation of "modify/change column". @@ -1370,7 +1369,14 @@ func OverflowShardBits(recordID int64, shardRowIDBits uint64, typeBitsLength uin // Allocators implements table.Table Allocators interface. func (t *TableCommon) Allocators(ctx sessionctx.Context) autoid.Allocators { - if ctx == nil || ctx.GetSessionVars().IDAllocator == nil { + if ctx == nil { + return t.allocs + } else if ctx.GetSessionVars().IDAllocator == nil { + // Use an independent allocator for global temporary tables. + if t.meta.TempTableType == model.TempTableGlobal { + alloc := ctx.GetSessionVars().GetTemporaryTable(t.meta).GetAutoIDAllocator() + return autoid.Allocators{alloc} + } return t.allocs } @@ -1498,6 +1504,7 @@ func getDuplicateErrorHandleString(t table.Table, handle kv.Handle, row []types. func init() { table.TableFromMeta = TableFromMeta table.MockTableFromMeta = MockTableFromMeta + tableutil.TempTableFromMeta = TempTableFromMeta } // sequenceCommon cache the sequence value. @@ -1763,3 +1770,43 @@ func BuildTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.Co } return tsExec } + +// TemporaryTable is used to store transaction-specific or session-specific information for global / local temporary tables. +// For example, stats and autoID should have their own copies of data, instead of being shared by all sessions. +type TemporaryTable struct { + // Whether it's modified in this transaction. + modified bool + // The stats of this table. So far it's always pseudo stats. + stats *statistics.Table + // The autoID allocator of this table. + autoIDAllocator autoid.Allocator +} + +// TempTableFromMeta builds a TempTable from model.TableInfo. +func TempTableFromMeta(tblInfo *model.TableInfo) tableutil.TempTable { + return &TemporaryTable{ + modified: false, + stats: statistics.PseudoTable(tblInfo), + autoIDAllocator: autoid.NewAllocatorFromTempTblInfo(tblInfo), + } +} + +// GetAutoIDAllocator is implemented from TempTable.GetAutoIDAllocator. +func (t *TemporaryTable) GetAutoIDAllocator() autoid.Allocator { + return t.autoIDAllocator +} + +// SetModified is implemented from TempTable.SetModified. +func (t *TemporaryTable) SetModified(modified bool) { + t.modified = modified +} + +// GetModified is implemented from TempTable.GetModified. +func (t *TemporaryTable) GetModified() bool { + return t.modified +} + +// GetStats is implemented from TempTable.GetStats. +func (t *TemporaryTable) GetStats() interface{} { + return t.stats +} diff --git a/util/tableutil/tableutil.go b/util/tableutil/tableutil.go new file mode 100644 index 0000000000000..11cbe626dcc56 --- /dev/null +++ b/util/tableutil/tableutil.go @@ -0,0 +1,40 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package tableutil + +import ( + "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/meta/autoid" +) + +// TempTable is used to store transaction-specific or session-specific information for global / local temporary tables. +// For example, stats and autoID should have their own copies of data, instead of being shared by all sessions. +type TempTable interface { + // GetAutoIDAllocator gets the autoID allocator of this table. + GetAutoIDAllocator() autoid.Allocator + + // SetModified sets that the table is modified. + SetModified(bool) + + // GetModified queries whether the table is modified. + GetModified() bool + + // The stats of this table (*statistics.Table). + // Define the return type as interface{} here to avoid cycle imports. + GetStats() interface{} +} + +// TempTableFromMeta builds a TempTable from *model.TableInfo. +// Currently, it is assigned to tables.TempTableFromMeta in tidb package's init function. +var TempTableFromMeta func(tblInfo *model.TableInfo) TempTable