Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

binlog replication interface between GMS and Dolt #1514

Merged
merged 29 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
18f5bc7
Adding a gms.Engine reference to the server.Server struct so we can a…
fulghum Dec 14, 2022
aa9ccbc
Updating vitess dependency tip of fulghum/binlog-replication dev bran…
fulghum Dec 19, 2022
6a5cf4f
Updating vitess dependency tip of fulghum/binlog-replication dev bran…
fulghum Dec 19, 2022
3ce1391
First pass on replication plan nodes
fulghum Dec 19, 2022
8afdf58
First pass on the interface to connect Dolt and GMS for binlog replic…
fulghum Dec 20, 2022
20ff8fc
Merge branch 'fulghum/binlog-replication' of https://github.com/dolth…
fulghum Dec 20, 2022
4625c64
Adding support for `show replica status` and tidying up
fulghum Dec 20, 2022
af7669e
Adding the rest of the `show replica status` fields to schema and output
fulghum Dec 22, 2022
eb5df34
Refactoring package organization
fulghum Dec 28, 2022
bee2b5f
Wiring in connection retry attempt limit and connection retry delay c…
fulghum Dec 29, 2022
5a0fa1a
Updates to ReplicaStatus fields and formatting
fulghum Dec 29, 2022
a8f0910
Merge branch 'main' into fulghum/binlog-replication
fulghum Dec 30, 2022
026f839
Minor tweaks to tidy up
fulghum Jan 4, 2023
7e906e5
go mod tidy
fulghum Jan 4, 2023
f45f282
[ga-format-pr] Run ./format_repo.sh to fix formatting
fulghum Jan 4, 2023
3d1e722
minor note about privilege checks
fulghum Jan 4, 2023
0a7f423
Merge branch 'fulghum/binlog-replication' of https://github.com/dolth…
fulghum Jan 4, 2023
e9f32c8
Merge branch 'main' into fulghum/binlog-replication
fulghum Jan 11, 2023
cfb26ed
Embedding the new binlogReplicaControllerCommand type to simplify imp…
fulghum Jan 11, 2023
3645701
First pass on adding handling for CHANGE REPLICATION FILTER and RESET…
fulghum Jan 12, 2023
68ca776
Adding ConnectRetryInterval and ConnectRetryCount to ReplicaSourceInfo
fulghum Jan 12, 2023
0be5df9
Tidying up
fulghum Jan 13, 2023
35b177d
Cleaning up replication option support and including initial replica …
fulghum Jan 19, 2023
e181a83
Bumping to latest vitess version from fulghum/binlog-replication branch
fulghum Jan 19, 2023
cd5d01f
Merge branch 'main' into fulghum/binlog-replication
fulghum Jan 19, 2023
0c6830a
Updating from sql->types refactoring
fulghum Jan 19, 2023
6b94baf
Adding `server_id` sys var
fulghum Jan 19, 2023
45e839b
Changing to apply the binlog replica controller through an analyzer r…
fulghum Jan 24, 2023
e3cef36
Merge branch 'main' into fulghum/binlog-replication
fulghum Jan 24, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/dolthub/go-mysql-server/memory"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/analyzer"
"github.com/dolthub/go-mysql-server/sql/binlogreplication"
"github.com/dolthub/go-mysql-server/sql/expression"
"github.com/dolthub/go-mysql-server/sql/expression/function"
"github.com/dolthub/go-mysql-server/sql/parse"
Expand Down Expand Up @@ -116,15 +117,16 @@ func (p *PreparedDataCache) UncacheStmt(sessId uint32, query string) {

// Engine is a SQL engine.
type Engine struct {
Analyzer *analyzer.Analyzer
LS *sql.LockSubsystem
ProcessList sql.ProcessList
MemoryManager *sql.MemoryManager
BackgroundThreads *sql.BackgroundThreads
IsReadOnly bool
IsServerLocked bool
PreparedDataCache *PreparedDataCache
mu *sync.Mutex
Analyzer *analyzer.Analyzer
LS *sql.LockSubsystem
ProcessList sql.ProcessList
MemoryManager *sql.MemoryManager
BackgroundThreads *sql.BackgroundThreads
IsReadOnly bool
IsServerLocked bool
PreparedDataCache *PreparedDataCache
BinlogReplicaController binlogreplication.BinlogReplicaController
fulghum marked this conversation as resolved.
Show resolved Hide resolved
mu *sync.Mutex
}

type ColumnWithRawDefault struct {
Expand Down Expand Up @@ -235,6 +237,12 @@ func (e *Engine) QueryNodeWithBindings(
}
}

// The replica controller reference is held by |Engine|, so for any BinlogReplicaControllerCommands,
// we supply their replica controller here, before the command gets to the analyzer.
if nn, ok := parsed.(plan.BinlogReplicaControllerCommand); ok {
fulghum marked this conversation as resolved.
Show resolved Hide resolved
nn.SetBinlogReplicaController(e.BinlogReplicaController)
}

// Before we begin a transaction, we need to know if the database being operated on is not the one
// currently selected
transactionDatabase := analyzer.GetTransactionDatabase(ctx, parsed)
Expand Down
17 changes: 17 additions & 0 deletions enginetest/queries/variable_queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,23 @@ var VariableQueries = []ScriptTest{
{uint64(2)},
},
},
{
Name: "@@server_id",
Assertions: []ScriptTestAssertion{
{
Query: "select @@server_id;",
Expected: []sql.Row{{0}},
},
{
Query: "set @@server_id=123;",
Expected: []sql.Row{{}},
},
{
Query: "set @@GLOBAL.server_id=123;",
Expected: []sql.Row{{}},
},
},
},
{
Name: "set system variables and user variables",
SetUpScript: []string{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/dolthub/go-mysql-server
require (
github.com/cespare/xxhash v1.1.0
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
github.com/dolthub/vitess v0.0.0-20230111093229-dbe40c6c22d1
github.com/dolthub/vitess v0.0.0-20230119004230-012c165af84f
github.com/go-kit/kit v0.10.0
github.com/go-sql-driver/mysql v1.6.0
github.com/gocraft/dbr/v2 v2.7.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474/go.mod h1:kMz7uXOXq4qRriCEyZ/LUeTqraLJCjf0WVZcUi6TxUY=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
github.com/dolthub/vitess v0.0.0-20230111093229-dbe40c6c22d1 h1:PNOp1NXSMmvwNibFfMkDpwkck7XA51YH7uKgac2ezGo=
github.com/dolthub/vitess v0.0.0-20230111093229-dbe40c6c22d1/go.mod h1:oVFIBdqMFEkt4Xz2fzFJBNtzKhDEjwdCF0dzde39iKs=
github.com/dolthub/vitess v0.0.0-20230119004230-012c165af84f h1:I7gdWTiJ6vGk+DCOzDtnDUFx5ihCJgD5WHgiYcGuj44=
github.com/dolthub/vitess v0.0.0-20230119004230-012c165af84f/go.mod h1:oVFIBdqMFEkt4Xz2fzFJBNtzKhDEjwdCF0dzde39iKs=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
Expand Down
1 change: 1 addition & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func newServerFromHandler(cfg Config, e *sqle.Engine, sm *SessionManager, handle
Listener: vtListnr,
handler: handler,
sessionMgr: sm,
Engine: e,
}, unixSocketInUse
}

Expand Down
2 changes: 2 additions & 0 deletions server/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/dolthub/vitess/go/mysql"
"go.opentelemetry.io/otel/trace"

gms "github.com/dolthub/go-mysql-server"
"github.com/dolthub/go-mysql-server/sql"
)

Expand All @@ -29,6 +30,7 @@ type Server struct {
Listener *mysql.Listener
handler mysql.Handler
sessionMgr *SessionManager
Engine *gms.Engine
}

// Config for the mysql server.
Expand Down
195 changes: 195 additions & 0 deletions sql/binlogreplication/binlog_replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package binlogreplication

import (
"fmt"
"strconv"
"strings"
"time"

"github.com/dolthub/go-mysql-server/sql"
)

// BinlogReplicaController allows callers to control a binlog replica. Providers built on go-mysql-server may optionally
// implement this interface and use it when constructing a SQL engine in order to receive callbacks when replication
// statements (e.g. START REPLICA, SHOW REPLICA STATUS) are being handled.
type BinlogReplicaController interface {
// StartReplica tells the binlog replica controller to start up replication processes for the current replication
// configuration. An error is returned if replication was unable to be started. Note the error response only signals
// whether there was a problem with the initial replication start up. Replication could fail after being started up
// successfully with no error response returned.
StartReplica(ctx *sql.Context) error

// StopReplica tells the binlog replica controller to stop all replication processes. An error is returned if there
// were any problems stopping replication. If no replication processes were running, no error is returned.
StopReplica(ctx *sql.Context) error

// SetReplicationSourceOptions configures the binlog replica controller with the specified source options. The
// replica controller must store this configuration. If any errors are encountered processing and storing the
// configuration options, an error is returned.
SetReplicationSourceOptions(ctx *sql.Context, options []ReplicationOption) error

// SetReplicationFilterOptions configures the binlog replica controller with the specified filter options. Although
// the official MySQL implementation does *NOT* persist these options, the replica controller should persist them.
// (MySQL requires these options to be manually set after every server restart, or to be specified as command line
// arguments when starting the MySQL process.) If any errors are encountered processing and storing the filter
// options, an error is returned.
SetReplicationFilterOptions(ctx *sql.Context, options []ReplicationOption) error

// GetReplicaStatus returns the current status of the replica, or nil if no replication processes are running. If
// any problems are encountered assembling the replica's status, an error is returned.
GetReplicaStatus(ctx *sql.Context) (*ReplicaStatus, error)

// ResetReplica resets the state for the replica. When the |resetAll| parameter is false, a "soft" or minimal reset
// is performed – replication errors are reset, but connection information and filters are NOT reset. If |resetAll|
// is true, a "hard" reset is performed – replication filters are removed, replication source options are removed,
// and `SHOW REPLICA STATUS` shows no results. If replication is currently running, this function should return an
// error indicating that replication needs to be stopped before it can be reset. If any errors were encountered
// resetting the replica state, an error is returned, otherwise nil is returned if the reset was successful.
ResetReplica(ctx *sql.Context, resetAll bool) error
}

// ReplicaStatus stores the status of a single binlog replica and is returned by `SHOW REPLICA STATUS`.
// https://dev.mysql.com/doc/refman/8.0/en/show-replica-status.html
type ReplicaStatus struct {
SourceHost string
SourceUser string
SourcePort uint
ConnectRetry uint32
SourceRetryCount uint64
ReplicaIoRunning string
ReplicaSqlRunning string
LastSqlErrNumber uint // Alias for LastErrNumber
LastSqlError string // Alias for LastError
LastIoErrNumber uint
LastIoError string
SourceServerId string
SourceServerUuid string
LastSqlErrorTimestamp *time.Time
LastIoErrorTimestamp *time.Time
RetrievedGtidSet string
ExecutedGtidSet string
AutoPosition bool
ReplicateDoTables []string
ReplicateIgnoreTables []string
}

const (
ReplicaIoNotRunning = "No"
ReplicaIoConnecting = "Connecting"
ReplicaIoRunning = "Yes"
ReplicaSqlNotRunning = "No"
ReplicaSqlRunning = "Yes"
)

// ReplicationOption represents a single option for replication configuration, as specified through the
// CHANGE REPLICATION SOURCE TO command: https://dev.mysql.com/doc/refman/8.0/en/change-replication-source-to.html
type ReplicationOption struct {
Name string
Value ReplicationOptionValue
}

// ReplicationOptionValue defines an interface for configuration option values for binlog replication. It holds the
// values of options for configuring the replication source (i.e. "CHANGE REPLICATION SOURCE TO" options) and for
// replication filtering (i.g. "SET REPLICATION FILTER" options).
type ReplicationOptionValue interface {
fmt.Stringer

// GetValue returns the raw, untyped option value. This method should generally not be used; callers should instead
// find the specific type implementing the ReplicationOptionValue interface and use its functions in order to get
// typed values.
GetValue() interface{}
}

// StringReplicationOptionValue is a ReplicationOptionValue implementation that holds a string value.
type StringReplicationOptionValue struct {
Value string
}

var _ ReplicationOptionValue = (*StringReplicationOptionValue)(nil)

func (ov StringReplicationOptionValue) GetValue() interface{} {
return ov.GetValueAsString()
}

func (ov StringReplicationOptionValue) GetValueAsString() string {
return ov.Value
}

// String implements the Stringer interface and returns a string representation of this option value.
func (ov StringReplicationOptionValue) String() string {
return ov.Value
}

// TableNamesReplicationOptionValue is a ReplicationOptionValue implementation that holds a list of table names for
// its value.
type TableNamesReplicationOptionValue struct {
Value []sql.UnresolvedTable
}

var _ ReplicationOptionValue = (*TableNamesReplicationOptionValue)(nil)

func (ov TableNamesReplicationOptionValue) GetValue() interface{} {
return ov.GetValueAsTableList()
}

func (ov TableNamesReplicationOptionValue) GetValueAsTableList() []sql.UnresolvedTable {
return ov.Value
}

// String implements the Stringer interface and returns a string representation of this option value.
func (ov TableNamesReplicationOptionValue) String() string {
sb := strings.Builder{}
for i, urt := range ov.Value {
if i > 0 {
sb.WriteString(", ")
}
if urt.Database() != "" {
sb.WriteString(urt.Database())
sb.WriteString(".")
}
sb.WriteString(urt.Name())
}
return sb.String()
}

// IntegerReplicationOptionValue is a ReplicationOptionValue implementation that holds an integer value.
type IntegerReplicationOptionValue struct {
Value int
}

var _ ReplicationOptionValue = (*IntegerReplicationOptionValue)(nil)

func (ov IntegerReplicationOptionValue) GetValue() interface{} {
return ov.GetValueAsInt()
}

func (ov IntegerReplicationOptionValue) GetValueAsInt() int {
return ov.Value
}

// String implements the Stringer interface and returns a string representation of this option value.
func (ov IntegerReplicationOptionValue) String() string {
return strconv.Itoa(ov.Value)
}

// NewReplicationOption creates a new ReplicationOption instance, with the specified |name| and |value|.
func NewReplicationOption(name string, value ReplicationOptionValue) *ReplicationOption {
return &ReplicationOption{
Name: name,
Value: value,
}
}
2 changes: 2 additions & 0 deletions sql/mysql_db/fbs/mysql_db.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ table ReplicaSourceInfo {
password:string;
port:uint16;
uuid:string;
connect_retry_interval:uint32;
connect_retry_count:uint64;
}

// The MySQL Db containing all the tables
Expand Down
12 changes: 7 additions & 5 deletions sql/mysql_db/mysql_db_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,12 @@ func LoadRoleEdge(serialRoleEdge *serial.RoleEdge) *RoleEdge {

func LoadReplicaSourceInfo(serialReplicaSourceInfo *serial.ReplicaSourceInfo) *ReplicaSourceInfo {
return &ReplicaSourceInfo{
Host: string(serialReplicaSourceInfo.Host()),
User: string(serialReplicaSourceInfo.User()),
Password: string(serialReplicaSourceInfo.Password()),
Port: serialReplicaSourceInfo.Port(),
Uuid: string(serialReplicaSourceInfo.Uuid()),
Host: string(serialReplicaSourceInfo.Host()),
User: string(serialReplicaSourceInfo.User()),
Password: string(serialReplicaSourceInfo.Password()),
Port: serialReplicaSourceInfo.Port(),
Uuid: string(serialReplicaSourceInfo.Uuid()),
ConnectRetryInterval: serialReplicaSourceInfo.ConnectRetryInterval(),
ConnectRetryCount: serialReplicaSourceInfo.ConnectRetryCount(),
}
}
4 changes: 3 additions & 1 deletion sql/mysql_db/mysql_db_serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,10 @@ func serializeReplicaSourceInfo(b *flatbuffers.Builder, replicaSourceInfos []*Re
serial.ReplicaSourceInfoAddPassword(b, password)
serial.ReplicaSourceInfoAddUuid(b, uuid)

// Write Port (uint value doesn't need offset)
// Write non-string fields (uint value doesn't need offset)
serial.ReplicaSourceInfoAddPort(b, replicaSourceInfo.Port)
serial.ReplicaSourceInfoAddConnectRetryInterval(b, replicaSourceInfo.ConnectRetryInterval)
serial.ReplicaSourceInfoAddConnectRetryCount(b, replicaSourceInfo.ConnectRetryCount)

// End ReplicaSourceInfo
offsets[len(replicaSourceInfos)-i-1] = serial.ReplicaSourceInfoEnd(b)
Expand Down
Loading