Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: check index valid while forUpdateRead #22152

Merged
merged 9 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2253,6 +2253,10 @@ func (r *correlatedAggregateResolver) resolveSelect(sel *ast.SelectStmt) (err er
if err != nil {
return err
}
// do not use cache when for update read
if isForUpdateReadSelectLock(sel.LockInfo) {
useCache = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after debug, it seems useCache = false only not-use-cache, but it still refill cache..

so in Access Path Selection will happen in correlatedAggregateResolver and directly use cache in later buildProjection

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems useCache = false only cache sub-sub-query when there is a select stmt inside from clause.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some cases for sub queries, the cache problem is also related with rewrite, it may be solved together with it.

}
// we cannot use cache if there are correlated aggregates inside FROM clause,
// since the plan we are building now is not correct and need to be rebuild later.
p, err := r.b.buildTableRefs(r.ctx, sel.From, useCache)
Expand Down Expand Up @@ -3290,6 +3294,11 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L
projExprs []expression.Expression
)

// set for update read to true before building result set node
if isForUpdateReadSelectLock(sel.LockInfo) {
b.isForUpdateRead = true
}

// For sub-queries, the FROM clause may have already been built in outer query when resolving correlated aggregates.
// If the ResultSetNode inside FROM clause has nothing to do with correlated aggregates, we can simply get the
// existing ResultSetNode from the cache.
Expand Down Expand Up @@ -3599,7 +3608,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
if tblName.L == "" {
tblName = tn.Name
}
possiblePaths, err := getPossibleAccessPaths(b.ctx, b.TableHints(), tn.IndexHints, tbl, dbName, tblName)
possiblePaths, err := getPossibleAccessPaths(b.ctx, b.TableHints(), tn.IndexHints, tbl, dbName, tblName, b.isForUpdateRead, b.is.SchemaMetaVersion())
Copy link
Contributor

@lysu lysu Jan 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buildTableRefsWithCache's cache mechanism should be taken care...(which buildDs during correlatedAggregateResolver#resolveSelect)

for sql:

create table t1(id int);
create table t2(id int);
select (select id from t2 limit 1 for update), id from t1;

the value in this line should be:

tbl b.isForUpdateRead
t1 false
t2 true

but current get:

tbl b.isForUpdateRead
t1 false
t2 false

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, added some cases for resolveSelect.

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3696,6 +3705,8 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as
partitionNames: tn.PartitionNames,
TblCols: make([]*expression.Column, 0, len(columns)),
preferPartitions: make(map[int][]model.CIStr),
is: b.is,
isForUpdateRead: b.isForUpdateRead,
}.Init(b.ctx, b.getSelectOffset())
var handleCols HandleCols
schema := expression.NewSchema(make([]*expression.Column, 0, len(columns))...)
Expand Down Expand Up @@ -4221,6 +4232,7 @@ func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) (
}

b.inUpdateStmt = true
b.isForUpdateRead = true

p, err := b.buildResultSetNode(ctx, update.TableRefs.TableRefs)
if err != nil {
Expand Down Expand Up @@ -4561,6 +4573,7 @@ func (b *PlanBuilder) buildDelete(ctx context.Context, delete *ast.DeleteStmt) (
}()

b.inDeleteStmt = true
b.isForUpdateRead = true

p, err := b.buildResultSetNode(ctx, delete.TableRefs.TableRefs)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/statistics"
Expand Down Expand Up @@ -520,6 +521,11 @@ type DataSource struct {
// preferPartitions store the map, the key represents store type, the value represents the partition name list.
preferPartitions map[int][]model.CIStr
SampleInfo *TableSampleInfo
is infoschema.InfoSchema
// isForUpdateRead should be true in either of the following situations
// 1. use `inside insert`, `update`, `delete` or `select for update` statement
// 2. isolation level is RC
isForUpdateRead bool
}

// ExtractCorrelatedCols implements LogicalPlan interface.
Expand Down
83 changes: 80 additions & 3 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/parser/opcode"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -469,6 +470,10 @@ type PlanBuilder struct {
// cache ResultSetNodes and HandleHelperMap to avoid rebuilding.
cachedResultSetNodes map[*ast.Join]LogicalPlan
cachedHandleHelperMap map[*ast.Join]map[int64][]HandleCols
// isForUpdateRead should be true in either of the following situations
// 1. use `inside insert`, `update`, `delete` or `select for update` statement
// 2. isolation level is RC
isForUpdateRead bool
}

