Skip to content

Commit

Permalink
Handle new restrict_fk_on_non_standard_key option in MySQL 8.4
Browse files Browse the repository at this point in the history
MySQL 8.4 adds restrict_fk_on_non_standard_key for which the details can
be found here:

https://dev.mysql.com/doc/refman/8.4/en/server-system-variables.html#sysvar_restrict_fk_on_non_standard_key

It prevents the use of non-unique keys or partial keys as foreign keys.

For regular MySQL replication though, it does the following:

Implication for MySQL Replication.  When a foreign key is created on a
nonstandard key on the primary because restrict_fk_on_non_standard_key
is OFF, the statement succeeds on the replica regardless of any setting
on the replica for this variable.

Given how this works fir regular replication, we should mirror this
behavior for vreplication and always allow this. So we set it up so we
can do this with similar logic as to how we disable foreign key checks
and we reset it back afterwards.

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
  • Loading branch information
dbussink committed Jul 23, 2024
1 parent 2460c52 commit 0110468
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 1 deletion.
5 changes: 5 additions & 0 deletions go/vt/binlog/binlogplayer/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type DBClient interface {
Close()
ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
ExecuteFetchMulti(query string, maxrows int) (qrs []*sqltypes.Result, err error)
ServerVersion() string
}

// dbClientImpl is a real DBClient backed by a mysql connection.
Expand Down Expand Up @@ -123,6 +124,10 @@ func (dc *dbClientImpl) Close() {
dc.dbConn.Close()
}

func (dc *dbClientImpl) ServerVersion() string {
return dc.dbConn.ServerVersion
}

// LogError logs a message after truncating it to avoid spamming logs
func LogError(msg string, err error) {
log.Errorf("%s: %s", msg, MessageTruncate(err.Error()))
Expand Down
5 changes: 5 additions & 0 deletions go/vt/binlog/binlogplayer/fake_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"

"vitess.io/vitess/go/mysql/config"
"vitess.io/vitess/go/sqltypes"
)

Expand Down Expand Up @@ -84,3 +85,7 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
func (dc *fakeDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
return make([]*sqltypes.Result, 0), nil
}

func (dc *fakeDBClient) ServerVersion() string {
return config.DefaultMySQLVersion
}
5 changes: 5 additions & 0 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"vitess.io/vitess/go/mysql/config"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/sqlparser"
)
Expand Down Expand Up @@ -260,3 +261,7 @@ func (dc *MockDBClient) RemoveInvariant(query string) {
defer dc.expectMu.Unlock()
delete(dc.invariants, query)
}

func (dc *MockDBClient) ServerVersion() string {
return config.DefaultMySQLVersion
}
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ func (dbc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltyp
return results, nil
}

func (dbc *realDBClient) ServerVersion() string {
return dbc.conn.ServerVersion
}

//----------------------------------------------
// fakeTMClient

Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,10 @@ func (dbc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltyp
return results, nil
}

func (dbc *realDBClient) ServerVersion() string {
return dbc.conn.ServerVersion
}

