Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

infoschema: add a simply store for DM's SchemaTracker #35954

Merged
merged 10 commits into from
Jul 6, 2022
160 changes: 160 additions & 0 deletions infoschema/info_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package infoschema

import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
)

// InfoStore is a simple structure that stores DBInfo and TableInfo. It's modifiable and not thread-safe.
type InfoStore struct {
lowerCaseTableNames int // same as variable lower_case_table_names

dbs map[string]*model.DBInfo
tables map[string]map[string]*model.TableInfo
}

// NewInfoStore creates a InfoStore.
func NewInfoStore(lowerCaseTableNames int) *InfoStore {
return &InfoStore{
lowerCaseTableNames: lowerCaseTableNames,
dbs: map[string]*model.DBInfo{},
tables: map[string]map[string]*model.TableInfo{},
}
}

func (i *InfoStore) ciStr2Key(name model.CIStr) string {
if i.lowerCaseTableNames == 0 {
return name.O
}
return name.L
}

// SchemaByName returns the DBInfo of given name. nil if not found.
func (i *InfoStore) SchemaByName(name model.CIStr) *model.DBInfo {
key := i.ciStr2Key(name)
return i.dbs[key]
}

// PutSchema puts a DBInfo, it will overwrite the old one.
func (i *InfoStore) PutSchema(dbInfo *model.DBInfo) {
key := i.ciStr2Key(dbInfo.Name)
i.dbs[key] = dbInfo
if i.tables[key] == nil {
i.tables[key] = map[string]*model.TableInfo{}
}
}

// DeleteSchema deletes the schema from InfoSchema. Returns true when the schema exists, false otherwise.
func (i *InfoStore) DeleteSchema(name model.CIStr) bool {
key := i.ciStr2Key(name)
_, ok := i.dbs[key]
if !ok {
return false
}
delete(i.dbs, key)
delete(i.tables, key)
return true
}

// TableByName returns the TableInfo. It will also return the error like an infoschema.
func (i *InfoStore) TableByName(schema, table model.CIStr) (*model.TableInfo, error) {
schemaKey := i.ciStr2Key(schema)
tables, ok := i.tables[schemaKey]
if !ok {
return nil, ErrDatabaseNotExists.GenWithStackByArgs(schema)
}

tableKey := i.ciStr2Key(table)
tbl, ok := tables[tableKey]
if !ok {
return nil, ErrTableNotExists.GenWithStackByArgs(schema, table)
}
return tbl, nil
}

// PutTable puts a TableInfo, it will overwrite the old one. If the schema doesn't exist, it will return ErrDatabaseNotExists.
func (i *InfoStore) PutTable(schemaName model.CIStr, tblInfo *model.TableInfo) error {
schemaKey := i.ciStr2Key(schemaName)
tables, ok := i.tables[schemaKey]
if !ok {
return ErrDatabaseNotExists.GenWithStackByArgs(schemaName)
}
tableKey := i.ciStr2Key(tblInfo.Name)
tables[tableKey] = tblInfo
return nil
}

// DeleteTable deletes the TableInfo, it will return ErrDatabaseNotExists or ErrTableNotExists when schema or table does
// not exist.
func (i *InfoStore) DeleteTable(schema, table model.CIStr) error {
schemaKey := i.ciStr2Key(schema)
tables, ok := i.tables[schemaKey]
if !ok {
return ErrDatabaseNotExists.GenWithStackByArgs(schema)
}

tableKey := i.ciStr2Key(table)
_, ok = tables[tableKey]
if !ok {
return ErrTableNotExists.GenWithStackByArgs(schema, table)
}
delete(tables, tableKey)
return nil
}

// InfoStoreAdaptor convert InfoStore to InfoSchema, it only implements a part of InfoSchema interface to be
// used by DDL interface.
// nolint:unused
type InfoStoreAdaptor struct {
InfoSchema
inner *InfoStore
}

// SchemaByName implements the InfoSchema interface.
// nolint:unused
func (i InfoStoreAdaptor) SchemaByName(schema model.CIStr) (*model.DBInfo, bool) {
dbInfo := i.inner.SchemaByName(schema)
return dbInfo, dbInfo != nil
}

// TableExists implements the InfoSchema interface.
// nolint:unused
func (i InfoStoreAdaptor) TableExists(schema, table model.CIStr) bool {
tableInfo, _ := i.inner.TableByName(schema, table)
return tableInfo != nil
}

