Skip to content

Commit

Permalink
executor: add test for same table name in two schemas (#55222)
Browse files Browse the repository at this point in the history
close #55221
  • Loading branch information
tangenta authored Aug 7, 2024
1 parent 3dfa15c commit 165ef9d
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 10 deletions.
30 changes: 30 additions & 0 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,36 @@ func TestReferencedTableSchemaWithForeignKey(t *testing.T) {
"id id t1 test2 test"))
}

func TestSameTableNameInTwoSchemas(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database test1;")
tk.MustExec("create database test2;")
tk.MustExec("create table test1.t (a int);")
tk.MustExec("create table test2.t (a int);")

rs := tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 't' and table_schema = 'test1';").Rows()
t1ID, err := strconv.Atoi(rs[0][0].(string))
require.NoError(t, err)
rs = tk.MustQuery("select tidb_table_id from information_schema.tables where table_name = 't' and table_schema = 'test2';").Rows()
t2ID, err := strconv.Atoi(rs[0][0].(string))
require.NoError(t, err)

tk.MustQuery(fmt.Sprintf("select table_schema, table_name, tidb_table_id from information_schema.tables where tidb_table_id = %d;", t1ID)).
Check(testkit.Rows(fmt.Sprintf("test1 t %d", t1ID)))
tk.MustQuery(fmt.Sprintf("select table_schema, table_name, tidb_table_id from information_schema.tables where tidb_table_id = %d;", t2ID)).
Check(testkit.Rows(fmt.Sprintf("test2 t %d", t2ID)))

tk.MustQuery(fmt.Sprintf("select table_schema, table_name, tidb_table_id from information_schema.tables where table_name = 't' and tidb_table_id = %d;", t1ID)).
Check(testkit.Rows(fmt.Sprintf("test1 t %d", t1ID)))
tk.MustQuery(fmt.Sprintf("select table_schema, table_name, tidb_table_id from information_schema.tables where table_schema = 'test1' and tidb_table_id = %d;", t1ID)).
Check(testkit.Rows(fmt.Sprintf("test1 t %d", t1ID)))
tk.MustQuery(fmt.Sprintf("select table_schema, table_name, tidb_table_id from information_schema.tables where table_name = 'unknown' and tidb_table_id = %d;", t1ID)).
Check(testkit.Rows())
tk.MustQuery(fmt.Sprintf("select table_schema, table_name, tidb_table_id from information_schema.tables where table_schema = 'unknown' and tidb_table_id = %d;", t1ID)).
Check(testkit.Rows())
}

func TestInfoSchemaConditionWorks(t *testing.T) {
// this test creates table in different schema with different index name, and check
// the condition in the following columns whether work as expected.
Expand Down
28 changes: 18 additions & 10 deletions pkg/planner/core/memtable_infoschema_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ func (e *InfoSchemaTablesExtractor) ListSchemasAndTables(
if len(tableIDs) > 0 {
tableMap := make(map[int64]*model.TableInfo, len(tableIDs))
findTablesByID(is, tableIDs, tableNames, tableMap)
schemaSlice, tableSlice := findSchemasForTables(is, schemas, maps.Values(tableMap))
return schemaSlice, tableSlice, nil
return findSchemasForTables(ctx, is, schemas, maps.Values(tableMap))
}
if len(tableNames) > 0 {
return findTableAndSchemaByName(ctx, is, schemas, tableNames)
Expand All @@ -231,8 +230,7 @@ func (e *InfoSchemaPartitionsExtractor) ListSchemasAndTables(
if len(partIDs) > 0 {
tableMap := make(map[int64]*model.TableInfo, len(partIDs))
findTablesByPartID(is, partIDs, tableNames, tableMap)
schemaSlice, tableSlice := findSchemasForTables(is, schemas, maps.Values(tableMap))
return schemaSlice, tableSlice, nil
return findSchemasForTables(ctx, is, schemas, maps.Values(tableMap))
}
if len(tableNames) > 0 {
return findTableAndSchemaByName(ctx, is, schemas, tableNames)
Expand Down Expand Up @@ -353,7 +351,8 @@ func findTablesByID(
continue
}
if len(tableNames) > 0 {
if _, ok := tblNameMap[tblInfo.Name.L]; ok {
if _, found := tblNameMap[tblInfo.Name.L]; !found {
// table_id does not match table_name, skip it.
continue
}
}
Expand All @@ -378,7 +377,8 @@ func findTablesByPartID(
continue
}
if len(tableNames) > 0 {
if _, ok := tblNameMap[tbl.Meta().Name.L]; ok {
if _, found := tblNameMap[tbl.Meta().Name.L]; !found {
// partition_id does not match table_name, skip it.
continue
}
}
Expand Down Expand Up @@ -433,7 +433,7 @@ func listTablesForEachSchema(
for _, s := range schemas {
tables, err := is.SchemaTableInfos(ctx, s)
if err != nil {
return nil, nil, err
return nil, nil, errors.Trace(err)
}
for _, t := range tables {
schemaSlice = append(schemaSlice, s)
Expand All @@ -444,15 +444,23 @@ func listTablesForEachSchema(
}

func findSchemasForTables(
ctx context.Context,
is infoschema.InfoSchema,
schemas []model.CIStr,
tableSlice []*model.TableInfo,
) ([]model.CIStr, []*model.TableInfo) {
) ([]model.CIStr, []*model.TableInfo, error) {
schemaSlice := make([]model.CIStr, 0, len(tableSlice))
for i, tbl := range tableSlice {
found := false
for _, s := range schemas {
if is.TableExists(s, tbl.Name) {
isTbl, err := is.TableByName(ctx, s, tbl.Name)
if err != nil {
if terror.ErrorEqual(err, infoschema.ErrTableNotExists) {
continue
}
return nil, nil, errors.Trace(err)
}
if isTbl.Meta().ID == tbl.ID {
schemaSlice = append(schemaSlice, s)
found = true
break
Expand All @@ -469,7 +477,7 @@ func findSchemasForTables(
remains = append(remains, tbl)
}
}
return schemaSlice, remains
return schemaSlice, remains, nil
}

func parseIDs(ids []model.CIStr) []int64 {
Expand Down

0 comments on commit 165ef9d

Please sign in to comment.