func expectDeleteQueries(t *testing.T) {
t.Helper()
if doNotLogDBQueries {
Expand Down
56 changes: 56 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"
"time"

"vitess.io/vitess/go/mysql/capabilities"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -105,6 +106,7 @@ type vreplicator struct {

originalFKCheckSetting int64
originalSQLMode string
originalFKRestrict int64

WorkflowType int32
WorkflowSubType int32
Expand Down Expand Up @@ -239,6 +241,10 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
}
//defensive guard, should be a no-op since it should happen after copy is done
defer vr.resetFKCheckAfterCopy(vr.dbClient)
if err := vr.getSettingFKRestrict(); err != nil {
return err
}
defer vr.resetFKRestrictAfterCopy(vr.dbClient)

vr.throttleUpdatesRateLimiter = timer.NewRateLimiter(time.Second)
defer vr.throttleUpdatesRateLimiter.Stop()
Expand Down Expand Up @@ -272,6 +278,10 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
log.Warningf("Unable to clear FK check %v", err)
return err
}
if err := vr.clearFKRestrict(vr.dbClient); err != nil {
log.Warningf("Unable to clear FK restrict %v", err)
return err
}
if vr.WorkflowSubType == int32(binlogdatapb.VReplicationWorkflowSubType_AtomicCopy) {
if err := newVCopier(vr).copyAll(ctx, settings); err != nil {
log.Infof("Error atomically copying all tables: %v", err)
Expand Down Expand Up @@ -301,6 +311,10 @@ func (vr *vreplicator) replicate(ctx context.Context) error {
log.Warningf("Unable to reset FK check %v", err)
return err
}
if err := vr.resetFKRestrictAfterCopy(vr.dbClient); err != nil {
log.Warningf("Unable to reset FK restrict %v", err)
return err
}
if vr.source.StopAfterCopy {
return vr.setState(binlogdatapb.VReplicationWorkflowState_Stopped, "Stopped after copy.")
}
Expand Down Expand Up @@ -512,11 +526,42 @@ func (vr *vreplicator) getSettingFKCheck() error {
return nil
}

func (vr *vreplicator) needFKRestrict() bool {
ok, _ := capabilities.ServerVersionAtLeast(vr.dbClient.ServerVersion(), 8, 4, 0)
return ok
}

func (vr *vreplicator) getSettingFKRestrict() error {
if !vr.needFKRestrict() {
return nil
}
qr, err := vr.dbClient.Execute("select @@session.restrict_fk_on_non_standard_key")
if err != nil {
return err
}
if len(qr.Rows) != 1 || len(qr.Fields) != 1 {
return fmt.Errorf("unable to select @@session.restrict_fk_on_non_standard_key")
}
vr.originalFKRestrict, err = qr.Rows[0][0].ToCastInt64()
if err != nil {
return err
}
return nil
}

func (vr *vreplicator) resetFKCheckAfterCopy(dbClient *vdbClient) error {
_, err := dbClient.Execute(fmt.Sprintf("set @@session.foreign_key_checks=%d", vr.originalFKCheckSetting))
return err
}

func (vr *vreplicator) resetFKRestrictAfterCopy(dbClient *vdbClient) error {
if !vr.needFKRestrict() {
return nil
}
_, err := dbClient.Execute(fmt.Sprintf("set @@session.restrict_fk_on_non_standard_key=%d", vr.originalFKRestrict))
return err
}

func (vr *vreplicator) setSQLMode(ctx context.Context, dbClient *vdbClient) (func(), error) {
resetFunc := func() {}
// First save the original SQL mode if we have not already done so
Expand Down Expand Up @@ -616,6 +661,14 @@ func (vr *vreplicator) clearFKCheck(dbClient *vdbClient) error {
return err
}

func (vr *vreplicator) clearFKRestrict(dbClient *vdbClient) error {
if !vr.needFKRestrict() {
return nil
}
_, err := dbClient.Execute("set @@session.restrict_fk_on_non_standard_key=0")
return err
}

func recalculatePKColsInfoByColumnNames(uniqueKeyColumnNames []string, colInfos []*ColumnInfo) (pkColInfos []*ColumnInfo) {
pkColInfos = colInfos[:]
columnOrderMap := map[string]int64{}
Expand Down Expand Up @@ -1056,6 +1109,9 @@ func (vr *vreplicator) newClientConnection(ctx context.Context) (*vdbClient, err
if err := vr.clearFKCheck(dbClient); err != nil {
return nil, vterrors.Wrap(err, "failed to clear foreign key check")
}
if err := vr.clearFKRestrict(dbClient); err != nil {
return nil, vterrors.Wrap(err, "failed to clear foreign key restriction")
}
return dbClient, nil
}

Expand Down
13 changes: 12 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/capabilities"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/mysqlctl"
Expand Down Expand Up @@ -481,7 +482,17 @@ func TestDeferSecondaryKeys(t *testing.T) {
}

// Create the table.
_, err := dbClient.ExecuteFetch(tcase.initialDDL, 1)
capability, err := capabilities.ServerVersionAtLeast(dbClient.ServerVersion(), 8, 4, 0)
require.NoError(t, err)
if capability {
_, err := dbClient.ExecuteFetch("set @@session.restrict_fk_on_non_standard_key=0", 1)
require.NoError(t, err)
defer func() {
_, err = dbClient.ExecuteFetch("set @@session.restrict_fk_on_non_standard_key=1", 1)
require.NoError(t, err)
}()
}
_, err = dbClient.ExecuteFetch(tcase.initialDDL, 1)
require.NoError(t, err)
defer func() {
_, err = dbClient.ExecuteFetch(fmt.Sprintf("drop table %s.%s", dbName, tcase.tableName), 1)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/wrangler/fake_dbclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/mysql/config"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"

Expand Down Expand Up @@ -177,6 +178,10 @@ func (dc *fakeDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltype
return results, nil
}

func (dc *fakeDBClient) ServerVersion() string {
return config.DefaultMySQLVersion
}

// ExecuteFetch is part of the DBClient interface
func (dc *fakeDBClient) executeFetch(query string, maxrows int) (*sqltypes.Result, error) {
if dbrs := dc.queries[query]; dbrs != nil {
Expand Down

0 comments on commit 0110468

Please sign in to comment.