Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding two new binlog replication statements: RESET REPLICA and CHANGE REPLICATION FILTERS #215

Merged
merged 6 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 63 additions & 2 deletions go/vt/sqlparser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"io"
"strconv"
"strings"
"sync"
"unicode"
Expand Down Expand Up @@ -214,6 +215,16 @@ func stringIsUnbrokenQuote(s string, quoteChar byte) bool {
return true
}

// mustAtoi converts the string into an integer, by using strconv.atoi, and returns the result. If any errors are
// encountered, it registers a parsing error with |yylex|.
func mustAtoi(yylex interface{}, s string) int {
fulghum marked this conversation as resolved.
Show resolved Hide resolved
i, err := strconv.Atoi(s)
if err != nil {
yylex.(*Tokenizer).Error(fmt.Sprintf("unable to parse integer from string '%s'", s))
}
return i
}

// ParseTokenizer is a raw interface to parse from the given tokenizer.
// This does not used pooled parsers, and should not be used in general.
func ParseTokenizer(tokenizer *Tokenizer) int {
Expand Down Expand Up @@ -3332,15 +3343,48 @@ func (s *ChangeReplicationSource) Format(buf *TrackedBuffer) {
}
buf.WriteString(strings.ToLower(option.Name))
buf.WriteString(" = ")
buf.WriteString(option.Value)
buf.WriteString(fmt.Sprintf("%v", option.Value))
}
}

// ChangeReplicationFilter represents a "CHANGE REPLICATION FILTER" statement.
// https://dev.mysql.com/doc/refman/8.0/en/change-replication-filter.html
type ChangeReplicationFilter struct {
Options []*ReplicationOption
}

var _ Statement = (*ChangeReplicationFilter)(nil)

func (*ChangeReplicationFilter) iStatement() {}

func (c *ChangeReplicationFilter) Format(buf *TrackedBuffer) {
buf.WriteString("change replication filter ")
for i, option := range c.Options {
if i > 0 {
buf.WriteString(", ")
}
buf.WriteString(strings.ToLower(option.Name))
buf.WriteString(" = (")
switch value := option.Value.(type) {
case TableNames:
for i, tableName := range value {
if i > 0 {
buf.WriteString(", ")
}
buf.WriteString(tableName.String())
}
default:
panic(fmt.Sprintf("unexpected option value type: %T", option.Value))
}
buf.WriteString(")")
}
}

// ReplicationOption represents a single replication option name and value.
// See https://dev.mysql.com/doc/refman/8.0/en/change-replication-source-to.html for available options.
type ReplicationOption struct {
Name string
Value string
Value interface{}
}

// StartReplica represents a "START REPLICA" statement.
Expand All @@ -3367,6 +3411,23 @@ func (r *StopReplica) Format(buf *TrackedBuffer) {
buf.WriteString("stop replica")
}

// ResetReplica represents a "RESET REPLICA" statement.
// https://dev.mysql.com/doc/refman/8.0/en/reset-replica.html
type ResetReplica struct {
All bool
}

var _ Statement = (*ResetReplica)(nil)

func (*ResetReplica) iStatement() {}

func (r *ResetReplica) Format(buf *TrackedBuffer) {
buf.WriteString("reset replica")
if r.All {
buf.WriteString(" all")
}
}

// OtherRead represents a DESCRIBE, or EXPLAIN statement.
// It should be used only as an indicator. It does not contain
// the full AST for the statement.
Expand Down
48 changes: 38 additions & 10 deletions go/vt/sqlparser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,30 @@ var (
output: "change replication source to source_host = Host",
},
{
input: "change replication source to SOURCE_HOST = 'host', SOURCE_PASSWORD='PaSSword', SOURCE_PORT=12345, source_user='root'",
output: "change replication source to source_host = host, source_password = PaSSword, source_port = 12345, source_user = root",
input: "change replication source to SOURCE_HOST = 'host', SOURCE_PASSWORD='PaSSword', SOURCE_PORT=12345, source_user='root', SOURCE_CONNECT_RETRY=60, SOURCE_RETRY_COUNT=3",
output: "change replication source to source_host = host, source_password = PaSSword, source_port = 12345, source_user = root, source_connect_retry = 60, source_retry_count = 3",
},
{
input: "change replication filter REPLICATE_DO_TABLE=(table1)",
output: "change replication filter replicate_do_table = (table1)",
},
{
input: "change replication filter REPLICATE_DO_TABLE=(table1, db.table2, db.table3)",
output: "change replication filter replicate_do_table = (table1, db.table2, db.table3)",
},
{
input: "change replication filter REPLICATE_DO_TABLE=(db1.t1, db2.t2), REPLICATE_IGNORE_TABLE=(t1)",
output: "change replication filter replicate_do_table = (db1.t1, db2.t2), replicate_ignore_table = (t1)",
},
{
input: "change replication filter REPLICATE_DO_TABLE=(db1.t1, db2.t2), REPLICATE_IGNORE_TABLE=(db1.t1, db2.t2)",
output: "change replication filter replicate_do_table = (db1.t1, db2.t2), replicate_ignore_table = (db1.t1, db2.t2)",
},
{
input: "reset replica",
},
{
input: "reset replica all",
},
{
input: "create database `db1` charset 'utf8mb4' collate 'utf8_bin';",
Expand Down Expand Up @@ -3701,6 +3723,12 @@ func TestInvalid(t *testing.T) {
input string
err string
}{{
input: "CHANGE REPLICATION FILTER",
err: "syntax error",
}, {
input: "change replication filter REPLICATE_DO_TABLE=()",
err: "syntax error",
}, {
input: "CHANGE REPLICATION SOURCE TO",
err: "syntax error",
}, {
Expand Down Expand Up @@ -5893,20 +5921,20 @@ var (
input: "drop table dual",
output: "syntax error at position 16 near 'dual'",
}, {
input: "CREATE PROCEDURE testproc() BEGIN begin1: BEGIN END begin2; END",
output: "End-label begin2 without match at position 59 near 'begin2'",
input: "CREATE PROCEDURE testproc() BEGIN begin1: BEGIN END begin2; END",
output: "End-label begin2 without match at position 59 near 'begin2'",
excludeMulti: true,
}, {
input: "CREATE PROCEDURE testproc() BEGIN loop1: LOOP BEGIN END; END LOOP loop2; END",
output: "End-label loop2 without match at position 72 near 'loop2'",
input: "CREATE PROCEDURE testproc() BEGIN loop1: LOOP BEGIN END; END LOOP loop2; END",
output: "End-label loop2 without match at position 72 near 'loop2'",
excludeMulti: true,
}, {
input: "CREATE PROCEDURE testproc() BEGIN repeat1: REPEAT BEGIN END; UNTIL a > 7 END REPEAT repeat2; END",
output: "End-label repeat2 without match at position 92 near 'repeat2'",
input: "CREATE PROCEDURE testproc() BEGIN repeat1: REPEAT BEGIN END; UNTIL a > 7 END REPEAT repeat2; END",
output: "End-label repeat2 without match at position 92 near 'repeat2'",
excludeMulti: true,
}, {
input: "CREATE PROCEDURE testproc() BEGIN while1: WHILE a > 7 DO BEGIN END; END WHILE while2; END",
output: "End-label while2 without match at position 85 near 'while2'",
input: "CREATE PROCEDURE testproc() BEGIN while1: WHILE a > 7 DO BEGIN END; END WHILE while2; END",
output: "End-label while2 without match at position 85 near 'while2'",
excludeMulti: true,
},
}
Expand Down
Loading