Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix a bug that the pessimistic lock doesn't work on a partition #14921

Merged
merged 9 commits into from
Mar 3, 2020
7 changes: 4 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,9 +577,10 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
return src
}
e := &SelectLockExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src),
Lock: v.Lock,
tblID2Handle: v.TblID2Handle,
partitionedTable: v.PartitionedTable,
}
return e
}
Expand Down
39 changes: 33 additions & 6 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,11 @@ type SelectLockExec struct {
Lock ast.SelectLockType
keys []kv.Key

tblID2Handle map[int64][]*expression.Column
tblID2Handle map[int64][]*expression.Column
partitionedTable []table.PartitionedTable

// tblID2Table is cached to reduce cost.
tblID2Table map[int64]table.PartitionedTable
}

// Open implements the Executor Open interface.
Expand All @@ -831,6 +835,18 @@ func (e *SelectLockExec) Open(ctx context.Context) error {
// This operation is only for schema validator check.
txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{})
}

if len(e.tblID2Handle) > 0 && len(e.partitionedTable) > 0 {
e.tblID2Table = make(map[int64]table.PartitionedTable, len(e.partitionedTable))
for id := range e.tblID2Handle {
for _, p := range e.partitionedTable {
if id == p.Meta().ID {
e.tblID2Table[id] = p
}
}
}
}

return nil
}

