Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cycle detection for foreign keys #15458

Merged
merged 3 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions go/vt/sqlparser/ast_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2556,6 +2556,16 @@ func (ra ReferenceAction) IsRestrict() bool {
}
}

// IsCascade returns true if the reference action is of cascade type.
func (ra ReferenceAction) IsCascade() bool {
switch ra {
case Cascade:
return true
default:
return false
}
}

// IsLiteral returns true if the expression is of a literal type.
func IsLiteral(expr Expr) bool {
switch expr.(type) {
Expand Down
56 changes: 36 additions & 20 deletions go/vt/vtgate/vschema_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,39 +260,46 @@ func (vm *VSchemaManager) updateFromSchema(vschema *vindexes.VSchema) {
}
}

type tableCol struct {
tableName sqlparser.TableName
colNames sqlparser.Columns
}

var tableColHash = func(tc tableCol) string {
res := sqlparser.String(tc.tableName)
for _, colName := range tc.colNames {
res += "|" + sqlparser.String(colName)
}
return res
}

func markErrorIfCyclesInFk(vschema *vindexes.VSchema) {
for ksName, ks := range vschema.Keyspaces {
// Only check cyclic foreign keys for keyspaces that have
// foreign keys managed in Vitess.
if ks.ForeignKeyMode != vschemapb.Keyspace_managed {
continue
}
/*
3 cases for creating the graph for cycle detection:
1. ON DELETE RESTRICT ON UPDATE RESTRICT: This is the simplest case where no update/delete is required on the child table, we only need to verify whether a value exists or not. So we don't need to add any edge for this case.
2. ON DELETE SET NULL, ON UPDATE SET NULL, ON UPDATE CASCADE: In this case having any update/delete on any of the columns in the parent side of the foreign key will make a corresponding delete/update on all the column in the child side of the foreign key. So we will add an edge from all the columns in the parent side to all the columns in the child side.
3. ON DELETE CASCADE: This is a special case wherein a deletion on the parent table will affect all the columns in the child table irrespective of the columns involved in the foreign key! So, we'll add an edge from all the columns in the parent side of the foreign key to all the columns of the child table.
*/
g := graph.NewGraph[string]()
for _, table := range ks.Tables {
for _, cfk := range table.ChildForeignKeys {
childTable := cfk.Table
parentVertex := tableCol{
tableName: table.GetTableName(),
colNames: cfk.ParentColumns,

// Check for case 1.
if cfk.OnUpdate.IsRestrict() && cfk.OnDelete.IsRestrict() {
continue
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
}
childVertex := tableCol{
tableName: childTable.GetTableName(),
colNames: cfk.ChildColumns,
var parentVertices []string
var childVertices []string
for _, column := range cfk.ParentColumns {
parentVertices = append(parentVertices, sqlparser.String(sqlparser.NewColNameWithQualifier(column.String(), table.GetTableName())))
}
g.AddEdge(tableColHash(parentVertex), tableColHash(childVertex))

// Check for case 3.
if cfk.OnDelete.IsCascade() {
for _, column := range childTable.Columns {
childVertices = append(childVertices, sqlparser.String(sqlparser.NewColNameWithQualifier(column.Name.String(), childTable.GetTableName())))
}
} else {
// Case 2.
for _, column := range cfk.ChildColumns {
childVertices = append(childVertices, sqlparser.String(sqlparser.NewColNameWithQualifier(column.String(), childTable.GetTableName())))
}
}
addCrossEdges(g, parentVertices, childVertices)
}
}
if g.HasCycles() {
Expand All @@ -301,6 +308,15 @@ func markErrorIfCyclesInFk(vschema *vindexes.VSchema) {
}
}

// addCrossEdges adds the edges from all the vertices in the first list to all the vertices in the second list.
func addCrossEdges(g *graph.Graph[string], from []string, to []string) {
for _, fromStr := range from {
for _, toStr := range to {
g.AddEdge(fromStr, toStr)
}
}
}

func setColumns(ks *vindexes.KeyspaceSchema, tblName string, columns []vindexes.Column) *vindexes.Table {
vTbl := ks.Tables[tblName]
if vTbl == nil {
Expand Down
167 changes: 163 additions & 4 deletions go/vt/vtgate/vschema_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func TestMarkErrorIfCyclesInFk(t *testing.T) {
errWanted string
}{
{
name: "Has a cycle",
name: "Has a direct cycle",
getVschema: func() *vindexes.VSchema {
vschema := &vindexes.VSchema{
Keyspaces: map[string]*vindexes.KeyspaceSchema{
Expand All @@ -472,13 +472,44 @@ func TestMarkErrorIfCyclesInFk(t *testing.T) {
},
},
}
_ = vschema.AddForeignKey("ks", "t2", createFkDefinition([]string{"col"}, "t1", []string{"col"}, sqlparser.Cascade, sqlparser.Cascade))
_ = vschema.AddForeignKey("ks", "t3", createFkDefinition([]string{"col"}, "t2", []string{"col"}, sqlparser.Cascade, sqlparser.Cascade))
_ = vschema.AddForeignKey("ks", "t1", createFkDefinition([]string{"col"}, "t3", []string{"col"}, sqlparser.Cascade, sqlparser.Cascade))
_ = vschema.AddForeignKey("ks", "t2", createFkDefinition([]string{"col"}, "t1", []string{"col"}, sqlparser.SetNull, sqlparser.SetNull))
_ = vschema.AddForeignKey("ks", "t3", createFkDefinition([]string{"col"}, "t2", []string{"col"}, sqlparser.SetNull, sqlparser.SetNull))
_ = vschema.AddForeignKey("ks", "t1", createFkDefinition([]string{"col"}, "t3", []string{"col"}, sqlparser.SetNull, sqlparser.SetNull))
return vschema
},
errWanted: "VT09019: keyspace 'ks' has cyclic foreign keys",
GuptaManan100 marked this conversation as resolved.
Show resolved Hide resolved
},
{
name: "Has a direct cycle but there is a restrict constraint in between",
getVschema: func() *vindexes.VSchema {
vschema := &vindexes.VSchema{
Keyspaces: map[string]*vindexes.KeyspaceSchema{
ksName: {
ForeignKeyMode: vschemapb.Keyspace_managed,
Tables: map[string]*vindexes.Table{
"t1": {
Name: sqlparser.NewIdentifierCS("t1"),
Keyspace: keyspace,
},
"t2": {
Name: sqlparser.NewIdentifierCS("t2"),
Keyspace: keyspace,
},
"t3": {
Name: sqlparser.NewIdentifierCS("t3"),
Keyspace: keyspace,
},
},
},
},
}
_ = vschema.AddForeignKey("ks", "t2", createFkDefinition([]string{"col"}, "t1", []string{"col"}, sqlparser.SetNull, sqlparser.SetNull))
_ = vschema.AddForeignKey("ks", "t3", createFkDefinition([]string{"col"}, "t2", []string{"col"}, sqlparser.Restrict, sqlparser.Restrict))
_ = vschema.AddForeignKey("ks", "t1", createFkDefinition([]string{"col"}, "t3", []string{"col"}, sqlparser.SetNull, sqlparser.SetNull))
return vschema
},
errWanted: "",
},
{
name: "No cycle",
getVschema: func() *vindexes.VSchema {
Expand Down Expand Up @@ -508,6 +539,134 @@ func TestMarkErrorIfCyclesInFk(t *testing.T) {
return vschema
},
errWanted: "",
}, {
name: "Self-referencing foreign key with delete cascade",
getVschema: func() *vindexes.VSchema {
vschema := &vindexes.VSchema{
Keyspaces: map[string]*vindexes.KeyspaceSchema{
ksName: {
ForeignKeyMode: vschemapb.Keyspace_managed,
Tables: map[string]*vindexes.Table{
"t1": {
Name: sqlparser.NewIdentifierCS("t1"),
Keyspace: keyspace,
Columns: []vindexes.Column{
{
Name: sqlparser.NewIdentifierCI("id"),
},
{
Name: sqlparser.NewIdentifierCI("manager_id"),
},
},
},
},
},
},
}
_ = vschema.AddForeignKey("ks", "t1", createFkDefinition([]string{"manager_id"}, "t1", []string{"id"}, sqlparser.SetNull, sqlparser.Cascade))
return vschema
},
errWanted: "VT09019: keyspace 'ks' has cyclic foreign keys",
}, {
name: "Self-referencing foreign key without delete cascade",
getVschema: func() *vindexes.VSchema {
vschema := &vindexes.VSchema{
Keyspaces: map[string]*vindexes.KeyspaceSchema{
ksName: {
ForeignKeyMode: vschemapb.Keyspace_managed,
Tables: map[string]*vindexes.Table{
"t1": {
Name: sqlparser.NewIdentifierCS("t1"),
Keyspace: keyspace,
Columns: []vindexes.Column{
{
Name: sqlparser.NewIdentifierCI("id"),
},
{
Name: sqlparser.NewIdentifierCI("manager_id"),
},
},
},
},
},
},
}
_ = vschema.AddForeignKey("ks", "t1", createFkDefinition([]string{"manager_id"}, "t1", []string{"id"}, sqlparser.SetNull, sqlparser.SetNull))
return vschema
},
errWanted: "",
}, {
name: "Has an indirect cycle because of cascades",
getVschema: func() *vindexes.VSchema {
vschema := &vindexes.VSchema{
Keyspaces: map[string]*vindexes.KeyspaceSchema{
ksName: {
ForeignKeyMode: vschemapb.Keyspace_managed,
Tables: map[string]*vindexes.Table{
"t1": {
Name: sqlparser.NewIdentifierCS("t1"),
Keyspace: keyspace,
Columns: []vindexes.Column{
{
Name: sqlparser.NewIdentifierCI("a"),
},
{
Name: sqlparser.NewIdentifierCI("b"),
},
{
Name: sqlparser.NewIdentifierCI("c"),
},
},
},
"t2": {
Name: sqlparser.NewIdentifierCS("t2"),
Keyspace: keyspace,
Columns: []vindexes.Column{
{
Name: sqlparser.NewIdentifierCI("d"),
},
{
Name: sqlparser.NewIdentifierCI("e"),
},
{
Name: sqlparser.NewIdentifierCI("f"),
},
},
},
},
},
},
}
_ = vschema.AddForeignKey("ks", "t2", createFkDefinition([]string{"f"}, "t1", []string{"a"}, sqlparser.SetNull, sqlparser.Cascade))
_ = vschema.AddForeignKey("ks", "t1", createFkDefinition([]string{"b"}, "t2", []string{"e"}, sqlparser.SetNull, sqlparser.Cascade))
return vschema
},
errWanted: "VT09019: keyspace 'ks' has cyclic foreign keys",
}, {
name: "Cycle part of a multi-column foreign key",
getVschema: func() *vindexes.VSchema {
vschema := &vindexes.VSchema{
Keyspaces: map[string]*vindexes.KeyspaceSchema{
ksName: {
ForeignKeyMode: vschemapb.Keyspace_managed,
Tables: map[string]*vindexes.Table{
"t1": {
Name: sqlparser.NewIdentifierCS("t1"),
Keyspace: keyspace,
},
"t2": {
Name: sqlparser.NewIdentifierCS("t2"),
Keyspace: keyspace,
},
},
},
},
}
_ = vschema.AddForeignKey("ks", "t2", createFkDefinition([]string{"e", "f"}, "t1", []string{"a", "b"}, sqlparser.SetNull, sqlparser.SetNull))
_ = vschema.AddForeignKey("ks", "t1", createFkDefinition([]string{"b"}, "t2", []string{"e"}, sqlparser.SetNull, sqlparser.SetNull))
return vschema
},
errWanted: "VT09019: keyspace 'ks' has cyclic foreign keys",
},
}
for _, tt := range tests {
Expand Down
Loading