type handleColHelper struct {
Expand Down Expand Up @@ -582,6 +587,7 @@ func NewPlanBuilder(sctx sessionctx.Context, is infoschema.InfoSchema, processor
correlatedAggMapper: make(map[*ast.AggregateFuncExpr]*expression.CorrelatedColumn),
cachedResultSetNodes: make(map[*ast.Join]LogicalPlan),
cachedHandleHelperMap: make(map[*ast.Join]map[int64][]HandleCols),
isForUpdateRead: sctx.GetSessionVars().IsPessimisticReadConsistency(),
}, savedBlockNames
}

Expand Down Expand Up @@ -892,7 +898,34 @@ func fillContentForTablePath(tablePath *util.AccessPath, tblInfo *model.TableInf
}
}

func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr) ([]*util.AccessPath, error) {
func isForUpdateReadSelectLock(lock *ast.SelectLockInfo) bool {
if lock == nil {
return false
}
return lock.LockType == ast.SelectLockForUpdate || lock.LockType == ast.SelectLockForUpdateNoWait
}

func getLatestIndexInfo(ctx sessionctx.Context, id int64, startVer int64) (map[int64]*model.IndexInfo, bool, error) {
you06 marked this conversation as resolved.
Show resolved Hide resolved
dom := domain.GetDomain(ctx)
if dom == nil {
return nil, false, errors.New("domain not found for ctx")
}
is := dom.InfoSchema()
if is.SchemaMetaVersion() == startVer {
return nil, false, nil
}
latestIndexes := make(map[int64]*model.IndexInfo)
latestTbl, exist := is.TableByID(id)
if exist {
latestTblInfo := latestTbl.Meta()
for _, index := range latestTblInfo.Indices {
latestIndexes[index.ID] = index
}
}
return latestIndexes, true, nil
}

func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, indexHints []*ast.IndexHint, tbl table.Table, dbName, tblName model.CIStr, check bool, startVer int64) ([]*util.AccessPath, error) {
tblInfo := tbl.Meta()
publicPaths := make([]*util.AccessPath, 0, len(tblInfo.Indices)+2)
tp := kv.TiKV
Expand All @@ -907,6 +940,11 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i
publicPaths = append(publicPaths, genTiFlashPath(tblInfo, true))
}
optimizerUseInvisibleIndexes := ctx.GetSessionVars().OptimizerUseInvisibleIndexes

check = check && ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error