Expand All @@ -845,12 +861,23 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
if len(e.tblID2Handle) == 0 || (e.Lock != ast.SelectLockForUpdate && e.Lock != ast.SelectLockForUpdateNoWait) {
return nil
}
if req.NumRows() != 0 {

if req.NumRows() != 0 && len(e.tblID2Handle) > 0 {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
iter := chunk.NewIterator4Chunk(req)
for id, cols := range e.tblID2Handle {
for _, col := range cols {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(id, row.GetInt64(col.Index)))
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
for id, cols := range e.tblID2Handle {
physicalID := id
if pt, ok := e.tblID2Table[id]; ok {
// On a partitioned table, we have to use physical ID to encode the lock key!
p, err := pt.GetPartitionByRow(e.ctx, row.GetDatumRow(e.base().retFieldTypes))
if err != nil {
return err
}
physicalID = p.GetPhysicalID()
}

for _, col := range cols {
e.keys = append(e.keys, tablecodec.EncodeRowKeyWithHandle(physicalID, row.GetInt64(col.Index)))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions planner/core/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1699,8 +1699,9 @@ func (p *LogicalLimit) exhaustPhysicalPlans(prop *property.PhysicalProperty) []P
func (p *LogicalLock) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
childProp := prop.Clone()
lock := PhysicalLock{
Lock: p.Lock,
TblID2Handle: p.tblID2Handle,
Lock: p.Lock,
TblID2Handle: p.tblID2Handle,
PartitionedTable: p.partitionedTable,
}.Init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), childProp)
return []PhysicalPlan{lock}
}
Expand Down
1 change: 1 addition & 0 deletions planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2565,6 +2565,7 @@ func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, as

if tableInfo.GetPartitionInfo() != nil {
b.optFlag = b.optFlag | flagPartitionProcessor
b.partitionedTable = append(b.partitionedTable, tbl.(table.PartitionedTable))
// check partition by name.
for _, name := range tn.PartitionNames {
_, err = tables.FindPartitionByName(tableInfo, name.L)
Expand Down
5 changes: 3 additions & 2 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,8 +878,9 @@ type LogicalLimit struct {
type LogicalLock struct {
baseLogicalPlan

Lock ast.SelectLockType
tblID2Handle map[int64][]*expression.Column
Lock ast.SelectLockType
tblID2Handle map[int64][]*expression.Column
partitionedTable []table.PartitionedTable
}

// WindowFrame represents a window function frame.
Expand Down
4 changes: 3 additions & 1 deletion planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/ranger"
)
Expand Down Expand Up @@ -416,7 +417,8 @@ type PhysicalLock struct {

Lock ast.SelectLockType

TblID2Handle map[int64][]*expression.Column
TblID2Handle map[int64][]*expression.Column
PartitionedTable []table.PartitionedTable
}

// PhysicalLimit is the physical operator of Limit.
Expand Down
8 changes: 6 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ type PlanBuilder struct {
hintProcessor *BlockHintProcessor
// selectOffset is the offsets of current processing select stmts.
selectOffset []int

// SelectLock need this information to locate the lock on partitions.
partitionedTable []table.PartitionedTable
}

type handleColHelper struct {
Expand Down Expand Up @@ -799,8 +802,9 @@ func removeIgnoredPaths(paths, ignoredPaths []*util.AccessPath, tblInfo *model.T

func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock ast.SelectLockType) *LogicalLock {
selectLock := LogicalLock{
Lock: lock,
tblID2Handle: b.handleHelper.tailMap(),
Lock: lock,
tblID2Handle: b.handleHelper.tailMap(),
partitionedTable: b.partitionedTable,
}.Init(b.ctx)
selectLock.SetChildren(src)
return selectLock
Expand Down
6 changes: 6 additions & 0 deletions planner/core/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@ func (p *LogicalLock) PruneColumns(parentUsedCols []*expression.Column) error {
return p.baseLogicalPlan.PruneColumns(parentUsedCols)
}

if len(p.partitionedTable) > 0 {
// If the children include partitioned tables, do not prune columns.
// Because the executor needs the partitioned columns to calculate the lock key.
return p.children[0].PruneColumns(p.Schema().Columns)
}

for _, cols := range p.tblID2Handle {
parentUsedCols = append(parentUsedCols, cols...)
}
Expand Down
46 changes: 46 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3024,3 +3024,49 @@ func (s *testSessionSuite2) TestStmtHints(c *C) {
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
c.Assert(tk.Se.GetSessionVars().GetReplicaRead(), Equals, kv.ReplicaReadFollower)
}

func (s *testSessionSuite2) TestPessimisticLockOnPartition(c *C) {
// This test checks that 'select ... for update' locks the partition instead of the table.
// Cover a bug that table ID is used to encode the lock key mistakenly.
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec(`create table if not exists forupdate_on_partition (
age int not null primary key,
nickname varchar(20) not null,
gender int not null default 0,
first_name varchar(30) not null default '',
last_name varchar(20) not null default '',
full_name varchar(60) as (concat(first_name, ' ', last_name)),
index idx_nickname (nickname)
) partition by range (age) (
partition child values less than (18),
partition young values less than (30),
partition middle values less than (50),
partition old values less than (123)
);`)
tk.MustExec("insert into forupdate_on_partition (`age`, `nickname`) values (25, 'cosven');")

tk1 := testkit.NewTestKit(c, s.store)
tk1.MustExec("use test")

tk.MustExec("begin pessimistic")
tk.MustQuery("select * from forupdate_on_partition where age=25 for update").Check(testkit.Rows("25 cosven 0 "))
tk1.MustExec("begin pessimistic")

val := int32(0)
ch := make(chan int32, 5)
go func() {
tk1.MustExec("update forupdate_on_partition set first_name='sw' where age=25")
ch <- val
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
tk1.MustExec("commit")
}()

// Leave 50ms for tk1 to run, if it finish within the duration,
// the channel value should be 0, otherwise the value would be 1.
time.Sleep(50 * time.Millisecond)
val = 1

tk.MustExec("commit")
// tk1 should be blocked until tk commit, so the channel value should be 1.
c.Assert(<-ch, Equals, int32(1))
}
2 changes: 1 addition & 1 deletion table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ type PhysicalTable interface {
type PartitionedTable interface {
Table
GetPartition(physicalID int64) PhysicalTable
GetPartitionByRow(sessionctx.Context, []types.Datum) (Table, error)
GetPartitionByRow(sessionctx.Context, []types.Datum) (PhysicalTable, error)
}

// TableFromMeta builds a table.Table from *model.TableInfo.
Expand Down
2 changes: 1 addition & 1 deletion table/tables/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (t *partitionedTable) GetPartition(pid int64) table.PhysicalTable {
}

// GetPartitionByRow returns a Table, which is actually a Partition.
func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.Table, error) {
func (t *partitionedTable) GetPartitionByRow(ctx sessionctx.Context, r []types.Datum) (table.PhysicalTable, error) {
pid, err := t.locatePartition(ctx, t.Meta().GetPartitionInfo(), r)
if err != nil {
return nil, errors.Trace(err)
Expand Down