Skip to content

Commit

Permalink
VReplication: Optimize replication on target tablets (vitessio#17166)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Dec 3, 2024
1 parent 99d534b commit 551a5f7
Show file tree
Hide file tree
Showing 18 changed files with 184 additions and 80 deletions.
1 change: 1 addition & 0 deletions examples/common/scripts/vtctld-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ vtctld \
--port $vtctld_web_port \
--grpc_port $grpc_port \
--pid_file $VTDATAROOT/tmp/vtctld.pid \
--pprof-http \
> $VTDATAROOT/tmp/vtctld.out 2>&1 &

for _ in {0..300}; do
Expand Down
1 change: 1 addition & 0 deletions examples/common/scripts/vtgate-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ vtgate \
--pid_file $VTDATAROOT/tmp/vtgate.pid \
--enable_buffer \
--mysql_auth_server_impl none \
--pprof-http \
> $VTDATAROOT/tmp/vtgate.out 2>&1 &

# Block waiting for vtgate to be listening
Expand Down
1 change: 1 addition & 0 deletions examples/common/scripts/vttablet-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ vttablet \
--service_map 'grpc-queryservice,grpc-tabletmanager,grpc-updatestream' \
--pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \
--heartbeat_on_demand_duration=5s \
--pprof-http \
> $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 &

# Block waiting for the tablet to be listening
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ Flags:
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
--vreplication_copy_phase_max_mysql_replication_lag int The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 43200)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 3)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 7)
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ Flags:
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
--vreplication_copy_phase_max_mysql_replication_lag int The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 43200)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 3)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 7)
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
Expand Down
4 changes: 0 additions & 4 deletions go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)
Expand Down Expand Up @@ -145,9 +144,6 @@ func TestMain(m *testing.M) {
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "2s",
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type WriteMetrics struct {
Expand Down Expand Up @@ -184,9 +183,6 @@ func TestMain(m *testing.M) {
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type testcase struct {
Expand Down Expand Up @@ -436,9 +435,6 @@ func TestMain(m *testing.M) {
"--migration_check_interval", "5s",
"--vstream_packet_size", "4096", // Keep this value small and below 10k to ensure multilple vstream iterations
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
13 changes: 0 additions & 13 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)
Expand Down Expand Up @@ -101,18 +100,6 @@ func (cc *ClusterConfig) enableGTIDCompression() func() {
}
}

// setAllVTTabletExperimentalFlags sets all the experimental flags for vttablet and returns a function
// that can be used to reset them in a defer.
func setAllVTTabletExperimentalFlags() func() {
experimentalArgs := fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching)
oldArgs := extraVTTabletArgs
extraVTTabletArgs = append(extraVTTabletArgs, experimentalArgs)
return func() {
extraVTTabletArgs = oldArgs
}
}

// VitessCluster represents all components within the test cluster
type VitessCluster struct {
t *testing.T
Expand Down
4 changes: 0 additions & 4 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand All @@ -43,9 +42,6 @@ func TestFKWorkflow(t *testing.T) {
extraVTTabletArgs = []string{
// Ensure that there are multiple copy phase cycles per table.
"--vstream_packet_size=256",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
defer func() { extraVTTabletArgs = nil }()

Expand Down
4 changes: 0 additions & 4 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -140,9 +139,6 @@ func TestVDiff2(t *testing.T) {
extraVTTabletArgs = []string{
// This forces us to use multiple vstream packets even with small test tables.
"--vstream_packet_size=1",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}

vc = NewVitessCluster(t, &clusterOptions{cells: strings.Split(cellNames, ",")})
Expand Down
7 changes: 4 additions & 3 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

const (
vdiffTimeout = 120 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
vdiffTimeout = 180 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
vdiffRetryTimeout = 30 * time.Second
vdiffStatusCheckInterval = 5 * time.Second
vdiffRetryInterval = 5 * time.Second
Expand Down Expand Up @@ -71,7 +71,8 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex
ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
t.Run(fmt.Sprintf("vtctlclient vdiff %s", ksWorkflow), func(t *testing.T) {
// update-table-stats is needed in order to test progress reports.
uuid, _ := performVDiff2Action(t, true, ksWorkflow, cells, "create", "", false, "--auto-retry", "--update-table-stats")
uuid, _ := performVDiff2Action(t, true, ksWorkflow, cells, "create", "", false, "--auto-retry",
"--update-table-stats", fmt.Sprintf("--filtered_replication_wait_time=%v", vdiffTimeout/2))
info := waitForVDiff2ToComplete(t, true, ksWorkflow, cells, uuid, time.Time{})
require.NotNil(t, info)
require.Equal(t, workflow, info.Workflow)
Expand Down Expand Up @@ -164,7 +165,7 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e
ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
t.Run(fmt.Sprintf("vtctldclient vdiff %s", ksWorkflow), func(t *testing.T) {
// update-table-stats is needed in order to test progress reports.
flags := []string{"--auto-retry", "--update-table-stats"}
flags := []string{"--auto-retry", "--update-table-stats", fmt.Sprintf("--filtered-replication-wait-time=%v", vdiffTimeout/2)}
if len(extraFlags) > 0 {
flags = append(flags, extraFlags...)
}
Expand Down
3 changes: 0 additions & 3 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ func TestVreplicationCopyThrottling(t *testing.T) {
}

func TestBasicVreplicationWorkflow(t *testing.T) {
defer setAllVTTabletExperimentalFlags()
sourceKsOpts["DBTypeVersion"] = "mysql-8.0"
targetKsOpts["DBTypeVersion"] = "mysql-8.0"
testBasicVreplicationWorkflow(t, "noblob")
Expand Down Expand Up @@ -595,8 +594,6 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
resetCompression := mainClusterConfig.enableGTIDCompression()
defer resetCompression()
resetExperimentalFlags := setAllVTTabletExperimentalFlags()
defer resetExperimentalFlags()
vc = NewVitessCluster(t, &clusterOptions{cells: cells})
defer vc.TearDown()

Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ const (
)

var (
// Default flags: currently VReplicationExperimentalFlagVPlayerBatching is not enabled by default.
vreplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
vreplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage | VReplicationExperimentalFlagVPlayerBatching
vreplicationNetReadTimeout = 300
vreplicationNetWriteTimeout = 600
vreplicationCopyPhaseDuration = 1 * time.Hour
Expand Down
63 changes: 29 additions & 34 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,66 +618,57 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
bindLocations := tp.BulkInsertValues.BindLocations()
if len(tp.Fields) < len(bindLocations) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations",
len(tp.Fields), len(bindLocations))
}

type colInfo struct {
typ querypb.Type
length int64
offset int64
field *querypb.Field
}
rowInfo := make([]*colInfo, 0)

offset := int64(0)
for i, field := range tp.Fields { // collect info required for fields to be bound
length := row.Lengths[i]
if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
rowInfo = append(rowInfo, &colInfo{
typ: field.Type,
length: length,
offset: offset,
field: field,
})
}
if length > 0 {
offset += row.Lengths[i]
// Bind field values to locations.
var (
offset int64
offsetQuery int
fieldsIndex int
field *querypb.Field
)
for i, loc := range bindLocations {
field = tp.Fields[fieldsIndex]
length := row.Lengths[fieldsIndex]
for tp.FieldsToSkip[strings.ToLower(field.Name)] {
if length > 0 {
offset += length
}
fieldsIndex++
field = tp.Fields[fieldsIndex]
length = row.Lengths[fieldsIndex]
}
}

// bind field values to locations
var offsetQuery int
for i, loc := range bindLocations {
col := rowInfo[i]
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
typ := col.typ
typ := field.Type

switch typ {
case querypb.Type_TUPLE:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i)
case querypb.Type_JSON:
if col.length < 0 { // An SQL NULL and not an actual JSON value
if length < 0 { // An SQL NULL and not an actual JSON value
buf.WriteString(sqltypes.NullStr)
} else { // A JSON value (which may be a JSON null literal value)
buf2 := row.Values[col.offset : col.offset+col.length]
buf2 := row.Values[offset : offset+length]
vv, err := vjson.MarshalSQLValue(buf2)
if err != nil {
return err
}
buf.WriteString(vv.RawStr())
}
default:
if col.length < 0 {
if length < 0 {
// -1 means a null variable; serialize it directly
buf.WriteString(sqltypes.NullStr)
} else {
raw := row.Values[col.offset : col.offset+col.length]
raw := row.Values[offset : offset+length]
var vv sqltypes.Value

if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 {
if conversion, ok := tp.ConvertCharset[field.Name]; ok && length > 0 {
// Non-null string value, for which we have a charset conversion instruction
out, err := tp.convertStringCharset(raw, conversion, col.field.Name)
out, err := tp.convertStringCharset(raw, conversion, field.Name)
if err != nil {
return err
}
Expand All @@ -690,6 +681,10 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
}
}
offsetQuery = loc.Offset + loc.Length
if length > 0 {
offset += length
}
fieldsIndex++
}
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:])
return nil
Expand Down
Loading

0 comments on commit 551a5f7

Please sign in to comment.