// TableIsView implements the InfoSchema interface.
// nolint:unused
func (i InfoStoreAdaptor) TableIsView(schema, table model.CIStr) bool {
tableInfo, _ := i.inner.TableByName(schema, table)
if tableInfo == nil {
return false
}
return tableInfo.IsView()
}

// TableByName implements the InfoSchema interface.
// nolint:unused
func (i InfoStoreAdaptor) TableByName(schema, table model.CIStr) (t table.Table, err error) {
tableInfo, err := i.inner.TableByName(schema, table)
if err != nil {
return nil, err
}
return tables.MockTableFromMeta(tableInfo), nil
}
115 changes: 115 additions & 0 deletions infoschema/info_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package infoschema

import (
"testing"

"github.com/pingcap/tidb/parser/model"
"github.com/stretchr/testify/require"
)

func TestInfoStoreLowerCaseTableNames(t *testing.T) {
dbName := model.NewCIStr("DBName")
lowerDBName := model.NewCIStr("dbname")
tableName := model.NewCIStr("TableName")
lowerTableName := model.NewCIStr("tablename")
dbInfo := &model.DBInfo{Name: dbName}
tableInfo := &model.TableInfo{Name: tableName}

// case-sensitive

is := NewInfoStore(0)
is.PutSchema(dbInfo)
got := is.SchemaByName(dbName)
require.NotNil(t, got)
got = is.SchemaByName(lowerDBName)
require.Nil(t, got)

err := is.PutTable(lowerDBName, tableInfo)
require.True(t, ErrDatabaseNotExists.Equal(err))
err = is.PutTable(dbName, tableInfo)
require.NoError(t, err)
got2, err := is.TableByName(dbName, tableName)
require.NoError(t, err)
require.NotNil(t, got2)
got2, err = is.TableByName(lowerTableName, tableName)
require.True(t, ErrDatabaseNotExists.Equal(err))
require.Nil(t, got2)
got2, err = is.TableByName(dbName, lowerTableName)
require.True(t, ErrTableNotExists.Equal(err))
require.Nil(t, got2)

// compare-insensitive

is = NewInfoStore(2)
is.PutSchema(dbInfo)
got = is.SchemaByName(dbName)
require.NotNil(t, got)
got = is.SchemaByName(lowerDBName)
require.NotNil(t, got)
require.Equal(t, dbName, got.Name)

err = is.PutTable(lowerDBName, tableInfo)
require.NoError(t, err)
got2, err = is.TableByName(dbName, tableName)
require.NoError(t, err)
require.NotNil(t, got2)
got2, err = is.TableByName(dbName, lowerTableName)
require.NoError(t, err)
require.NotNil(t, got2)
require.Equal(t, tableName, got2.Name)
}

func TestInfoStoreDeleteTables(t *testing.T) {
is := NewInfoStore(0)
dbName1 := model.NewCIStr("DBName1")
dbName2 := model.NewCIStr("DBName2")
tableName1 := model.NewCIStr("TableName1")
tableName2 := model.NewCIStr("TableName2")
dbInfo1 := &model.DBInfo{Name: dbName1}
dbInfo2 := &model.DBInfo{Name: dbName2}
tableInfo1 := &model.TableInfo{Name: tableName1}
tableInfo2 := &model.TableInfo{Name: tableName2}

is.PutSchema(dbInfo1)
err := is.PutTable(dbName1, tableInfo1)
require.NoError(t, err)
err = is.PutTable(dbName1, tableInfo2)
require.NoError(t, err)

// db2 not created
ok := is.DeleteSchema(dbName2)
require.False(t, ok)
err = is.PutTable(dbName2, tableInfo1)
require.True(t, ErrDatabaseNotExists.Equal(err))
err = is.DeleteTable(dbName2, tableName1)
require.True(t, ErrDatabaseNotExists.Equal(err))

is.PutSchema(dbInfo2)
err = is.PutTable(dbName2, tableInfo1)
require.NoError(t, err)

err = is.DeleteTable(dbName2, tableName2)
require.True(t, ErrTableNotExists.Equal(err))
err = is.DeleteTable(dbName2, tableName1)
require.NoError(t, err)

// delete db will remove its tables
ok = is.DeleteSchema(dbName1)
require.True(t, ok)
_, err = is.TableByName(dbName1, tableName1)
require.True(t, ErrDatabaseNotExists.Equal(err))
}