Skip to content

Commit

Permalink
executor: fix the issue that some extracted conditions are not used i…
Browse files Browse the repository at this point in the history
…n information_schema (#55236)

close #55235
  • Loading branch information
YangKeao authored Aug 7, 2024
1 parent 7d0abb9 commit 7291829
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 2 deletions.
15 changes: 13 additions & 2 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,7 @@ func (e *memtableRetriever) setDataFromKeyColumnUsage(ctx context.Context, sctx
if checker != nil && !checker.RequestVerification(sctx.GetSessionVars().ActiveRoles, schema.L, table.Name.L, "", mysql.AllPrivMask) {
continue
}
rs := keyColumnUsageInTable(schema, table)
rs := keyColumnUsageInTable(schema, table, extractor)
rows = append(rows, rs...)
}
}
Expand Down Expand Up @@ -1815,7 +1815,7 @@ func (e *memtableRetriever) setDataForMetricTables() {
e.rows = rows
}

func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo) [][]types.Datum {
func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo, extractor *plannercore.InfoSchemaBaseExtractor) [][]types.Datum {
var rows [][]types.Datum
if table.PKIsHandle {
for _, col := range table.Columns {
Expand Down Expand Up @@ -1853,6 +1853,11 @@ func keyColumnUsageInTable(schema model.CIStr, table *model.TableInfo) [][]types
// Only handle unique/primary key
continue
}

if extractor != nil && extractor.Filter("constraint_name", idxName) {
continue
}

for i, key := range index.Columns {
col := nameToCol[key.Name.L]
if col.Hidden {
Expand Down Expand Up @@ -2125,6 +2130,9 @@ func (e *memtableRetriever) setDataFromTableConstraints(ctx context.Context, sct
if ok && extractor.Filter("constraint_schema", schema.L) {
continue
}
if ok && extractor.Filter("table_schema", schema.L) {
continue
}
tables, err := e.is.SchemaTableInfos(ctx, schema)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -2161,6 +2169,9 @@ func (e *memtableRetriever) setDataFromTableConstraints(ctx context.Context, sct
// The index has no constriant.
continue
}
if ok && extractor.Filter("constraint_name", cname) {
continue
}
record := types.MakeDatums(
infoschema.CatalogVal, // CONSTRAINT_CATALOG
schema.O, // CONSTRAINT_SCHEMA
Expand Down
85 changes: 85 additions & 0 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,3 +627,88 @@ func TestReferencedTableSchemaWithForeignKey(t *testing.T) {
WHERE table_name = 't2' AND table_schema = 'test2';`).Check(testkit.Rows(
"id id t1 test2 test"))
}

func TestInfoSchemaConditionWorks(t *testing.T) {
// this test creates table in different schema with different index name, and check
// the condition in the following columns whether work as expected.
//
// - "table_schema"
// - "constraint_schema"
// - "table_name"
// - "constraint_name"
// - "partition_name"
// - "schema_name"
// - "index_name"
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
for db := 0; db < 2; db++ {
for table := 0; table < 2; table++ {
tk.MustExec(fmt.Sprintf("create database if not exists db%d;", db))
tk.MustExec(fmt.Sprintf(`create table db%d.table%d (id int primary key, data0 varchar(255), data1 varchar(255))
partition by range (id) (
partition p0 values less than (10),
partition p1 values less than (20)
);`, db, table))
for index := 0; index < 2; index++ {
tk.MustExec(fmt.Sprintf("create index idx%d on db%d.table%d (data%d);", index, db, table, index))
}
}
}

testColumns := map[string]string{
"table_schema": "db",
"constraint_schema": "db",
"table_name": "table",
"constraint_name": "idx",
"partition_name": "p",
"schema_name": "db",
"index_name": "idx",
}
testTables := []string{}
for _, row := range tk.MustQuery("show tables in information_schema").Rows() {
tableName := row[0].(string)
// exclude some tables which cannot run without TiKV.
if strings.HasPrefix(tableName, "CLUSTER_") ||
strings.HasPrefix(tableName, "INSPECTION_") ||
strings.HasPrefix(tableName, "METRICS_") ||
strings.HasPrefix(tableName, "TIFLASH_") ||
strings.HasPrefix(tableName, "TIKV_") ||
strings.HasPrefix(tableName, "USER_") ||
tableName == "TABLE_STORAGE_STATS" ||
strings.Contains(tableName, "REGION") {
continue
}
testTables = append(testTables, row[0].(string))
}
for _, table := range testTables {
rs, err := tk.Exec(fmt.Sprintf("select * from information_schema.%s", table))
require.NoError(t, err)
cols := rs.Fields()

chk := rs.NewChunk(nil)
rowCount := 0
for {
err := rs.Next(context.Background(), chk)
require.NoError(t, err)
if chk.NumRows() == 0 {
break
}
rowCount += chk.NumRows()
}
if rowCount == 0 {
// TODO: find a way to test the table without any rows by adding some rows to them.
continue
}
for i := 0; i < len(cols); i++ {
colName := cols[i].Column.Name.L
if valPrefix, ok := testColumns[colName]; ok {
for j := 0; j < 2; j++ {
rows := tk.MustQuery(fmt.Sprintf("select * from information_schema.%s where %s = '%s%d';",
table, colName, valPrefix, j)).Rows()
rowCountWithCondition := len(rows)
require.Less(t, rowCountWithCondition, rowCount, "%s has no effect on %s", colName, table)
}
}
}
}
}

0 comments on commit 7291829

Please sign in to comment.