From 63d2fe6ff0c2aa3f6230eb9aaca176d95dc2d05f Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 22 Dec 2020 14:11:31 +0800 Subject: [PATCH 1/8] planner: check index valid while forUpdateRead (#21596) Signed-off-by: you06 --- planner/core/logical_plan_builder.go | 27 +++-- planner/core/logical_plans.go | 6 ++ planner/core/planbuilder.go | 80 +++++++++++++- planner/core/point_get_plan.go | 38 ++++++- planner/core/rule_partition_processor.go | 2 +- session/pessimistic_test.go | 130 +++++++++++++++++++++++ 6 files changed, 268 insertions(+), 15 deletions(-) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index ed4a5336d736c..c650f53de879c 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -3290,12 +3290,21 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L projExprs []expression.Expression ) - // For sub-queries, the FROM clause may have already been built in outer query when resolving correlated aggregates. - // If the ResultSetNode inside FROM clause has nothing to do with correlated aggregates, we can simply get the - // existing ResultSetNode from the cache. - p, err = b.buildTableRefsWithCache(ctx, sel.From) - if err != nil { - return nil, err + // set for update read to true before building result set node + if isForUpdateReadSelectLock(sel.LockInfo.LockType) { + b.isForUpdateRead = true + } + + if sel.From != nil { + // For sub-queries, the FROM clause may have already been built in outer query when resolving correlated aggregates. + // If the ResultSetNode inside FROM clause has nothing to do with correlated aggregates, we can simply get the + // existing ResultSetNode from the cache. + p, err = b.buildTableRefsWithCache(ctx, sel.From) + if err != nil { + return nil, err + } + } else { + p = b.buildTableDual() } originalFields := sel.Fields.Fields @@ -3599,7 +3608,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as if tblName.L == "" { tblName = tn.Name } - possiblePaths, err := getPossibleAccessPaths(b.ctx, b.TableHints(), tn.IndexHints, tbl, dbName, tblName) + possiblePaths, err := getPossibleAccessPaths(b.ctx, b.TableHints(), tn.IndexHints, tbl, dbName, tblName, b.isForUpdateRead, b.is.SchemaMetaVersion()) if err != nil { return nil, err } @@ -3696,6 +3705,8 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as partitionNames: tn.PartitionNames, TblCols: make([]*expression.Column, 0, len(columns)), preferPartitions: make(map[int][]model.CIStr), + is: b.is, + isForUpdateRead: b.isForUpdateRead, }.Init(b.ctx, b.getSelectOffset()) var handleCols HandleCols schema := expression.NewSchema(make([]*expression.Column, 0, len(columns))...) @@ -4221,6 +4232,7 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) ( } b.inUpdateStmt = true + b.isForUpdateRead = true p, err := b.buildResultSetNode(ctx, update.TableRefs.TableRefs) if err != nil { @@ -4561,6 +4573,7 @@ func (b *PlanBuilder) buildDelete(ctx context.Context, delete *ast.DeleteStmt) ( }() b.inDeleteStmt = true + b.isForUpdateRead = true p, err := b.buildResultSetNode(ctx, delete.TableRefs.TableRefs) if err != nil { diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 5f0731ce29d40..e16d57fd399b8 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/expression/aggregation" + "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/planner/util" "github.com/pingcap/tidb/statistics" @@ -520,6 +521,11 @@ type DataSource struct { // preferPartitions store the map, the key represents store type, the value represents the partition name list. preferPartitions map[int][]model.CIStr SampleInfo *TableSampleInfo + is infoschema.InfoSchema + // isForUpdateRead should be true in either of the following situations + // 1. use `inside insert`, `update`, `delete` or `select for update` statement + // 2. isolation level is RC + isForUpdateRead bool } // ExtractCorrelatedCols implements LogicalPlan interface. diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index dd7fa788e6cc8..14f2aaa4702c2 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/parser/opcode" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -469,6 +470,10 @@ type PlanBuilder struct { // cache ResultSetNodes and HandleHelperMap to avoid rebuilding. cachedResultSetNodes map[*ast.Join]LogicalPlan cachedHandleHelperMap map[*ast.Join]map[int64][]HandleCols + // isForUpdateRead should be true in either of the following situations + // 1. use `inside insert`, `update`, `delete` or `select for update` statement + // 2. isolation level is RC + isForUpdateRead bool } type handleColHelper struct { @@ -582,6 +587,7 @@ func NewPlanBuilder(sctx sessionctx.Context, is infoschema.InfoSchema, processor correlatedAggMapper: make(map[*ast.AggregateFuncExpr]*expression.CorrelatedColumn), cachedResultSetNodes: make(map[*ast.Join]LogicalPlan), cachedHandleHelperMap: make(map[*ast.Join]map[int64][]HandleCols), + isForUpdateRead: sctx.GetSessionVars().IsPessimisticReadConsistency(), }, savedBlockNames } @@ -892,7 +898,31 @@ func fillContentForTablePath(tablePath *util.AccessPath, tblInfo *model.TableInf } } -func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr) ([]*util.AccessPath, error) { +func isForUpdateReadSelectLock(lock ast.SelectLockType) bool { + return lock == ast.SelectLockForUpdate || lock == ast.SelectLockForUpdateNoWait +} + +func getLatestIndexInfo(ctx sessionctx.Context, id int64, startVer int64) (map[int64]*model.IndexInfo, bool, error) { + dom := domain.GetDomain(ctx) + if dom == nil { + return nil, false, errors.New("domain not found for ctx") + } + is := dom.InfoSchema() + if is.SchemaMetaVersion() == startVer { + return nil, false, nil + } + latestIndexes := make(map[int64]*model.IndexInfo) + latestTbl, exist := is.TableByID(id) + if exist { + latestTblInfo := latestTbl.Meta() + for _, index := range latestTblInfo.Indices { + latestIndexes[index.ID] = index + } + } + return latestIndexes, true, nil +} + +func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr, check bool, startVer int64) ([]*util.AccessPath, error) { tblInfo := tbl.Meta() publicPaths := make([]*util.AccessPath, 0, len(tblInfo.Indices)+2) tp := kv.TiKV @@ -907,6 +937,11 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i publicPaths = append(publicPaths, genTiFlashPath(tblInfo, true)) } optimizerUseInvisibleIndexes := ctx.GetSessionVars().OptimizerUseInvisibleIndexes + + check = check && ctx.GetSessionVars().ConnectionID > 0 + var latestIndexes map[int64]*model.IndexInfo + var err error + for _, index := range tblInfo.Indices { if index.State == model.StatePublic { // Filter out invisible index, because they are not visible for optimizer @@ -916,6 +951,17 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i if tblInfo.IsCommonHandle && index.Primary { continue } + if check && latestIndexes == nil { + latestIndexes, check, err = getLatestIndexInfo(ctx, tblInfo.ID, 0) + if err != nil { + return nil, err + } + } + if check { + if latestIndex, ok := latestIndexes[index.ID]; !ok || latestIndex.State != model.StatePublic { + continue + } + } publicPaths = append(publicPaths, &util.AccessPath{Index: index}) } } @@ -1386,13 +1432,40 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam // get index information indexInfos := make([]*model.IndexInfo, 0, len(tblInfo.Indices)) indexLookUpReaders := make([]Plan, 0, len(tblInfo.Indices)) + + check := b.isForUpdateRead && b.ctx.GetSessionVars().ConnectionID > 0 + var latestIndexes map[int64]*model.IndexInfo + var err error + for _, idx := range indices { idxInfo := idx.Meta() if idxInfo.State != model.StatePublic { - logutil.Logger(context.Background()).Info("build physical index lookup reader, the index isn't public", - zap.String("index", idxInfo.Name.O), zap.Stringer("state", idxInfo.State), zap.String("table", tblInfo.Name.O)) + logutil.Logger(ctx).Info("build physical index lookup reader, the index isn't public", + zap.String("index", idxInfo.Name.O), + zap.Stringer("state", idxInfo.State), + zap.String("table", tblInfo.Name.O)) continue } + if check && latestIndexes == nil { + latestIndexes, check, err = getLatestIndexInfo(b.ctx, tblInfo.ID, b.is.SchemaMetaVersion()) + if err != nil { + return nil, nil, err + } + } + if check { + if latestIndex, ok := latestIndexes[idxInfo.ID]; !ok || latestIndex.State != model.StatePublic { + forUpdateState := model.StateNone + if ok { + forUpdateState = latestIndex.State + } + logutil.Logger(ctx).Info("build physical index lookup reader, the index isn't public in forUpdateRead", + zap.String("index", idxInfo.Name.O), + zap.Stringer("state", idxInfo.State), + zap.Stringer("forUpdateRead state", forUpdateState), + zap.String("table", tblInfo.Name.O)) + continue + } + } indexInfos = append(indexInfos, idxInfo) // For partition tables. if pi := tbl.Meta().GetPartitionInfo(); pi != nil { @@ -2698,6 +2771,7 @@ func (c *colNameInOnDupExtractor) Leave(node ast.Node) (ast.Node, bool) { } func (b *PlanBuilder) buildSelectPlanOfInsert(ctx context.Context, insert *ast.InsertStmt, insertPlan *Insert) error { + b.isForUpdateRead = true affectedValuesCols, err := b.getAffectCols(insert, insertPlan) if err != nil { return err diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index 83bd72fd0c2bb..6158200ebda32 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -37,10 +37,12 @@ import ( "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" tidbutil "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/math" "github.com/pingcap/tidb/util/plancodec" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" + "go.uber.org/zap" ) // PointGetPlan is a fast plan for simple point get. @@ -432,7 +434,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) { p = fp return } - if fp := tryPointGetPlan(ctx, x); fp != nil { + if fp := tryPointGetPlan(ctx, x, isForUpdateReadSelectLock(x.LockInfo.LockType)); fp != nil { if checkFastPlanPrivilege(ctx, fp.dbName, fp.TblInfo.Name.L, mysql.SelectPriv) != nil { return nil } @@ -757,7 +759,7 @@ func tryWhereIn2BatchPointGet(ctx sessionctx.Context, selStmt *ast.SelectStmt) * // 2. It must be a single table select. // 3. All the columns must be public and generated. // 4. The condition is an access path that the range is a unique key. -func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetPlan { +func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt, check bool) *PointGetPlan { if selStmt.Having != nil { return nil } else if selStmt.Limit != nil { @@ -838,11 +840,27 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP return nil } + check = check && ctx.GetSessionVars().ConnectionID > 0 + var latestIndexes map[int64]*model.IndexInfo + var err error + for _, idxInfo := range tbl.Indices { if !idxInfo.Unique || idxInfo.State != model.StatePublic || idxInfo.Invisible { continue } if isTableDual { + if check && latestIndexes == nil { + latestIndexes, check, err = getLatestIndexInfo(ctx, tbl.ID, 0) + if err != nil { + logutil.BgLogger().Warn("get information schema failed", zap.Error(err)) + return nil + } + } + if check { + if latestIndex, ok := latestIndexes[idxInfo.ID]; !ok || latestIndex.State != model.StatePublic { + continue + } + } p := newPointGetPlan(ctx, tblName.Schema.O, schema, tbl, names) p.IsTableDual = true return p @@ -852,6 +870,18 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP if idxValues == nil { continue } + if check && latestIndexes == nil { + latestIndexes, check, err = getLatestIndexInfo(ctx, tbl.ID, 0) + if err != nil { + logutil.BgLogger().Warn("get information schema failed", zap.Error(err)) + return nil + } + } + if check { + if latestIndex, ok := latestIndexes[idxInfo.ID]; !ok || latestIndex.State != model.StatePublic { + continue + } + } p := newPointGetPlan(ctx, dbName, schema, tbl, names) p.IndexInfo = idxInfo p.IndexValues = idxValues @@ -1164,7 +1194,7 @@ func tryUpdatePointPlan(ctx sessionctx.Context, updateStmt *ast.UpdateStmt) Plan OrderBy: updateStmt.Order, Limit: updateStmt.Limit, } - pointGet := tryPointGetPlan(ctx, selStmt) + pointGet := tryPointGetPlan(ctx, selStmt, true) if pointGet != nil { if pointGet.IsTableDual { return PhysicalTableDual{ @@ -1258,7 +1288,7 @@ func tryDeletePointPlan(ctx sessionctx.Context, delStmt *ast.DeleteStmt) Plan { OrderBy: delStmt.Order, Limit: delStmt.Limit, } - if pointGet := tryPointGetPlan(ctx, selStmt); pointGet != nil { + if pointGet := tryPointGetPlan(ctx, selStmt, true); pointGet != nil { if pointGet.IsTableDual { return PhysicalTableDual{ names: pointGet.outputNames, diff --git a/planner/core/rule_partition_processor.go b/planner/core/rule_partition_processor.go index 3d2ed5bf4425b..cb53d00467c49 100644 --- a/planner/core/rule_partition_processor.go +++ b/planner/core/rule_partition_processor.go @@ -1092,7 +1092,7 @@ func pruneUseBinarySearch(lessThan lessThanDataInt, data dataForPrune, unsigned func (s *partitionProcessor) resolveAccessPaths(ds *DataSource) error { possiblePaths, err := getPossibleAccessPaths( ds.ctx, &tableHintInfo{indexMergeHintList: ds.indexMergeHints, indexHintList: ds.IndexHints}, - ds.astIndexHints, ds.table, ds.DBName, ds.tableInfo.Name) + ds.astIndexHints, ds.table, ds.DBName, ds.tableInfo.Name, ds.isForUpdateRead, ds.is.SchemaMetaVersion()) if err != nil { return err } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 5109c78e3603a..1cdb96eda18af 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -2281,3 +2281,133 @@ func (s *testPessimisticSuite) TestAmendWithColumnTypeChange(c *C) { tk2.MustExec("alter table t modify column v varchar(5);") c.Assert(tk.ExecToErr("commit"), NotNil) } + +func (s *testPessimisticSuite) TestIssue21498(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1") + + for _, partition := range []bool{false, true} { + //RC test + tk.MustExec("drop table if exists t") + createTable := "create table t (id int primary key, v int, index iv (v))" + if partition { + createTable += " partition by range (id) (partition p0 values less than (0),partition p1 values less than (1),partition p2 values less than (2),partition p3 values less than (3),partition pn values less than MAXVALUE)" + } + tk.MustExec(createTable) + tk.MustExec("insert into t values (1, 10), (2, 20), (3, 30), (4, 40)") + + tk.MustExec("set tx_isolation = 'READ-COMMITTED'") + tk.MustExec("begin pessimistic") + tk.MustQuery("select * from t where v = 10").Check(testkit.Rows("1 10")) + + tk2.MustExec("alter table t drop index iv") + tk2.MustExec("update t set v = 11 where id = 1") + + tk.MustQuery("select * from t where v = 10").Check(testkit.Rows()) + tk.MustQuery("select * from t where v = 11").Check(testkit.Rows("1 11")) + tk.MustQuery("select * from t where id = 1").Check(testkit.Rows("1 11")) + tk.MustExec("admin check table t") + tk.MustExec("commit") + + tk.MustExec("drop table if exists t") + createTable = "create table t (id int primary key, v int, index iv (v), v2 int)" + if partition { + createTable += " partition by range (id) (partition p0 values less than (0),partition p1 values less than (1),partition p2 values less than (2),partition p3 values less than (3),partition pn values less than MAXVALUE)" + } + tk.MustExec(createTable) + tk.MustExec("insert into t values (1, 10, 100), (2, 20, 200), (3, 30, 300), (4, 40, 400)") + + tk.MustExec("begin pessimistic") + tk.MustQuery("select * from t use index (iv) where v = 10").Check(testkit.Rows("1 10 100")) + tk2.MustExec("alter table t drop index iv") + tk2.MustExec("update t set v = 11 where id = 1") + err := tk.ExecToErr("select * from t use index (iv) where v = 10") + c.Assert(err.Error(), Equals, "[planner:1176]Key 'iv' doesn't exist in table 't'") + tk.MustQuery("select * from t where v = 10").Check(testkit.Rows()) + tk2.MustExec("update t set id = 5 where id = 1") + err = tk.ExecToErr("select * from t use index (iv) where v = 10") // select with + c.Assert(err.Error(), Equals, "[planner:1176]Key 'iv' doesn't exist in table 't'") + tk.MustQuery("select * from t where v = 10").Check(testkit.Rows()) + if !partition { + // amend transaction does not support partition table + tk.MustExec("insert into t(id, v, v2) select 6, v + 20, v2 + 200 from t where id = 4") // insert ... select with index unchanged + } + err = tk.ExecToErr("insert into t(id, v, v2) select 7, v + 30, v2 + 300 from t use index (iv) where id = 4") // insert ... select with index changed + c.Assert(err.Error(), Equals, "[planner:1176]Key 'iv' doesn't exist in table 't'") + tk.MustExec("admin check table t") // check consistency inside txn + tk.MustExec("commit") + if !partition { + tk.MustQuery("select * from t").Check(testkit.Rows("2 20 200", "3 30 300", "4 40 400", "5 11 100", "6 60 600")) + } + tk.MustExec("admin check table t") // check consistency out of txn + + //RR test for non partition + if partition { + continue + } + + tk.MustExec("set tx_isolation = 'REPEATABLE-READ'") + tk2.MustExec("alter table t add unique index iv(v)") + tk.MustExec("begin pessimistic") + tk2.MustExec("alter table t drop index iv") + tk2.MustExec("update t set v = 21 where v = 20") + tk2.MustExec("update t set v = 31 where v = 30") + tk.MustExec("update t set v = 22 where v = 21") // fast path + tk.CheckExecResult(1, 0) + tk.MustExec("update t set v = 23 where v = 22") + tk.CheckExecResult(1, 0) + tk.MustExec("update t set v = 32 where v >= 31 and v < 40") // common path + tk.CheckExecResult(1, 0) + tk.MustExec("commit") + tk.MustQuery("select * from t").Check(testkit.Rows("2 23 200", "3 32 300", "4 40 400", "5 11 100", "6 60 600")) + + tk2.MustExec("alter table t add unique index iv(v)") + tk.MustExec("begin pessimistic") + tk2.MustExec("alter table t drop index iv") + tk2.MustExec("update t set v = 24 where v = 23") + tk2.MustExec("update t set v = 41 where v = 40") + // fast path + tk.MustQuery("select * from t where v = 23").Check(testkit.Rows("2 23 200")) + tk.MustQuery("select * from t where v = 24").Check(testkit.Rows()) + tk.MustQuery("select * from t where v = 23 for update").Check(testkit.Rows()) + tk.MustQuery("select * from t where v = 24 for update").Check(testkit.Rows("2 24 200")) + // test index look up + tk.MustQuery("select * from t s, t t1 where s.v = 23 and s.id = t1.id").Check(testkit.Rows("2 23 200 2 23 200")) + tk.MustQuery("select * from t s, t t1 where s.v = 24 and s.id = t1.id").Check(testkit.Rows()) + tk.MustQuery("select * from t s, t t1 where s.v = 23 and s.id = t1.id for update").Check(testkit.Rows()) + tk.MustQuery("select * from t s, t t1 where s.v = 24 and s.id = t1.id for update").Check(testkit.Rows("2 24 200 2 24 200")) + tk.MustExec("delete from t where v = 24") + tk.CheckExecResult(1, 0) + // common path + tk.MustQuery("select * from t where v >= 41 and v < 50").Check(testkit.Rows()) + tk.MustQuery("select * from t where v >= 41 and v < 50 for update").Check(testkit.Rows("4 41 400")) + tk.MustExec("delete from t where v >= 41 and v < 50") + tk.CheckExecResult(1, 0) + tk.MustExec("commit") + tk.MustQuery("select * from t").Check(testkit.Rows("3 32 300", "5 11 100", "6 60 600")) + + tk2.MustExec("alter table t add unique index iv(v)") + tk.MustExec("begin pessimistic") + tk2.MustExec("alter table t drop index iv") + tk2.MustExec("update t set v = 33 where v = 32") + tk.MustExec("insert into t(id, v, v2) select 3 * id, 3 * v, 3 * v2 from t where v = 33") + tk.CheckExecResult(1, 0) + tk.MustExec("commit") + tk.MustQuery("select * from t").Check(testkit.Rows("3 33 300", "5 11 100", "6 60 600", "9 99 900")) + + tk2.MustExec("alter table t add unique index iv(v)") + tk2.MustExec("drop table if exists t1") + tk2.MustExec("create table t1(id int primary key, v int, index iv (v), v2 int)") + tk.MustExec("begin pessimistic") + tk2.MustExec("alter table t drop index iv") + tk2.MustExec("update t set v = 34 where v = 33") + tk2.MustExec("update t set v = 12 where v = 11") + tk.MustExec("insert into t1(id, v, v2) select * from t where v = 33") + tk.CheckExecResult(0, 0) + tk.MustExec("insert into t1(id, v, v2) select * from t where v = 12") + tk.CheckExecResult(1, 0) + tk.MustExec("commit") + tk.MustQuery("select * from t1").Check(testkit.Rows("5 12 100")) + } +} From 069340f062954cd8514a39f0816aff85d738ef64 Mon Sep 17 00:00:00 2001 From: you06 Date: Mon, 4 Jan 2021 15:42:25 +0800 Subject: [PATCH 2/8] fix panic Signed-off-by: you06 --- planner/core/logical_plan_builder.go | 2 +- planner/core/planbuilder.go | 7 +++++-- planner/core/point_get_plan.go | 2 +- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index c650f53de879c..6c9acb30db1d8 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -3291,7 +3291,7 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L ) // set for update read to true before building result set node - if isForUpdateReadSelectLock(sel.LockInfo.LockType) { + if isForUpdateReadSelectLock(sel.LockInfo) { b.isForUpdateRead = true } diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 14f2aaa4702c2..33f145b86ca5f 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -898,8 +898,11 @@ func fillContentForTablePath(tablePath *util.AccessPath, tblInfo *model.TableInf } } -func isForUpdateReadSelectLock(lock ast.SelectLockType) bool { - return lock == ast.SelectLockForUpdate || lock == ast.SelectLockForUpdateNoWait +func isForUpdateReadSelectLock(lock *ast.SelectLockInfo) bool { + if lock == nil { + return false + } + return lock.LockType == ast.SelectLockForUpdate || lock.LockType == ast.SelectLockForUpdateNoWait } func getLatestIndexInfo(ctx sessionctx.Context, id int64, startVer int64) (map[int64]*model.IndexInfo, bool, error) { diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index 6158200ebda32..bae63b367af83 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -434,7 +434,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) { p = fp return } - if fp := tryPointGetPlan(ctx, x, isForUpdateReadSelectLock(x.LockInfo.LockType)); fp != nil { + if fp := tryPointGetPlan(ctx, x, isForUpdateReadSelectLock(x.LockInfo)); fp != nil { if checkFastPlanPrivilege(ctx, fp.dbName, fp.TblInfo.Name.L, mysql.SelectPriv) != nil { return nil } From a1a2c4780f4d386cf1de92cf7440cd0ba8f6854e Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 6 Jan 2021 00:49:53 +0800 Subject: [PATCH 3/8] do not use cache when for update read Signed-off-by: you06 --- planner/core/logical_plan_builder.go | 29 +++++++++++++++------------- session/pessimistic_test.go | 10 +++++++++- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 6c9acb30db1d8..195cdb2127bcb 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -315,9 +315,16 @@ func (b *PlanBuilder) buildTableRefs(ctx context.Context, from *ast.TableRefsCla var ok bool p, ok = b.cachedResultSetNodes[from.TableRefs] if ok { - m := b.cachedHandleHelperMap[from.TableRefs] - b.handleHelper.pushMap(m) - return + if _, ok = p.(*LogicalSelection); ok { + var stmt *ast.SelectStmt + if stmt, ok = from.TableRefs.Left.(*ast.SelectStmt); ok { + if !isForUpdateReadSelectLock(stmt.LockInfo) { + m := b.cachedHandleHelperMap[from.TableRefs] + b.handleHelper.pushMap(m) + return + } + } + } } p, err = b.buildResultSetNode(ctx, from.TableRefs) if err != nil { @@ -3295,16 +3302,12 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L b.isForUpdateRead = true } - if sel.From != nil { - // For sub-queries, the FROM clause may have already been built in outer query when resolving correlated aggregates. - // If the ResultSetNode inside FROM clause has nothing to do with correlated aggregates, we can simply get the - // existing ResultSetNode from the cache. - p, err = b.buildTableRefsWithCache(ctx, sel.From) - if err != nil { - return nil, err - } - } else { - p = b.buildTableDual() + // For sub-queries, the FROM clause may have already been built in outer query when resolving correlated aggregates. + // If the ResultSetNode inside FROM clause has nothing to do with correlated aggregates, we can simply get the + // existing ResultSetNode from the cache. + p, err = b.buildTableRefsWithCache(ctx, sel.From) + if err != nil { + return nil, err } originalFields := sel.Fields.Fields diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 1cdb96eda18af..fe814d78da27e 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -2289,13 +2289,15 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) { for _, partition := range []bool{false, true} { //RC test - tk.MustExec("drop table if exists t") + tk.MustExec("drop table if exists t, t1") createTable := "create table t (id int primary key, v int, index iv (v))" if partition { createTable += " partition by range (id) (partition p0 values less than (0),partition p1 values less than (1),partition p2 values less than (2),partition p3 values less than (3),partition pn values less than MAXVALUE)" } tk.MustExec(createTable) tk.MustExec("insert into t values (1, 10), (2, 20), (3, 30), (4, 40)") + tk.MustExec("create table t1(id int)") + tk.MustExec("insert into t1 values(1)") tk.MustExec("set tx_isolation = 'READ-COMMITTED'") tk.MustExec("begin pessimistic") @@ -2370,8 +2372,14 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) { // fast path tk.MustQuery("select * from t where v = 23").Check(testkit.Rows("2 23 200")) tk.MustQuery("select * from t where v = 24").Check(testkit.Rows()) + tk.MustQuery("select (select id from t where v = 23), id from t1").Check(testkit.Rows("2 1")) + tk.MustQuery("select (select id from t where v = 24), id from t1").Check(testkit.Rows(" 1")) + tk.MustQuery("select * from t where v = 23 for update").Check(testkit.Rows()) tk.MustQuery("select * from t where v = 24 for update").Check(testkit.Rows("2 24 200")) + tk.MustQuery("select (select id from t where v = 23 for update), id from t1").Check(testkit.Rows(" 1")) + tk.MustQuery("select (select id from t where v = 24 for update), id from t1").Check(testkit.Rows("2 1")) + // test index look up tk.MustQuery("select * from t s, t t1 where s.v = 23 and s.id = t1.id").Check(testkit.Rows("2 23 200 2 23 200")) tk.MustQuery("select * from t s, t t1 where s.v = 24 and s.id = t1.id").Check(testkit.Rows()) From 13a4ba0e04360085e28d1e992b557685e9de9e1f Mon Sep 17 00:00:00 2001 From: you06 Date: Wed, 6 Jan 2021 11:52:40 +0800 Subject: [PATCH 4/8] fix invalid cache usage Signed-off-by: you06 --- planner/core/logical_plan_builder.go | 17 +++++++---------- session/pessimistic_test.go | 7 +++---- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 195cdb2127bcb..3a7c843951e4f 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -315,16 +315,9 @@ func (b *PlanBuilder) buildTableRefs(ctx context.Context, from *ast.TableRefsCla var ok bool p, ok = b.cachedResultSetNodes[from.TableRefs] if ok { - if _, ok = p.(*LogicalSelection); ok { - var stmt *ast.SelectStmt - if stmt, ok = from.TableRefs.Left.(*ast.SelectStmt); ok { - if !isForUpdateReadSelectLock(stmt.LockInfo) { - m := b.cachedHandleHelperMap[from.TableRefs] - b.handleHelper.pushMap(m) - return - } - } - } + m := b.cachedHandleHelperMap[from.TableRefs] + b.handleHelper.pushMap(m) + return } p, err = b.buildResultSetNode(ctx, from.TableRefs) if err != nil { @@ -2260,6 +2253,10 @@ func (r *correlatedAggregateResolver) resolveSelect(sel *ast.SelectStmt) (err er if err != nil { return err } + // do not use cache when for update read + if isForUpdateReadSelectLock(sel.LockInfo) { + useCache = false + } // we cannot use cache if there are correlated aggregates inside FROM clause, // since the plan we are building now is not correct and need to be rebuild later. p, err := r.b.buildTableRefs(r.ctx, sel.From, useCache) diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index fe814d78da27e..1f5c5ca627ace 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -2344,7 +2344,7 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) { } tk.MustExec("admin check table t") // check consistency out of txn - //RR test for non partition + // RR test for non partition if partition { continue } @@ -2372,11 +2372,10 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) { // fast path tk.MustQuery("select * from t where v = 23").Check(testkit.Rows("2 23 200")) tk.MustQuery("select * from t where v = 24").Check(testkit.Rows()) - tk.MustQuery("select (select id from t where v = 23), id from t1").Check(testkit.Rows("2 1")) - tk.MustQuery("select (select id from t where v = 24), id from t1").Check(testkit.Rows(" 1")) - tk.MustQuery("select * from t where v = 23 for update").Check(testkit.Rows()) tk.MustQuery("select * from t where v = 24 for update").Check(testkit.Rows("2 24 200")) + tk.MustQuery("select (select id from t where v = 23), id from t1 for update").Check(testkit.Rows("2 1")) + tk.MustQuery("select (select id from t where v = 24), id from t1 for update").Check(testkit.Rows(" 1")) tk.MustQuery("select (select id from t where v = 23 for update), id from t1").Check(testkit.Rows(" 1")) tk.MustQuery("select (select id from t where v = 24 for update), id from t1").Check(testkit.Rows("2 1")) From ace8009a7709864086319fc702a560141849332c Mon Sep 17 00:00:00 2001 From: you06 Date: Thu, 7 Jan 2021 21:26:31 +0800 Subject: [PATCH 5/8] add comments Signed-off-by: you06 --- planner/core/planbuilder.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 33f145b86ca5f..d71f156e94694 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -898,13 +898,18 @@ func fillContentForTablePath(tablePath *util.AccessPath, tblInfo *model.TableInf } } +// isForUpdateReadSelectLock checks if the lock type need to use forUpdateRead func isForUpdateReadSelectLock(lock *ast.SelectLockInfo) bool { if lock == nil { return false } - return lock.LockType == ast.SelectLockForUpdate || lock.LockType == ast.SelectLockForUpdateNoWait + return lock.LockType == ast.SelectLockForUpdate || + lock.LockType == ast.SelectLockForUpdateNoWait || + lock.LockType == ast.SelectLockForUpdateWaitN } +// getLatestIndexInfo gets the index info of latest schema version from given table id, +// it returns nil if the schema version is not changed func getLatestIndexInfo(ctx sessionctx.Context, id int64, startVer int64) (map[int64]*model.IndexInfo, bool, error) { dom := domain.GetDomain(ctx) if dom == nil { From 945f4cf86d47c9ca98d5e3aa78a8a38e4feebe61 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 8 Jan 2021 13:35:55 +0800 Subject: [PATCH 6/8] capture isForUpdateRead Signed-off-by: you06 --- planner/core/logical_plan_builder.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 3a7c843951e4f..2077d7600f957 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2235,7 +2235,11 @@ func (r *correlatedAggregateResolver) Enter(n ast.Node) (ast.Node, bool) { r.b.outerSchemas = append(r.b.outerSchemas, outerSchema) r.b.outerNames = append(r.b.outerNames, r.outerPlan.OutputNames()) } + // capture the forUpdateRead mark + isForUpdateRead := r.b.isForUpdateRead + r.b.isForUpdateRead = false r.err = r.resolveSelect(v) + r.b.isForUpdateRead = isForUpdateRead return n, true } return n, false From f6cac2f831e54643d4367f6ab7a206f9ec7e1dcf Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 8 Jan 2021 18:00:22 +0800 Subject: [PATCH 7/8] move capture to rewrite Signed-off-by: you06 --- planner/core/expression_rewriter.go | 4 ++++ planner/core/logical_plan_builder.go | 4 ---- session/pessimistic_test.go | 7 ++++++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go index 1c8477992e408..bf6b9ca456c9f 100644 --- a/planner/core/expression_rewriter.go +++ b/planner/core/expression_rewriter.go @@ -107,7 +107,11 @@ func (b *PlanBuilder) rewriteInsertOnDuplicateUpdate(ctx context.Context, exprNo // asScalar means whether this expression must be treated as a scalar expression. // And this function returns a result expression, a new plan that may have apply or semi-join. func (b *PlanBuilder) rewrite(ctx context.Context, exprNode ast.ExprNode, p LogicalPlan, aggMapper map[*ast.AggregateFuncExpr]int, asScalar bool) (expression.Expression, LogicalPlan, error) { + // capture the forUpdateRead mark + isForUpdateRead := b.isForUpdateRead + b.isForUpdateRead = false expr, resultPlan, err := b.rewriteWithPreprocess(ctx, exprNode, p, aggMapper, nil, asScalar, nil) + b.isForUpdateRead = isForUpdateRead return expr, resultPlan, err } diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 2077d7600f957..3a7c843951e4f 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -2235,11 +2235,7 @@ func (r *correlatedAggregateResolver) Enter(n ast.Node) (ast.Node, bool) { r.b.outerSchemas = append(r.b.outerSchemas, outerSchema) r.b.outerNames = append(r.b.outerNames, r.outerPlan.OutputNames()) } - // capture the forUpdateRead mark - isForUpdateRead := r.b.isForUpdateRead - r.b.isForUpdateRead = false r.err = r.resolveSelect(v) - r.b.isForUpdateRead = isForUpdateRead return n, true } return n, false diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 1f5c5ca627ace..c16ef1024c83a 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -2400,8 +2400,13 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) { tk2.MustExec("update t set v = 33 where v = 32") tk.MustExec("insert into t(id, v, v2) select 3 * id, 3 * v, 3 * v2 from t where v = 33") tk.CheckExecResult(1, 0) + tk.MustExec("insert into t(id, v, v2) select (select 4 * id from t where v = 32) id, 4 * v, 4 * v2 from t where v = 33") + tk.CheckExecResult(1, 0) + err = tk.ExecToErr("insert into t(id, v, v2) select (select 4 * id from t where v = 33) id, 4 * v, 4 * v2 from t where v = 33") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[table:1048]Column 'id' cannot be null") tk.MustExec("commit") - tk.MustQuery("select * from t").Check(testkit.Rows("3 33 300", "5 11 100", "6 60 600", "9 99 900")) + tk.MustQuery("select * from t").Check(testkit.Rows("3 33 300", "5 11 100", "6 60 600", "9 99 900", "12 132 1200")) tk2.MustExec("alter table t add unique index iv(v)") tk2.MustExec("drop table if exists t1") From ac74dd7234c37875608ae2cc5ff65bcea3a28054 Mon Sep 17 00:00:00 2001 From: you06 Date: Tue, 12 Jan 2021 14:03:15 +0800 Subject: [PATCH 8/8] add test for sub queries Signed-off-by: you06 --- planner/core/expression_rewriter.go | 4 ---- session/pessimistic_test.go | 10 ++++++++++ 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go index bf6b9ca456c9f..1c8477992e408 100644 --- a/planner/core/expression_rewriter.go +++ b/planner/core/expression_rewriter.go @@ -107,11 +107,7 @@ func (b *PlanBuilder) rewriteInsertOnDuplicateUpdate(ctx context.Context, exprNo // asScalar means whether this expression must be treated as a scalar expression. // And this function returns a result expression, a new plan that may have apply or semi-join. func (b *PlanBuilder) rewrite(ctx context.Context, exprNode ast.ExprNode, p LogicalPlan, aggMapper map[*ast.AggregateFuncExpr]int, asScalar bool) (expression.Expression, LogicalPlan, error) { - // capture the forUpdateRead mark - isForUpdateRead := b.isForUpdateRead - b.isForUpdateRead = false expr, resultPlan, err := b.rewriteWithPreprocess(ctx, exprNode, p, aggMapper, nil, asScalar, nil) - b.isForUpdateRead = isForUpdateRead return expr, resultPlan, err } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index c16ef1024c83a..2a28a23bafa7c 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -2378,6 +2378,16 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) { tk.MustQuery("select (select id from t where v = 24), id from t1 for update").Check(testkit.Rows(" 1")) tk.MustQuery("select (select id from t where v = 23 for update), id from t1").Check(testkit.Rows(" 1")) tk.MustQuery("select (select id from t where v = 24 for update), id from t1").Check(testkit.Rows("2 1")) + tk.MustQuery("select (select id + 1 from t where v = 24 for update), id from t1").Check(testkit.Rows("3 1")) + // sub queries + tk.MustQuery("select (select id from (select id from t where v = 24 for update) tmp for update), (select id from t where v = 23), id from t where v = 23").Check(testkit.Rows("2 2 2")) + tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 24 for update) tmp), id from t where v = 23").Check(testkit.Rows("4 2")) + tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 24 for update) tmp for update), id from t where v = 23").Check(testkit.Rows("4 2")) + tk.MustQuery("select (select id + (select id from t where v = 23 for update) from (select id from t where v = 24 for update) tmp), id from t where v = 23").Check(testkit.Rows(" 2")) + tk.MustQuery("select (select id + (select id from t where v = 23 for update) from (select id from t where v = 24 for update) tmp for update), id from t where v = 23").Check(testkit.Rows(" 2")) + tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 23) tmp), id from t where v = 24 for update").Check(testkit.Rows("4 2")) + tk.MustQuery("select (select id + (select id from t where v = 23) from (select id from t where v = 24 for update) tmp), id from t where v = 24 for update").Check(testkit.Rows("4 2")) + tk.MustQuery("select (select id + (select id from t where v = 24 for update) from (select id from t where v = 23) tmp), id from t where v = 24 for update").Check(testkit.Rows("4 2")) // test index look up tk.MustQuery("select * from t s, t t1 where s.v = 23 and s.id = t1.id").Check(testkit.Rows("2 23 200 2 23 200"))