diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 71192f0aca4..19329df5cd1 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -249,6 +249,51 @@ func (cached *Filter) CachedSize(alloc bool) int64 { } return size } +func (cached *FkCascade) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(64) + } + // field Selection vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Selection.(cachedObject); ok { + size += cc.CachedSize(true) + } + // field Children []vitess.io/vitess/go/vt/vtgate/engine.FkChild + { + size += hack.RuntimeAllocSize(int64(cap(cached.Children)) * int64(56)) + for _, elem := range cached.Children { + size += elem.CachedSize(false) + } + } + // field Parent vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Parent.(cachedObject); ok { + size += cc.CachedSize(true) + } + return size +} +func (cached *FkChild) CachedSize(alloc bool) int64 { + if cached == nil { + return int64(0) + } + size := int64(0) + if alloc { + size += int64(64) + } + // field BVName string + size += hack.RuntimeAllocSize(int64(len(cached.BVName))) + // field Cols []int + { + size += hack.RuntimeAllocSize(int64(cap(cached.Cols)) * int64(8)) + } + // field Exec vitess.io/vitess/go/vt/vtgate/engine.Primitive + if cc, ok := cached.Exec.(cachedObject); ok { + size += cc.CachedSize(true) + } + return size +} func (cached *Generate) CachedSize(alloc bool) int64 { if cached == nil { return int64(0) diff --git a/go/vt/vtgate/engine/fk_cascade.go b/go/vt/vtgate/engine/fk_cascade.go new file mode 100644 index 00000000000..f32767bf2bc --- /dev/null +++ b/go/vt/vtgate/engine/fk_cascade.go @@ -0,0 +1,185 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" +) + +// FkChild contains the Child Primitive to be executed collecting the values from the Selection Primitive using the column indexes. +// BVName is used to pass the value as bind variable to the Child Primitive. +type FkChild struct { + BVName string + Cols []int // indexes + Exec Primitive +} + +// FkCascade is a primitive that implements foreign key cascading using Selection as values required to execute the FkChild Primitives. +// On success, it executes the Parent Primitive. +type FkCascade struct { + // Selection is the Primitive that is used to find the rows that are going to be modified in the child tables. + Selection Primitive + // Children is a list of child foreign key Primitives that are executed using rows from the Selection Primitive. + Children []FkChild + // Parent is the Primitive that is executed after the children are modified. + Parent Primitive + + txNeeded +} + +// RouteType implements the Primitive interface. +func (fkc *FkCascade) RouteType() string { + return "FkCascade" +} + +// GetKeyspaceName implements the Primitive interface. +func (fkc *FkCascade) GetKeyspaceName() string { + return fkc.Parent.GetKeyspaceName() +} + +// GetTableName implements the Primitive interface. +func (fkc *FkCascade) GetTableName() string { + return fkc.Parent.GetTableName() +} + +// GetFields implements the Primitive interface. +func (fkc *FkCascade) GetFields(_ context.Context, _ VCursor, _ map[string]*querypb.BindVariable) (*sqltypes.Result, error) { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] GetFields should not be called") +} + +// TryExecute implements the Primitive interface. +func (fkc *FkCascade) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { + // Execute the Selection primitive to find the rows that are going to modified. + // This will be used to find the rows that need modification on the children. + selectionRes, err := vcursor.ExecutePrimitive(ctx, fkc.Selection, bindVars, wantfields) + if err != nil { + return nil, err + } + + // If no rows are to be modified, there is nothing to do. + if len(selectionRes.Rows) == 0 { + return &sqltypes.Result{}, nil + } + + for _, child := range fkc.Children { + // We create a bindVariable for each Child + // that stores the tuple of columns involved in the fk constraint. + bv := &querypb.BindVariable{ + Type: querypb.Type_TUPLE, + } + for _, row := range selectionRes.Rows { + // Create a tuple from each Row. + tuple := &querypb.Value{ + Type: querypb.Type_TUPLE, + } + for _, colIdx := range child.Cols { + tuple.Values = append(tuple.Values, + sqltypes.ValueToProto(row[colIdx])) + } + bv.Values = append(bv.Values, tuple) + } + // Execute the child primitive, and bail out incase of failure. + // Since this Primitive is always executed in a transaction, the changes should + // be rolled back incase of an error. + bindVars[child.BVName] = bv + _, err = vcursor.ExecutePrimitive(ctx, child.Exec, bindVars, wantfields) + if err != nil { + return nil, err + } + delete(bindVars, child.BVName) + } + + // All the children are modified successfully, we can now execute the Parent Primitive. + return vcursor.ExecutePrimitive(ctx, fkc.Parent, bindVars, wantfields) +} + +// TryStreamExecute implements the Primitive interface. +func (fkc *FkCascade) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + // We create a bindVariable for each Child + // that stores the tuple of columns involved in the fk constraint. + var bindVariables []*querypb.BindVariable + for range fkc.Children { + bindVariables = append(bindVariables, &querypb.BindVariable{ + Type: querypb.Type_TUPLE, + }) + } + + // Execute the Selection primitive to find the rows that are going to modified. + // This will be used to find the rows that need modification on the children. + err := vcursor.StreamExecutePrimitive(ctx, fkc.Selection, bindVars, wantfields, func(result *sqltypes.Result) error { + if len(result.Rows) == 0 { + return nil + } + for idx, child := range fkc.Children { + for _, row := range result.Rows { + // Create a tuple from each Row. + tuple := &querypb.Value{ + Type: querypb.Type_TUPLE, + } + for _, colIdx := range child.Cols { + tuple.Values = append(tuple.Values, + sqltypes.ValueToProto(row[colIdx])) + } + bindVariables[idx].Values = append(bindVariables[idx].Values, tuple) + } + } + return nil + }) + if err != nil { + return err + } + + // Execute the child primitive, and bail out incase of failure. + // Since this Primitive is always executed in a transaction, the changes should + // be rolled back incase of an error. + for idx, child := range fkc.Children { + bindVars[child.BVName] = bindVariables[idx] + err = vcursor.StreamExecutePrimitive(ctx, child.Exec, bindVars, wantfields, func(result *sqltypes.Result) error { + return nil + }) + if err != nil { + return err + } + delete(bindVars, child.BVName) + } + + // All the children are modified successfully, we can now execute the Parent Primitive. + return vcursor.StreamExecutePrimitive(ctx, fkc.Parent, bindVars, wantfields, callback) +} + +// Inputs implements the Primitive interface. +func (fkc *FkCascade) Inputs() []Primitive { + inputs := []Primitive{fkc.Selection} + for _, child := range fkc.Children { + inputs = append(inputs, child.Exec) + } + inputs = append(inputs, fkc.Parent) + return inputs +} + +func (fkc *FkCascade) description() PrimitiveDescription { + return PrimitiveDescription{ + OperatorType: fkc.RouteType(), + } +} + +var _ Primitive = (*FkCascade)(nil) diff --git a/go/vt/vtgate/engine/fk_cascade_test.go b/go/vt/vtgate/engine/fk_cascade_test.go new file mode 100644 index 00000000000..a8aa1055c86 --- /dev/null +++ b/go/vt/vtgate/engine/fk_cascade_test.go @@ -0,0 +1,150 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +// TestDeleteCascade tests that FkCascade executes the child and parent primitives for a delete cascade. +func TestDeleteCascade(t *testing.T) { + fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("cola|colb", "int64|varchar"), "1|a", "2|b") + + inputP := &Route{ + Query: "select cola, colb from parent where foo = 48", + RoutingParameters: &RoutingParameters{ + Opcode: Unsharded, + Keyspace: &vindexes.Keyspace{Name: "ks"}, + }, + } + childP := &Delete{ + DML: &DML{ + Query: "delete from child where (ca, cb) in ::__vals", + RoutingParameters: &RoutingParameters{ + Opcode: Unsharded, + Keyspace: &vindexes.Keyspace{Name: "ks"}, + }, + }, + } + parentP := &Delete{ + DML: &DML{ + Query: "delete from parent where foo = 48", + RoutingParameters: &RoutingParameters{ + Opcode: Unsharded, + Keyspace: &vindexes.Keyspace{Name: "ks"}, + }, + }, + } + fkc := &FkCascade{ + Selection: inputP, + Children: []FkChild{{BVName: "__vals", Cols: []int{0, 1}, Exec: childP}}, + Parent: parentP, + } + + vc := newDMLTestVCursor("0") + vc.results = []*sqltypes.Result{fakeRes} + _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: select cola, colb from parent where foo = 48 {} false false`, + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: delete from child where (ca, cb) in ::__vals {__vals: type:TUPLE values:{type:TUPLE values:{type:INT64 value:"1"} values:{type:VARCHAR value:"a"}} values:{type:TUPLE values:{type:INT64 value:"2"} values:{type:VARCHAR value:"b"}}} true true`, + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: delete from parent where foo = 48 {} true true`, + }) + + vc.Rewind() + err = fkc.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true, func(result *sqltypes.Result) error { return nil }) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `StreamExecuteMulti select cola, colb from parent where foo = 48 ks.0: {} `, + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: delete from child where (ca, cb) in ::__vals {__vals: type:TUPLE values:{type:TUPLE values:{type:INT64 value:"1"} values:{type:VARCHAR value:"a"}} values:{type:TUPLE values:{type:INT64 value:"2"} values:{type:VARCHAR value:"b"}}} true true`, + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: delete from parent where foo = 48 {} true true`, + }) +} + +// TestUpdateCascade tests that FkCascade executes the child and parent primitives for an update cascade. +func TestUpdateCascade(t *testing.T) { + fakeRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("cola|colb", "int64|varchar"), "1|a", "2|b") + + inputP := &Route{ + Query: "select cola, colb from parent where foo = 48", + RoutingParameters: &RoutingParameters{ + Opcode: Unsharded, + Keyspace: &vindexes.Keyspace{Name: "ks"}, + }, + } + childP := &Update{ + DML: &DML{ + Query: "update child set ca = :vtg1 where (ca, cb) in ::__vals", + RoutingParameters: &RoutingParameters{ + Opcode: Unsharded, + Keyspace: &vindexes.Keyspace{Name: "ks"}, + }, + }, + } + parentP := &Update{ + DML: &DML{ + Query: "update parent set cola = 1 where foo = 48", + RoutingParameters: &RoutingParameters{ + Opcode: Unsharded, + Keyspace: &vindexes.Keyspace{Name: "ks"}, + }, + }, + } + fkc := &FkCascade{ + Selection: inputP, + Children: []FkChild{{BVName: "__vals", Cols: []int{0, 1}, Exec: childP}}, + Parent: parentP, + } + + vc := newDMLTestVCursor("0") + vc.results = []*sqltypes.Result{fakeRes} + _, err := fkc.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: select cola, colb from parent where foo = 48 {} false false`, + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: update child set ca = :vtg1 where (ca, cb) in ::__vals {__vals: type:TUPLE values:{type:TUPLE values:{type:INT64 value:"1"} values:{type:VARCHAR value:"a"}} values:{type:TUPLE values:{type:INT64 value:"2"} values:{type:VARCHAR value:"b"}}} true true`, + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: update parent set cola = 1 where foo = 48 {} true true`, + }) + + vc.Rewind() + err = fkc.TryStreamExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, true, func(result *sqltypes.Result) error { return nil }) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `StreamExecuteMulti select cola, colb from parent where foo = 48 ks.0: {} `, + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: update child set ca = :vtg1 where (ca, cb) in ::__vals {__vals: type:TUPLE values:{type:TUPLE values:{type:INT64 value:"1"} values:{type:VARCHAR value:"a"}} values:{type:TUPLE values:{type:INT64 value:"2"} values:{type:VARCHAR value:"b"}}} true true`, + `ResolveDestinations ks [] Destinations:DestinationAllShards()`, + `ExecuteMultiShard ks.0: update parent set cola = 1 where foo = 48 {} true true`, + }) +}