Skip to content

Commit

Permalink
Merge pull request #8461 from dolthub/zachmu/fk-debug
Browse files Browse the repository at this point in the history
Serialize schema names in foreign key constraints
  • Loading branch information
zachmu authored Oct 17, 2024
2 parents 695349d + 1f6d3d1 commit 5bf1898
Show file tree
Hide file tree
Showing 15 changed files with 192 additions and 174 deletions.
6 changes: 3 additions & 3 deletions go/cmd/dolt/commands/cvcmds/verify_constraints.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,16 @@ func (cmd VerifyConstraintsCmd) Exec(ctx context.Context, commandStr string, arg
}

for _, tableName := range tablesWithViolations.AsSortedSlice() {
tbl, ok, err := endRoot.GetTable(ctx, doltdb.TableName{Name: tableName})
tbl, ok, err := endRoot.GetTable(ctx, tableName)
if err != nil {
return commands.HandleVErrAndExitCode(errhand.BuildDError("Error loading table.").AddCause(err).Build(), nil)
}
if !ok {
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to load table '%s'.", tableName).Build(), nil)
}
cli.Println("")
cli.Println(doltdb.DoltConstViolTablePrefix + tableName)
dErr := printViolationsForTable(ctx, dbName, tableName, tbl, eng)
cli.Println(doltdb.DoltConstViolTablePrefix + tableName.Name)
dErr := printViolationsForTable(ctx, dbName, tableName.Name, tbl, eng)
if dErr != nil {
return commands.HandleVErrAndExitCode(dErr, nil)
}
Expand Down
6 changes: 2 additions & 4 deletions go/libraries/doltcore/diff/table_deltas.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ func GetTableDeltas(ctx context.Context, fromRoot, toRoot doltdb.RootValue) (del
func getFkParentSchs(ctx context.Context, root doltdb.RootValue, fks ...doltdb.ForeignKey) (map[doltdb.TableName]schema.Schema, error) {
schs := make(map[doltdb.TableName]schema.Schema)
for _, toFk := range fks {
// TODO: schema
toRefTable, _, ok, err := doltdb.GetTableInsensitive(ctx, root, doltdb.TableName{Name: toFk.ReferencedTableName})
toRefTable, _, ok, err := doltdb.GetTableInsensitive(ctx, root, toFk.ReferencedTableName)
if err != nil {
return nil, err
}
Expand All @@ -228,8 +227,7 @@ func getFkParentSchs(ctx context.Context, root doltdb.RootValue, fks ...doltdb.F
if err != nil {
return nil, err
}
// TODO: schema name
schs[doltdb.TableName{Name: toFk.ReferencedTableName}] = toRefSch
schs[toFk.ReferencedTableName] = toRefSch
}
return schs, nil
}
Expand Down
42 changes: 22 additions & 20 deletions go/libraries/doltcore/doltdb/foreign_key_coll.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ const (
// ForeignKey is the complete, internal representation of a Foreign Key.
type ForeignKey struct {
Name string `noms:"name" json:"name"`
TableName string `noms:"tbl_name" json:"tbl_name"`
TableName TableName `noms:"tbl_name" json:"tbl_name"`
TableIndex string `noms:"tbl_index" json:"tbl_index"`
TableColumns []uint64 `noms:"tbl_cols" json:"tbl_cols"`
ReferencedTableName string `noms:"ref_tbl_name" json:"ref_tbl_name"`
ReferencedTableName TableName `noms:"ref_tbl_name" json:"ref_tbl_name"`
ReferencedTableIndex string `noms:"ref_tbl_index" json:"ref_tbl_index"`
ReferencedTableColumns []uint64 `noms:"ref_tbl_cols" json:"ref_tbl_cols"`
OnUpdate ForeignKeyReferentialAction `noms:"on_update" json:"on_update"`
Expand Down Expand Up @@ -167,7 +167,7 @@ func (fk ForeignKey) Equals(other ForeignKey, fkSchemasByName, otherSchemasByNam
}
for i, tag := range resolvedFK.TableColumns {
unresolvedColName := unresolvedFK.UnresolvedFKDetails.TableColumns[i]
resolvedSch, ok := resolvedSchemasByName[TableName{Name: resolvedFK.TableName}]
resolvedSch, ok := resolvedSchemasByName[resolvedFK.TableName]
if !ok {
return false
}
Expand All @@ -186,7 +186,7 @@ func (fk ForeignKey) Equals(other ForeignKey, fkSchemasByName, otherSchemasByNam
}
for i, tag := range resolvedFK.ReferencedTableColumns {
unresolvedColName := unresolvedFK.UnresolvedFKDetails.ReferencedTableColumns[i]
resolvedSch, ok := resolvedSchemasByName[TableName{Name: unresolvedFK.ReferencedTableName}]
resolvedSch, ok := resolvedSchemasByName[unresolvedFK.ReferencedTableName]
if !ok {
return false
}
Expand Down Expand Up @@ -246,6 +246,8 @@ func (fk ForeignKey) HashOf() (hash.Hash, error) {
_, err = bb.Write(t)
case uint64:
err = binary.Write(&bb, binary.LittleEndian, t)
case TableName:
_, err = bb.Write([]byte(t.String()))
default:
return hash.Hash{}, fmt.Errorf("unsupported type %T", t)
}
Expand Down Expand Up @@ -278,7 +280,7 @@ func CombinedHash(fks []ForeignKey) (hash.Hash, error) {

// IsSelfReferential returns whether the table declaring the foreign key is also referenced by the foreign key.
func (fk ForeignKey) IsSelfReferential() bool {
return strings.EqualFold(fk.TableName, fk.ReferencedTableName)
return fk.TableName.EqualFold(fk.ReferencedTableName)
}

// IsResolved returns whether the foreign key has been resolved.
Expand Down Expand Up @@ -543,13 +545,13 @@ OuterLoopResolved:
len(fk.ReferencedTableColumns) != len(existingFk.UnresolvedFKDetails.ReferencedTableColumns) {
continue
}
// TODO: schema name
tblSch, ok := allSchemas[TableName{Name: existingFk.TableName}]

tblSch, ok := allSchemas[existingFk.TableName]
if !ok {
continue
}
// TODO: schema name
refTblSch, ok := allSchemas[TableName{Name: existingFk.ReferencedTableName}]

refTblSch, ok := allSchemas[existingFk.ReferencedTableName]
if !ok {
continue
}
Expand Down Expand Up @@ -590,10 +592,10 @@ func (fkc *ForeignKeyCollection) Iter(cb func(fk ForeignKey) (stop bool, err err
// it will be present in both declaresFk and referencedByFk. Each array is sorted by name ascending.
func (fkc *ForeignKeyCollection) KeysForTable(tableName TableName) (declaredFk, referencedByFk []ForeignKey) {
for _, foreignKey := range fkc.foreignKeys {
if strings.EqualFold(foreignKey.TableName, tableName.Name) {
if foreignKey.TableName.EqualFold(tableName) {
declaredFk = append(declaredFk, foreignKey)
}
if strings.EqualFold(foreignKey.ReferencedTableName, tableName.Name) {
if foreignKey.ReferencedTableName.EqualFold(tableName) {
referencedByFk = append(referencedByFk, foreignKey)
}
}
Expand Down Expand Up @@ -643,9 +645,9 @@ func (fkc *ForeignKeyCollection) RemoveKeyByName(foreignKeyName string) bool {
func (fkc *ForeignKeyCollection) RemoveTables(ctx context.Context, tables ...TableName) error {
outgoing := NewTableNameSet(tables)
for _, fk := range fkc.foreignKeys {
// TODO: schema names
dropChild := outgoing.Contains(TableName{Name: fk.TableName})
dropParent := outgoing.Contains(TableName{Name: fk.ReferencedTableName})

dropChild := outgoing.Contains(fk.TableName)
dropParent := outgoing.Contains(fk.ReferencedTableName)
if dropParent && !dropChild {
return fmt.Errorf("unable to remove `%s` since it is referenced from table `%s`", fk.ReferencedTableName, fk.TableName)
}
Expand All @@ -666,8 +668,8 @@ func (fkc *ForeignKeyCollection) RemoveTables(ctx context.Context, tables ...Tab
func (fkc *ForeignKeyCollection) RemoveAndUnresolveTables(ctx context.Context, root RootValue, tables ...TableName) error {
outgoing := NewTableNameSet(tables)
for _, fk := range fkc.foreignKeys {
dropChild := outgoing.Contains(TableName{Name: fk.TableName})
dropParent := outgoing.Contains(TableName{Name: fk.ReferencedTableName})
dropChild := outgoing.Contains(fk.TableName)
dropParent := outgoing.Contains(fk.ReferencedTableName)
if dropParent && !dropChild {
if !fk.IsResolved() {
continue
Expand All @@ -682,7 +684,7 @@ func (fkc *ForeignKeyCollection) RemoveAndUnresolveTables(ctx context.Context, r
fk.UnresolvedFKDetails.TableColumns = make([]string, len(fk.TableColumns))
fk.UnresolvedFKDetails.ReferencedTableColumns = make([]string, len(fk.ReferencedTableColumns))

tbl, ok, err := root.GetTable(ctx, TableName{Name: fk.TableName})
tbl, ok, err := root.GetTable(ctx, fk.TableName)
if err != nil {
return err
}
Expand All @@ -703,7 +705,7 @@ func (fkc *ForeignKeyCollection) RemoveAndUnresolveTables(ctx context.Context, r
fk.UnresolvedFKDetails.TableColumns[i] = col.Name
}

refTbl, ok, err := root.GetTable(ctx, TableName{Name: fk.ReferencedTableName})
refTbl, ok, err := root.GetTable(ctx, fk.ReferencedTableName)
if err != nil {
return err
}
Expand Down Expand Up @@ -747,8 +749,8 @@ func (fkc *ForeignKeyCollection) RemoveAndUnresolveTables(ctx context.Context, r
}

// Tables returns the set of all tables that either declare a foreign key or are referenced by a foreign key.
func (fkc *ForeignKeyCollection) Tables() map[string]struct{} {
tables := make(map[string]struct{})
func (fkc *ForeignKeyCollection) Tables() map[TableName]struct{} {
tables := make(map[TableName]struct{})
for _, fk := range fkc.foreignKeys {
tables[fk.TableName] = struct{}{}
tables[fk.ReferencedTableName] = struct{}{}
Expand Down
18 changes: 14 additions & 4 deletions go/libraries/doltcore/doltdb/foreign_key_serialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,22 @@ func deserializeFlatbufferForeignKeys(msg types.SerialMessage) (*ForeignKeyColle
}
}

tableName, ok := decodeTableNameFromSerialization(string(fk.ChildTableName()))
if !ok {
return nil, fmt.Errorf("could not decode table name: %s", string(fk.ChildTableName()))
}

parentTableName, ok := decodeTableNameFromSerialization(string(fk.ParentTableName()))
if !ok {
return nil, fmt.Errorf("could not decode table name: %s", string(fk.ParentTableName()))
}

err := collection.AddKeys(ForeignKey{
Name: string(fk.Name()),
TableName: string(fk.ChildTableName()),
TableName: tableName,
TableIndex: string(fk.ChildTableIndex()),
TableColumns: childCols,
ReferencedTableName: string(fk.ParentTableName()),
ReferencedTableName: parentTableName,
ReferencedTableIndex: string(fk.ParentTableIndex()),
ReferencedTableColumns: parentCols,
OnUpdate: ForeignKeyReferentialAction(fk.OnUpdate()),
Expand Down Expand Up @@ -181,9 +191,9 @@ func serializeFlatbufferForeignKeys(fkc *ForeignKeyCollection) types.SerialMessa
}
parentCols = serializeUint64Vector(b, fk.ReferencedTableColumns)
childCols = serializeUint64Vector(b, fk.TableColumns)
parentTable = b.CreateString(fk.ReferencedTableName)
parentTable = b.CreateString(encodeTableNameForSerialization(fk.ReferencedTableName))
parentIndex = b.CreateString(fk.ReferencedTableIndex)
childTable = b.CreateString(fk.TableName)
childTable = b.CreateString(encodeTableNameForSerialization(fk.TableName))
childIndex = b.CreateString(fk.TableIndex)
foreignKeyName = b.CreateString(fk.Name)

Expand Down
Loading

0 comments on commit 5bf1898

Please sign in to comment.