*: Fix issue 39999, used wrong column id list for checking partitions #40003

Merged
merged 9 commits on Dec 20, 2022
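What changed: for index joins into a partitioned table, the old helper `keyColumnsIncludeAllPartitionColumns` compared `lookUpContents[0].keyCols` (offsets within the join key) against the partition expression's `ColumnOffset` (offsets within the table's columns), which, as the PR title says, was the wrong column list for this check. The diff below replaces it with `getPartitionKeyColOffsets`, which maps the join key's column IDs (`keyColIDs`) to offsets in the partitioned table's column slice and returns those offsets only when every partition-expression column is covered; otherwise it returns nil and the callers fall back to the condition-pruned partition set.

A minimal, self-contained sketch of that coverage check is below. The `Column` and `PartitionExpr` types and the column IDs are simplified stand-ins for illustration, not TiDB's actual definitions.

```go
package main

import "fmt"

// Column is a simplified stand-in for a table column; only the ID matters here.
type Column struct{ ID int64 }

// PartitionExpr is a simplified stand-in holding the offsets (into the table's
// column slice) of the columns used by the partitioning expression.
type PartitionExpr struct{ ColumnOffset []int }

// getPartitionKeyColOffsets mirrors the new helper's logic: map join-key column
// IDs to table column offsets, and return them only if they cover every
// partition-expression column; otherwise return nil.
func getPartitionKeyColOffsets(keyColIDs []int64, cols []Column, pe PartitionExpr) []int {
	offsets := make([]int, len(keyColIDs))
	for i, id := range keyColIDs {
		offset := -1
		for j, col := range cols {
			if col.ID == id {
				offset = j
				break
			}
		}
		if offset == -1 {
			return nil // join key references a column ID not in the table
		}
		offsets[i] = offset
	}
	covered := make(map[int]struct{}, len(offsets))
	for _, o := range offsets {
		covered[o] = struct{}{}
	}
	for _, o := range pe.ColumnOffset {
		if _, ok := covered[o]; !ok {
			return nil // a partition column is not part of the join key
		}
	}
	return offsets
}

func main() {
	// Hypothetical IDs standing in for txt_account_id, occur_trade_date, serial_id.
	cols := []Column{{ID: 10}, {ID: 11}, {ID: 12}}
	pe := PartitionExpr{ColumnOffset: []int{2}} // partitioned by serial_id

	// Join key covers only txt_account_id: coverage fails, no point pruning.
	fmt.Println(getPartitionKeyColOffsets([]int64{10}, cols, pe)) // []
	// Join key covers txt_account_id and serial_id: offsets are returned.
	fmt.Println(getPartitionKeyColOffsets([]int64{10, 12}, cols, pe)) // [0 2]
}
```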
85 changes: 38 additions & 47 deletions executor/builder.go
@@ -3511,17 +3511,39 @@ func buildIndexRangeForEachPartition(ctx sessionctx.Context, usedPartitions []ta
return nextRange, nil
}

func keyColumnsIncludeAllPartitionColumns(keyColumns []int, pe *tables.PartitionExpr) bool {
tmp := make(map[int]struct{}, len(keyColumns))
for _, offset := range keyColumns {
tmp[offset] = struct{}{}
func getPartitionKeyColOffsets(keyColIDs []int64, pt table.PartitionedTable) []int {
keyColOffsets := make([]int, len(keyColIDs))
for i, colID := range keyColIDs {
offset := -1
for j, col := range pt.Cols() {
if colID == col.ID {
offset = j
break
}
}
if offset == -1 {
return nil
}
keyColOffsets[i] = offset
}

pe, err := pt.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil
}

offsetMap := make(map[int]struct{})
for _, offset := range keyColOffsets {
offsetMap[offset] = struct{}{}
}
for _, offset := range pe.ColumnOffset {
if _, ok := tmp[offset]; !ok {
return false
if _, ok := offsetMap[offset]; !ok {
return nil
}
}
return true
return keyColOffsets
}

func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table, schema *expression.Schema, partitionInfo *plannercore.PartitionInfo,
@@ -3536,45 +3558,16 @@ func (builder *dataReaderBuilder) prunePartitionForInnerExecutor(tbl table.Table
return nil, false, nil, err
}

// check whether can runtime prune.
type partitionExpr interface {
PartitionExpr() (*tables.PartitionExpr, error)
}
pe, err := tbl.(partitionExpr).PartitionExpr()
if err != nil {
return nil, false, nil, err
}

// recalculate key column offsets
if len(lookUpContent) == 0 {
return nil, false, nil, nil
}
if lookUpContent[0].keyColIDs == nil {
return nil, false, nil, plannercore.ErrInternal.GenWithStack("cannot get column IDs when dynamic pruning")
}
keyColOffsets := make([]int, len(lookUpContent[0].keyColIDs))
for i, colID := range lookUpContent[0].keyColIDs {
offset := -1
for j, col := range partitionTbl.Cols() {
if colID == col.ID {
offset = j
break
}
}
if offset == -1 {
return nil, false, nil, plannercore.ErrInternal.GenWithStack("invalid column offset when dynamic pruning")
}
keyColOffsets[i] = offset
}

offsetMap := make(map[int]bool)
for _, offset := range keyColOffsets {
offsetMap[offset] = true
}
for _, offset := range pe.ColumnOffset {
if _, ok := offsetMap[offset]; !ok {
return condPruneResult, false, nil, nil
}
keyColOffsets := getPartitionKeyColOffsets(lookUpContent[0].keyColIDs, partitionTbl)
if len(keyColOffsets) == 0 {
return condPruneResult, false, nil, nil
}

locateKey := make([]types.Datum, len(partitionTbl.Cols()))
@@ -4149,12 +4142,6 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}
tbl, _ := builder.is.TableByID(tbInfo.ID)
pt := tbl.(table.PartitionedTable)
pe, err := tbl.(interface {
PartitionExpr() (*tables.PartitionExpr, error)
}).PartitionExpr()
if err != nil {
return nil, err
}
partitionInfo := &v.PartitionInfo
usedPartitionList, err := builder.partitionPruning(pt, partitionInfo.PruningConds, partitionInfo.PartitionNames, partitionInfo.Columns, partitionInfo.ColumnNames)
if err != nil {
@@ -4165,8 +4152,12 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
usedPartitions[p.GetPhysicalID()] = p
}
var kvRanges []kv.KeyRange
var keyColOffsets []int
if len(lookUpContents) > 0 {
keyColOffsets = getPartitionKeyColOffsets(lookUpContents[0].keyColIDs, pt)
}
if v.IsCommonHandle {
if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
if len(keyColOffsets) > 0 {
locateKey := make([]types.Datum, e.Schema().Len())
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
// lookUpContentsByPID groups lookUpContents by pid(partition) so that kv ranges for same partition can be merged.
@@ -4212,7 +4203,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte

handles, lookUpContents := dedupHandles(lookUpContents)

if len(lookUpContents) > 0 && keyColumnsIncludeAllPartitionColumns(lookUpContents[0].keyCols, pe) {
if len(keyColOffsets) > 0 {
locateKey := make([]types.Datum, e.Schema().Len())
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
for _, content := range lookUpContents {
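Design note on the builder.go changes above: the helper's return value doubles as the decision signal. In `prunePartitionForInnerExecutor`, an empty result (the join key does not cover every partition column, a column ID is unknown, or `PartitionExpr()` fails) makes the executor return `condPruneResult` instead of point-locating partitions from the lookup keys; a non-empty result supplies the offsets used to fill `locateKey`. In `buildTableReaderForIndexJoin`, `keyColOffsets` is now computed once before the `IsCommonHandle` branch, and both the common-handle and handle paths test `len(keyColOffsets) > 0`, which removes the separate `PartitionExpr()` assertion and the old boolean check.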
20 changes: 11 additions & 9 deletions executor/index_lookup_join_test.go
@@ -428,17 +428,19 @@ PARTITIONS 1`)

// Why does the t2.prefiller need be at least 2^32 ? If smaller the bug will not appear!?!
tk.MustExec("insert into t2 values ( pow(2,32), 1, 1), ( pow(2,32)+1, 2, 0)")
tk.MustExec(`analyze table t1`)
tk.MustExec(`analyze table t2`)

// Why must it be = 1 and not 2?
tk.MustQuery("explain select /* +INL_JOIN(t1,t2) */ t1.id, t1.pc from t1 where id in ( select prefiller from t2 where t2.postfiller = 1 )").Check(testkit.Rows("" +
"IndexJoin_15 10.00 root inner join, inner:TableReader_14, outer key:test.t2.prefiller, inner key:test.t1.id, equal cond:eq(test.t2.prefiller, test.t1.id)]\n" +
"[├─HashAgg_25(Build) 8.00 root group by:test.t2.prefiller, funcs:firstrow(test.t2.prefiller)->test.t2.prefiller]\n" +
"[│ └─TableReader_26 8.00 root data:HashAgg_20]\n" +
"[│ └─HashAgg_20 8.00 cop[tikv] group by:test.t2.prefiller, ]\n" +
"[│ └─Selection_24 10.00 cop[tikv] eq(test.t2.postfiller, 1)]\n" +
"[│ └─TableFullScan_23 10000.00 cop[tikv] table:t2 keep order:false, stats:pseudo]\n" +
"[└─TableReader_14(Probe) 8.00 root partition:all data:TableRangeScan_13]\n" +
"[ └─TableRangeScan_13 8.00 cop[tikv] table:t1 range: decided by [eq(test.t1.id, test.t2.prefiller)], keep order:false, stats:pseudo"))
tk.MustQuery("explain format='brief' select /* +INL_JOIN(t1,t2) */ t1.id, t1.pc from t1 where id in ( select prefiller from t2 where t2.postfiller = 1 )").Check(testkit.Rows(""+
`IndexJoin 1.25 root inner join, inner:TableReader, outer key:test.t2.prefiller, inner key:test.t1.id, equal cond:eq(test.t2.prefiller, test.t1.id)`,
`├─HashAgg(Build) 1.00 root group by:test.t2.prefiller, funcs:firstrow(test.t2.prefiller)->test.t2.prefiller`,
`│ └─TableReader 1.00 root data:HashAgg`,
`│ └─HashAgg 1.00 cop[tikv] group by:test.t2.prefiller, `,
`│ └─Selection 1.00 cop[tikv] eq(test.t2.postfiller, 1)`,
`│ └─TableFullScan 2.00 cop[tikv] table:t2 keep order:false`,
`└─TableReader(Probe) 1.00 root partition:all data:TableRangeScan`,
` └─TableRangeScan 1.00 cop[tikv] table:t1 range: decided by [eq(test.t1.id, test.t2.prefiller)], keep order:false, stats:pseudo`))
tk.MustQuery("show warnings").Check(testkit.Rows())
// without fix it fails with: "runtime error: index out of range [0] with length 0"
tk.MustQuery("select /* +INL_JOIN(t1,t2) */ t1.id, t1.pc from t1 where id in ( select prefiller from t2 where t2.postfiller = 1 )").Check(testkit.Rows())
69 changes: 69 additions & 0 deletions executor/partition_table_test.go
@@ -3831,3 +3831,72 @@ func TestIssue21732(t *testing.T) {
})
}
}

func TestIssue39999(t *testing.T) {
store := testkit.CreateMockStore(t)

tk := testkit.NewTestKit(t, store)

tk.MustExec(`create schema test39999`)
tk.MustExec(`use test39999`)
tk.MustExec(`drop table if exists c, t`)
tk.MustExec("CREATE TABLE `c` (" +
"`serial_id` varchar(24)," +
"`occur_trade_date` date," +
"`txt_account_id` varchar(24)," +
"`capital_sub_class` varchar(10)," +
"`occur_amount` decimal(16,2)," +
"`broker` varchar(10)," +
"PRIMARY KEY (`txt_account_id`,`occur_trade_date`,`serial_id`) /*T![clustered_index] CLUSTERED */," +
"KEY `idx_serial_id` (`serial_id`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci " +
"PARTITION BY RANGE COLUMNS(`serial_id`) (" +
"PARTITION `p202209` VALUES LESS THAN ('20221001')," +
"PARTITION `p202210` VALUES LESS THAN ('20221101')," +
"PARTITION `p202211` VALUES LESS THAN ('20221201')" +
")")

tk.MustExec("CREATE TABLE `t` ( " +
"`txn_account_id` varchar(24), " +
"`account_id` varchar(32), " +
"`broker` varchar(10), " +
"PRIMARY KEY (`txn_account_id`) /*T![clustered_index] CLUSTERED */ " +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci")

tk.MustExec("INSERT INTO `c` (serial_id, txt_account_id, capital_sub_class, occur_trade_date, occur_amount, broker) VALUES ('2022111700196920','04482786','CUST','2022-11-17',-2.01,'0009')")
tk.MustExec("INSERT INTO `t` VALUES ('04482786','1142927','0009')")

tk.MustExec(`set tidb_partition_prune_mode='dynamic'`)
tk.MustExec(`analyze table c`)
tk.MustExec(`analyze table t`)
query := `select
/*+ inl_join(c) */
c.occur_amount
from
c
join t on c.txt_account_id = t.txn_account_id
and t.broker = '0009'
and c.occur_trade_date = '2022-11-17'`
tk.MustQuery("explain " + query).Check(testkit.Rows(""+
"IndexJoin_22 1.00 root inner join, inner:TableReader_21, outer key:test39999.t.txn_account_id, inner key:test39999.c.txt_account_id, equal cond:eq(test39999.t.txn_account_id, test39999.c.txt_account_id)",
"├─TableReader_27(Build) 1.00 root data:Selection_26",
"│ └─Selection_26 1.00 cop[tikv] eq(test39999.t.broker, \"0009\")",
"│ └─TableFullScan_25 1.00 cop[tikv] table:t keep order:false",
"└─TableReader_21(Probe) 1.00 root partition:all data:Selection_20",
" └─Selection_20 1.00 cop[tikv] eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)",
" └─TableRangeScan_19 1.00 cop[tikv] table:c range: decided by [eq(test39999.c.txt_account_id, test39999.t.txn_account_id) eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)], keep order:false"))
tk.MustQuery(query).Check(testkit.Rows("-2.01"))

// Add the missing partition key part.
tk.MustExec(`alter table t add column serial_id varchar(24) default '2022111700196920'`)
query += ` and c.serial_id = t.serial_id`
tk.MustQuery(query).Check(testkit.Rows("-2.01"))
tk.MustQuery("explain " + query).Check(testkit.Rows(""+
`IndexJoin_20 0.80 root inner join, inner:TableReader_19, outer key:test39999.t.txn_account_id, test39999.t.serial_id, inner key:test39999.c.txt_account_id, test39999.c.serial_id, equal cond:eq(test39999.t.serial_id, test39999.c.serial_id), eq(test39999.t.txn_account_id, test39999.c.txt_account_id)`,
`├─TableReader_25(Build) 0.80 root data:Selection_24`,
`│ └─Selection_24 0.80 cop[tikv] eq(test39999.t.broker, "0009"), not(isnull(test39999.t.serial_id))`,
`│ └─TableFullScan_23 1.00 cop[tikv] table:t keep order:false`,
`└─TableReader_19(Probe) 0.80 root partition:all data:Selection_18`,
` └─Selection_18 0.80 cop[tikv] eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)`,
` └─TableRangeScan_17 0.80 cop[tikv] table:c range: decided by [eq(test39999.c.txt_account_id, test39999.t.txn_account_id) eq(test39999.c.serial_id, test39999.t.serial_id) eq(test39999.c.occur_trade_date, 2022-11-17 00:00:00.000000)], keep order:false`))
}
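The new TestIssue39999 reproduces the reported shape: `c` is range-partitioned by `serial_id`, while the index join matches only on `txt_account_id` (with a constant filter on `occur_trade_date`), so the join key does not cover the partition column and the fixed check must decline point-location pruning for the inner side; the query is still expected to return `-2.01`. The second half then adds `serial_id` to the join so the key does cover the partition column, and verifies both the resulting plan (with `serial_id` in the index-join keys) and the same result.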