for _, index := range tblInfo.Indices {
if index.State == model.StatePublic {
// Filter out invisible index, because they are not visible for optimizer
Expand All @@ -916,6 +954,17 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i
if tblInfo.IsCommonHandle && index.Primary {
continue
}
if check && latestIndexes == nil {
latestIndexes, check, err = getLatestIndexInfo(ctx, tblInfo.ID, 0)
if err != nil {
return nil, err
}
}
if check {
if latestIndex, ok := latestIndexes[index.ID]; !ok || latestIndex.State != model.StatePublic {
continue
}
}
publicPaths = append(publicPaths, &util.AccessPath{Index: index})
}
}
Expand Down Expand Up @@ -1386,13 +1435,40 @@ func (b *PlanBuilder) buildPhysicalIndexLookUpReaders(ctx context.Context, dbNam
// get index information
indexInfos := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
indexLookUpReaders := make([]Plan, 0, len(tblInfo.Indices))

check := b.isForUpdateRead && b.ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error

for _, idx := range indices {
idxInfo := idx.Meta()
if idxInfo.State != model.StatePublic {
logutil.Logger(context.Background()).Info("build physical index lookup reader, the index isn't public",
zap.String("index", idxInfo.Name.O), zap.Stringer("state", idxInfo.State), zap.String("table", tblInfo.Name.O))
logutil.Logger(ctx).Info("build physical index lookup reader, the index isn't public",
zap.String("index", idxInfo.Name.O),
zap.Stringer("state", idxInfo.State),
zap.String("table", tblInfo.Name.O))
continue
}
if check && latestIndexes == nil {
latestIndexes, check, err = getLatestIndexInfo(b.ctx, tblInfo.ID, b.is.SchemaMetaVersion())
if err != nil {
return nil, nil, err
}
}
if check {
if latestIndex, ok := latestIndexes[idxInfo.ID]; !ok || latestIndex.State != model.StatePublic {
forUpdateState := model.StateNone
if ok {
forUpdateState = latestIndex.State
}
logutil.Logger(ctx).Info("build physical index lookup reader, the index isn't public in forUpdateRead",
zap.String("index", idxInfo.Name.O),
zap.Stringer("state", idxInfo.State),
zap.Stringer("forUpdateRead state", forUpdateState),
zap.String("table", tblInfo.Name.O))
continue
}
}
indexInfos = append(indexInfos, idxInfo)
// For partition tables.
if pi := tbl.Meta().GetPartitionInfo(); pi != nil {
Expand Down Expand Up @@ -2698,6 +2774,7 @@ func (c *colNameInOnDupExtractor) Leave(node ast.Node) (ast.Node, bool) {
}

func (b *PlanBuilder) buildSelectPlanOfInsert(ctx context.Context, insert *ast.InsertStmt, insertPlan *Insert) error {
b.isForUpdateRead = true
affectedValuesCols, err := b.getAffectCols(insert, insertPlan)
if err != nil {
return err
Expand Down
38 changes: 34 additions & 4 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ import (
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
tidbutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/math"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

// PointGetPlan is a fast plan for simple point get.
Expand Down Expand Up @@ -432,7 +434,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
p = fp
return
}
if fp := tryPointGetPlan(ctx, x); fp != nil {
if fp := tryPointGetPlan(ctx, x, isForUpdateReadSelectLock(x.LockInfo)); fp != nil {
if checkFastPlanPrivilege(ctx, fp.dbName, fp.TblInfo.Name.L, mysql.SelectPriv) != nil {
return nil
}
Expand Down Expand Up @@ -757,7 +759,7 @@ func tryWhereIn2BatchPointGet(ctx sessionctx.Context, selStmt *ast.SelectStmt) *
// 2. It must be a single table select.
// 3. All the columns must be public and generated.
// 4. The condition is an access path that the range is a unique key.
func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetPlan {
func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt, check bool) *PointGetPlan {
if selStmt.Having != nil {
return nil
} else if selStmt.Limit != nil {
Expand Down Expand Up @@ -838,11 +840,27 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP
return nil
}

check = check && ctx.GetSessionVars().ConnectionID > 0
var latestIndexes map[int64]*model.IndexInfo
var err error

for _, idxInfo := range tbl.Indices {
if !idxInfo.Unique || idxInfo.State != model.StatePublic || idxInfo.Invisible {
continue
}
if isTableDual {
if check && latestIndexes == nil {
latestIndexes, check, err = getLatestIndexInfo(ctx, tbl.ID, 0)
if err != nil {
logutil.BgLogger().Warn("get information schema failed", zap.Error(err))
return nil
}
}
if check {
if latestIndex, ok := latestIndexes[idxInfo.ID]; !ok || latestIndex.State != model.StatePublic {
continue
}
}
p := newPointGetPlan(ctx, tblName.Schema.O, schema, tbl, names)
p.IsTableDual = true
return p
Expand All @@ -852,6 +870,18 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP
if idxValues == nil {
continue
}
if check && latestIndexes == nil {
latestIndexes, check, err = getLatestIndexInfo(ctx, tbl.ID, 0)
if err != nil {
logutil.BgLogger().Warn("get information schema failed", zap.Error(err))
return nil
}
}
if check {
if latestIndex, ok := latestIndexes[idxInfo.ID]; !ok || latestIndex.State != model.StatePublic {
continue
}
}
p := newPointGetPlan(ctx, dbName, schema, tbl, names)
p.IndexInfo = idxInfo
p.IndexValues = idxValues
Expand Down Expand Up @@ -1164,7 +1194,7 @@ func tryUpdatePointPlan(ctx sessionctx.Context, updateStmt *ast.UpdateStmt) Plan
OrderBy: updateStmt.Order,
Limit: updateStmt.Limit,
}
pointGet := tryPointGetPlan(ctx, selStmt)
pointGet := tryPointGetPlan(ctx, selStmt, true)
if pointGet != nil {
if pointGet.IsTableDual {
return PhysicalTableDual{
Expand Down Expand Up @@ -1258,7 +1288,7 @@ func tryDeletePointPlan(ctx sessionctx.Context, delStmt *ast.DeleteStmt) Plan {
OrderBy: delStmt.Order,
Limit: delStmt.Limit,
}
if pointGet := tryPointGetPlan(ctx, selStmt); pointGet != nil {
if pointGet := tryPointGetPlan(ctx, selStmt, true); pointGet != nil {
if pointGet.IsTableDual {
return PhysicalTableDual{
names: pointGet.outputNames,
Expand Down
2 changes: 1 addition & 1 deletion planner/core/rule_partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1092,7 +1092,7 @@ func pruneUseBinarySearch(lessThan lessThanDataInt, data dataForPrune, unsigned
func (s *partitionProcessor) resolveAccessPaths(ds *DataSource) error {
possiblePaths, err := getPossibleAccessPaths(
ds.ctx, &tableHintInfo{indexMergeHintList: ds.indexMergeHints, indexHintList: ds.IndexHints},
ds.astIndexHints, ds.table, ds.DBName, ds.tableInfo.Name)
ds.astIndexHints, ds.table, ds.DBName, ds.tableInfo.Name, ds.isForUpdateRead, ds.is.SchemaMetaVersion())
if err != nil {
return err
}
Expand Down
Loading