diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index fe87262a21f..a3204e0be2f 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -172,12 +172,12 @@ func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody stri } func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*http.Response, error) { - resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats)) + resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test&s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats)) return resp, err } func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) { - return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath)) + return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test", tablet.HTTPPort, checkSelfAPIPath)) } func warmUpHeartbeat(t *testing.T) (respStatus int) { diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index bb173c41abc..bd1c3f13005 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -235,7 +235,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS if vstreamHook != nil { vstreamHook(ctx) } - return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, false, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go index ac411de7ce6..0bebf076fef 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go @@ -126,7 +126,7 @@ func (c *mysqlConnector) Close(ctx context.Context) error { } func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, send) + return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, true, send) } func (c *mysqlConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error { diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 5ed944125af..9d75f7381e8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -307,7 +307,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS if vstreamHook != nil { vstreamHook(ctx) } - return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, false, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go index 3954f4d0546..2fa2b3f148c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go @@ -84,7 +84,7 @@ func (c *ReplicaConnector) Close(ctx context.Context) error { } func (c *ReplicaConnector) VStream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.vstreamer.Stream(ctx, startPos, nil, filter, send) + return c.vstreamer.Stream(ctx, startPos, nil, filter, true, send) } // VStreamRows streams rows from query result diff --git a/go/vt/vttablet/tabletserver/binlog_watcher.go b/go/vt/vttablet/tabletserver/binlog_watcher.go index 6c713791e6f..85472fa5d8d 100644 --- a/go/vt/vttablet/tabletserver/binlog_watcher.go +++ b/go/vt/vttablet/tabletserver/binlog_watcher.go @@ -31,7 +31,7 @@ import ( // VStreamer defines the functions of VStreamer // that the BinlogWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error } // BinlogWatcher is a tabletserver service that watches the @@ -91,7 +91,7 @@ func (blw *BinlogWatcher) process(ctx context.Context) { for { // VStreamer will reload the schema when it encounters a DDL. - err := blw.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error { + err := blw.vs.Stream(ctx, "current", nil, filter, false, func(events []*binlogdatapb.VEvent) error { return nil }) log.Infof("ReplicationWatcher VStream ended: %v, retrying in 5 seconds", err) diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go index 2d7fdf2bb82..caed01e3c9b 100644 --- a/go/vt/vttablet/tabletserver/messager/engine.go +++ b/go/vt/vttablet/tabletserver/messager/engine.go @@ -44,7 +44,7 @@ type TabletService interface { // VStreamer defines the functions of VStreamer // that the messager needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 39598169baa..4f9db3e0355 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -684,7 +684,7 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error { var curPos string var fields []*querypb.Field - err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, func(events []*binlogdatapb.VEvent) error { + err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, false, func(events []*binlogdatapb.VEvent) error { // We need to get the flow control lock mm.cacheManagementMu.Lock() defer mm.cacheManagementMu.Unlock() diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index d6da59db065..51cfc881bee 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -888,7 +888,7 @@ func (fv *fakeVStreamer) setPollerResponse(pr []*binlogdatapb.VStreamResultsResp fv.pollerResponse = pr } -func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { fv.streamInvocations.Add(1) for { fv.mu.Lock() diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 490ad59eadf..e0d8b61e9ed 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -40,7 +40,7 @@ import ( // VStreamer defines the functions of VStreamer // that the replicationWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error } // Tracker watches the replication and saves the latest schema into the schema_version table when a DDL is encountered. @@ -129,7 +129,7 @@ func (tr *Tracker) process(ctx context.Context) { var gtid string for { - err := tr.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error { + err := tr.vs.Stream(ctx, "current", nil, filter, false, func(events []*binlogdatapb.VEvent) error { for _, event := range events { if event.Type == binlogdatapb.VEventType_GTID { gtid = event.Gtid diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 9822b6bfe5a..50f262169a4 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -137,7 +137,7 @@ type fakeVstreamer struct { events [][]*binlogdatapb.VEvent } -func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { for _, events := range f.events { err := send(events) if err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index ff641fe7198..d86b6b8ab1d 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1144,7 +1144,7 @@ func (tsv *TabletServer) VStream(ctx context.Context, request *binlogdatapb.VStr if err := tsv.sm.VerifyTarget(ctx, request.Target); err != nil { return err } - return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, true, send) } // VStreamRows streams rows from the specified starting point. diff --git a/go/vt/vttablet/tabletserver/throttle/check_result.go b/go/vt/vttablet/tabletserver/throttle/check_result.go index 52d52b78468..3bc162b623a 100644 --- a/go/vt/vttablet/tabletserver/throttle/check_result.go +++ b/go/vt/vttablet/tabletserver/throttle/check_result.go @@ -14,11 +14,12 @@ import ( // CheckResult is the result for an app inquiring on a metric. It also exports as JSON via the API type CheckResult struct { - StatusCode int `json:"StatusCode"` - Value float64 `json:"Value"` - Threshold float64 `json:"Threshold"` - Error error `json:"-"` - Message string `json:"Message"` + StatusCode int `json:"StatusCode"` + Value float64 `json:"Value"` + Threshold float64 `json:"Threshold"` + Error error `json:"-"` + Message string `json:"Message"` + RecentlyChecked bool `json:"RecentlyChecked"` } // NewCheckResult returns a CheckResult diff --git a/go/vt/vttablet/tabletserver/throttle/client.go b/go/vt/vttablet/tabletserver/throttle/client.go index 10fe910d264..9a3f699b2da 100644 --- a/go/vt/vttablet/tabletserver/throttle/client.go +++ b/go/vt/vttablet/tabletserver/throttle/client.go @@ -140,7 +140,7 @@ func (c *Client) Throttle(ctx context.Context) { return } if c.ThrottleCheckOKOrWait(ctx) { - break + return } } } diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go index 3c2fe005828..8c8a5cc4b32 100644 --- a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go +++ b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go @@ -95,7 +95,7 @@ func (metric *MySQLThrottleMetric) Get() (float64, error) { } // ReadThrottleMetric returns a metric for the given probe. Either by explicit query -// or via SHOW SLAVE STATUS +// or via SHOW REPLICA STATUS func ReadThrottleMetric(probe *Probe, clusterName string, overrideGetMetricFunc func() *MySQLThrottleMetric) (mySQLThrottleMetric *MySQLThrottleMetric) { if mySQLThrottleMetric := getCachedMySQLThrottleMetric(probe); mySQLThrottleMetric != nil { return mySQLThrottleMetric diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 56e913bb119..b10100ec01c 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -126,6 +126,12 @@ type Throttler struct { srvTopoServer srvtopo.Server heartbeatWriter heartbeat.HeartbeatWriter + // recentCheckTickerValue is an ever increasing number, incrementing once per second. + recentCheckTickerValue int64 + // recentCheckValue is set to match or exceed recentCheckTickerValue whenever a "check" was made (other than by the throttler itself). + // when recentCheckValue < recentCheckTickerValue that means there hasn't been a recent check. + recentCheckValue int64 + throttleTabletTypesMap map[topodatapb.TabletType]bool mysqlThrottleMetricChan chan *mysql.MySQLThrottleMetric @@ -576,6 +582,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { mysqlRefreshTicker := addTicker(mysqlRefreshInterval) mysqlAggregateTicker := addTicker(mysqlAggregateInterval) throttledAppsTicker := addTicker(throttledAppsSnapshotInterval) + recentCheckTicker := addTicker(time.Second) go func() { defer log.Infof("Throttler: Operate terminated, tickers stopped") @@ -670,6 +677,9 @@ func (throttler *Throttler) Operate(ctx context.Context) { } case throttlerConfig := <-throttler.throttlerConfigChan: throttler.applyThrottlerConfig(ctx, throttlerConfig) + case <-recentCheckTicker.C: + // Increment recentCheckTickerValue by one. + atomic.AddInt64(&throttler.recentCheckTickerValue, 1) } } }() @@ -682,7 +692,7 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, mySQLThrottleMetric.ClusterName = clusterName mySQLThrottleMetric.Key = probe.Key - tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=vitess", probe.TabletHost, probe.TabletPort) + tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", probe.TabletHost, probe.TabletPort, vitessAppName) resp, err := throttler.httpClient.Get(tabletCheckSelfURL) if err != nil { mySQLThrottleMetric.Err = err @@ -704,6 +714,11 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, if checkResult.StatusCode == http.StatusInternalServerError { mySQLThrottleMetric.Err = fmt.Errorf("Status code: %d", checkResult.StatusCode) } + if checkResult.RecentlyChecked { + // We have just probed a tablet, and it reported back that someone just recently "check"ed it. + // We therefore renew the heartbeats lease. + go throttler.heartbeatWriter.RequestHeartbeats() + } return mySQLThrottleMetric } } @@ -1015,7 +1030,17 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor if !throttler.IsEnabled() { return okMetricCheckResult } - return throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) + checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) + + if atomic.LoadInt64(&throttler.recentCheckValue) >= atomic.LoadInt64(&throttler.recentCheckTickerValue) { + // This indicates someone, who is not "vitess" ie not internal to the throttling logic, did a _recent_ `check`. + // This could be online-ddl, or vreplication or whoever else. + // If this tablet is a REPLICA or RDONLY, we want to advertise to the PRIMARY that someone did a recent check, + // so that the PRIMARY knows it must renew the heartbeat lease. + checkResult.RecentlyChecked = true + } + + return checkResult } // checkShard checks the health of the shard, and runs on the primary tablet only @@ -1030,8 +1055,12 @@ func (throttler *Throttler) checkSelf(ctx context.Context, appName string, remot // CheckByType runs a check by requested check type func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) { - if throttler.IsEnabled() && !flags.SkipRequestHeartbeats { + if throttler.IsEnabled() && !flags.SkipRequestHeartbeats && appName != vitessAppName { go throttler.heartbeatWriter.RequestHeartbeats() + // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. + // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back + // to the PRIMARY so that it knows it must renew the heartbeat lease. + atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue)) } switch checkType { case ThrottleCheckSelf: diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 62b93cf5063..dcd8dacd121 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -73,7 +73,7 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error { errch := make(chan error, 1) go func() { startPos := mysql.EncodePosition(uvs.pos) - vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.send2, "catchup", uvs.vse) + vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send2, "catchup", uvs.vse) uvs.setVs(vs) errch <- vs.Stream() uvs.setVs(nil) @@ -302,7 +302,7 @@ func (uvs *uvstreamer) fastForward(stopPos string) error { }() log.Infof("starting fastForward from %s upto pos %s", mysql.EncodePosition(uvs.pos), stopPos) uvs.stopPos, _ = mysql.DecodePosition(stopPos) - vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.send2, "fastforward", uvs.vse) + vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send2, "fastforward", uvs.vse) uvs.setVs(vs) return vs.Stream() } diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 829d0b88a5f..9e8074fb4b0 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -226,7 +226,7 @@ func (vse *Engine) validateBinlogRowImage(ctx context.Context, db dbconfigs.Conn // Stream starts a new stream. // This streams events from the binary logs -func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { if err := vse.validateBinlogRowImage(ctx, vse.se.GetDBConnector()); err != nil { return err @@ -244,7 +244,7 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl } vse.mu.Lock() defer vse.mu.Unlock() - streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, send) + streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, useThrottler, send) idx := vse.streamIdx vse.streamers[idx] = streamer vse.streamIdx++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go index 16729e8fd24..d271164558a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go @@ -93,7 +93,7 @@ func TestUpdateVSchema(t *testing.T) { }}, } // Stream should terminate immediately due to invalid pos. - _ = engine.Stream(ctx, "invalid", nil, filter, func(_ []*binlogdatapb.VEvent) error { + _ = engine.Stream(ctx, "invalid", nil, filter, false, func(_ []*binlogdatapb.VEvent) error { return nil }) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index e26b6f2939f..e33ca77e478 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -51,13 +51,14 @@ type uvstreamer struct { cancel func() // input parameters - vse *Engine - send func([]*binlogdatapb.VEvent) error - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - inTablePKs []*binlogdatapb.TableLastPK + vse *Engine + send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + filter *binlogdatapb.Filter + inTablePKs []*binlogdatapb.TableLastPK + useThrottler bool vschema *localVSchema @@ -90,7 +91,7 @@ type uvstreamerConfig struct { CatchupRetryTime time.Duration } -func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *uvstreamer { +func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, useThrottler bool, send func([]*binlogdatapb.VEvent) error) *uvstreamer { ctx, cancel := context.WithCancel(ctx) config := &uvstreamerConfig{ MaxReplicationLag: 1 * time.Nanosecond, @@ -105,17 +106,18 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se return send(evs) } uvs := &uvstreamer{ - ctx: ctx, - cancel: cancel, - vse: vse, - send: send2, - cp: cp, - se: se, - startPos: startPos, - filter: filter, - vschema: vschema, - config: config, - inTablePKs: tablePKs, + ctx: ctx, + cancel: cancel, + vse: vse, + send: send2, + cp: cp, + se: se, + startPos: startPos, + filter: filter, + vschema: vschema, + config: config, + inTablePKs: tablePKs, + useThrottler: useThrottler, } return uvs @@ -418,7 +420,7 @@ func (uvs *uvstreamer) Stream() error { } } vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), - uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) + uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send, "replicate", uvs.vse) uvs.setVs(vs) return vs.Stream() diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 7d22f9c0034..ec2a8d2334d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -440,7 +440,7 @@ func getEventCallback(event *binlogdatapb.VEvent) func() { func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, tablePKs []*binlogdatapb.TableLastPK) { pos := "" go func() { - err := engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error { + err := engine.Stream(ctx, pos, tablePKs, filter, true, func(evs []*binlogdatapb.VEvent) error { //t.Logf("Received events: %v", evs) muAllEvents.Lock() defer muAllEvents.Unlock() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index d9ac730e43e..cf5c5fb350c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -61,11 +61,12 @@ type vstreamer struct { ctx context.Context cancel func() - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + filter *binlogdatapb.Filter + send func([]*binlogdatapb.VEvent) error + useThrottler bool vevents chan *localVSchema vschema *localVSchema @@ -112,22 +113,23 @@ type streamerPlan struct { // // vschema: the current vschema. This value can later be changed through the SetVSchema method. // send: callback function to send events. -func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { +func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, useThrottler bool, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { ctx, cancel := context.WithCancel(ctx) return &vstreamer{ - ctx: ctx, - cancel: cancel, - cp: cp, - se: se, - startPos: startPos, - stopPos: stopPos, - filter: filter, - send: send, - vevents: make(chan *localVSchema, 1), - vschema: vschema, - plans: make(map[uint64]*streamerPlan), - phase: phase, - vse: vse, + ctx: ctx, + cancel: cancel, + cp: cp, + se: se, + startPos: startPos, + stopPos: stopPos, + useThrottler: useThrottler, + filter: filter, + send: send, + vevents: make(chan *localVSchema, 1), + vschema: vschema, + plans: make(map[uint64]*streamerPlan), + phase: phase, + vse: vse, } } @@ -301,20 +303,22 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime) defer throttledHeartbeatsRateLimiter.Stop() for { - // check throttler. - if !vs.vse.throttlerClient.ThrottleCheckOKOrWait(ctx) { - // make sure to leave if context is cancelled - select { - case <-ctx.Done(): - return - default: - // do nothing special + if vs.useThrottler { + // check throttler. + if !vs.vse.throttlerClient.ThrottleCheckOKOrWait(ctx) { + // make sure to leave if context is cancelled + select { + case <-ctx.Done(): + return + default: + // do nothing special + } + throttledHeartbeatsRateLimiter.Do(func() error { + return injectHeartbeat(true) + }) + // we won't process events, until we're no longer throttling + continue } - throttledHeartbeatsRateLimiter.Do(func() error { - return injectHeartbeat(true) - }) - // we won't process events, until we're no longer throttling - continue } select { case ev, ok := <-events: diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index a1d68b5eb81..1a99e7e9169 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -607,7 +607,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { var errGoroutine error go func() { defer wg.Done() - engine.Stream(ctx2, "", nil, filter, func(evs []*binlogdatapb.VEvent) error { + engine.Stream(ctx2, "", nil, filter, true, func(evs []*binlogdatapb.VEvent) error { for _, ev := range evs { if ev.Type == binlogdatapb.VEventType_HEARTBEAT { continue @@ -1925,7 +1925,7 @@ func TestMinimalMode(t *testing.T) { newEngine(t, "minimal") defer newEngine(t, "full") - err := engine.Stream(context.Background(), "current", nil, nil, func(evs []*binlogdatapb.VEvent) error { return nil }) + err := engine.Stream(context.Background(), "current", nil, nil, false, func(evs []*binlogdatapb.VEvent) error { return nil }) require.Error(t, err, "minimal binlog_row_image is not supported by Vitess VReplication") } @@ -2309,7 +2309,7 @@ func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogda }}, } } - return engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error { + return engine.Stream(ctx, pos, tablePKs, filter, false, func(evs []*binlogdatapb.VEvent) error { timer := time.NewTimer(2 * time.Second) defer timer.Stop()