From 78f98fc70ae18b92d8b0536f80ebeb70138ee96f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 28 May 2023 17:01:00 +0300 Subject: [PATCH 1/4] Tablet throttler: non-PRIMARY tablets report back to PRIMARY throttler when they've been 'check'ed Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/check_result.go | 11 +++--- .../tabletserver/throttle/throttler.go | 37 +++++++++++++++++-- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletserver/throttle/check_result.go b/go/vt/vttablet/tabletserver/throttle/check_result.go index 52d52b78468..3bc162b623a 100644 --- a/go/vt/vttablet/tabletserver/throttle/check_result.go +++ b/go/vt/vttablet/tabletserver/throttle/check_result.go @@ -14,11 +14,12 @@ import ( // CheckResult is the result for an app inquiring on a metric. It also exports as JSON via the API type CheckResult struct { - StatusCode int `json:"StatusCode"` - Value float64 `json:"Value"` - Threshold float64 `json:"Threshold"` - Error error `json:"-"` - Message string `json:"Message"` + StatusCode int `json:"StatusCode"` + Value float64 `json:"Value"` + Threshold float64 `json:"Threshold"` + Error error `json:"-"` + Message string `json:"Message"` + RecentlyChecked bool `json:"RecentlyChecked"` } // NewCheckResult returns a CheckResult diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 56e913bb119..939885601a2 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -126,6 +126,12 @@ type Throttler struct { srvTopoServer srvtopo.Server heartbeatWriter heartbeat.HeartbeatWriter + // recentCheckTickerValue is an ever increasing number, incrementing once per second. + recentCheckTickerValue int64 + // recentCheckValue is set to match or exceed recentCheckTickerValue whenever a "check" was made (other than by the throttler itself). + // when recentCheckValue < recentCheckTickerValue that means there hasn't been a recent check. + recentCheckValue int64 + throttleTabletTypesMap map[topodatapb.TabletType]bool mysqlThrottleMetricChan chan *mysql.MySQLThrottleMetric @@ -576,6 +582,7 @@ func (throttler *Throttler) Operate(ctx context.Context) { mysqlRefreshTicker := addTicker(mysqlRefreshInterval) mysqlAggregateTicker := addTicker(mysqlAggregateInterval) throttledAppsTicker := addTicker(throttledAppsSnapshotInterval) + recentCheckTicker := addTicker(time.Second) go func() { defer log.Infof("Throttler: Operate terminated, tickers stopped") @@ -670,6 +677,9 @@ func (throttler *Throttler) Operate(ctx context.Context) { } case throttlerConfig := <-throttler.throttlerConfigChan: throttler.applyThrottlerConfig(ctx, throttlerConfig) + case <-recentCheckTicker.C: + // Increment recentCheckTickerValue by one. + atomic.AddInt64(&throttler.recentCheckTickerValue, 1) } } }() @@ -682,7 +692,7 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, mySQLThrottleMetric.ClusterName = clusterName mySQLThrottleMetric.Key = probe.Key - tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=vitess", probe.TabletHost, probe.TabletPort) + tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", probe.TabletHost, probe.TabletPort, vitessAppName) resp, err := throttler.httpClient.Get(tabletCheckSelfURL) if err != nil { mySQLThrottleMetric.Err = err @@ -704,6 +714,11 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context, if checkResult.StatusCode == http.StatusInternalServerError { mySQLThrottleMetric.Err = fmt.Errorf("Status code: %d", checkResult.StatusCode) } + if checkResult.RecentlyChecked { + // We have just probed a tablet, and it reported back that someone just recently "check"ed it. + // We therefore renew the heartbeats lease. + go throttler.heartbeatWriter.RequestHeartbeats() + } return mySQLThrottleMetric } } @@ -1015,7 +1030,17 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor if !throttler.IsEnabled() { return okMetricCheckResult } - return throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) + checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) + + if atomic.LoadInt64(&throttler.recentCheckValue) >= atomic.LoadInt64(&throttler.recentCheckTickerValue) { + // This indicates someone, who is not "vitess" ie not internal to the throttling logic, did a _recent_ `check`. + // This could be online-ddl, or vreplication or whoever else. + // If this tablet is a REPLICA or RDONLY, we want to advertise to the PRIMARY that someone did a recent check, + // so that the PRIMARY knows it must renew the heartbeat lease. + checkResult.RecentlyChecked = true + } + + return checkResult } // checkShard checks the health of the shard, and runs on the primary tablet only @@ -1031,7 +1056,13 @@ func (throttler *Throttler) checkSelf(ctx context.Context, appName string, remot // CheckByType runs a check by requested check type func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) { if throttler.IsEnabled() && !flags.SkipRequestHeartbeats { - go throttler.heartbeatWriter.RequestHeartbeats() + if appName != vitessAppName { + go throttler.heartbeatWriter.RequestHeartbeats() + // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. + // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back + // to the PRIMARY so that it knows it must renew the heartbeat lease. + atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue)) + } } switch checkType { case ThrottleCheckSelf: From 379ec2b541098523b166a3733422a45e4c488e67 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 May 2023 06:57:01 +0300 Subject: [PATCH 2/4] inclusive language Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletserver/throttle/mysql/mysql_throttle_metric.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go index 3c2fe005828..8c8a5cc4b32 100644 --- a/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go +++ b/go/vt/vttablet/tabletserver/throttle/mysql/mysql_throttle_metric.go @@ -95,7 +95,7 @@ func (metric *MySQLThrottleMetric) Get() (float64, error) { } // ReadThrottleMetric returns a metric for the given probe. Either by explicit query -// or via SHOW SLAVE STATUS +// or via SHOW REPLICA STATUS func ReadThrottleMetric(probe *Probe, clusterName string, overrideGetMetricFunc func() *MySQLThrottleMetric) (mySQLThrottleMetric *MySQLThrottleMetric) { if mySQLThrottleMetric := getCachedMySQLThrottleMetric(probe); mySQLThrottleMetric != nil { return mySQLThrottleMetric From 1ef74c3c78ea06f1085fc3dd60ec459f1d366f00 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 May 2023 14:49:32 +0300 Subject: [PATCH 3/4] vstreamer: support 'useThrottler' so that clients can choose whther they at all want to involve the throttler. Some lightweight clients, such as the schema tracker or the binlog watcher, or messager, do not need the throttler, and since some of these clients are _always on_, we also do not _want_ them to continuously approach the throttler. One side effect of always engaging with the throttler is the infinite renewal of on-demand heartbeats Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../tabletmanager/vdiff/framework_test.go | 2 +- .../vreplication/external_connector.go | 2 +- .../vreplication/framework_test.go | 2 +- .../vreplication/replica_connector.go | 2 +- go/vt/vttablet/tabletserver/binlog_watcher.go | 4 +- .../vttablet/tabletserver/messager/engine.go | 2 +- .../tabletserver/messager/message_manager.go | 2 +- .../messager/message_manager_test.go | 2 +- go/vt/vttablet/tabletserver/schema/tracker.go | 4 +- .../tabletserver/schema/tracker_test.go | 2 +- go/vt/vttablet/tabletserver/tabletserver.go | 2 +- .../vttablet/tabletserver/throttle/client.go | 2 +- .../tabletserver/throttle/throttler.go | 14 ++-- go/vt/vttablet/tabletserver/vstreamer/copy.go | 4 +- .../vttablet/tabletserver/vstreamer/engine.go | 4 +- .../tabletserver/vstreamer/engine_test.go | 2 +- .../tabletserver/vstreamer/uvstreamer.go | 42 ++++++------ .../vstreamer/uvstreamer_flaky_test.go | 2 +- .../tabletserver/vstreamer/vstreamer.go | 68 ++++++++++--------- .../vstreamer/vstreamer_flaky_test.go | 6 +- 20 files changed, 87 insertions(+), 83 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go index bb173c41abc..bd1c3f13005 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/framework_test.go @@ -235,7 +235,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS if vstreamHook != nil { vstreamHook(ctx) } - return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, false, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go index ac411de7ce6..0bebf076fef 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go @@ -126,7 +126,7 @@ func (c *mysqlConnector) Close(ctx context.Context) error { } func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, send) + return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, true, send) } func (c *mysqlConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error { diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index 5ed944125af..9d75f7381e8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -307,7 +307,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS if vstreamHook != nil { vstreamHook(ctx) } - return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, false, send) } // vstreamRowsHook allows you to do work just before calling VStreamRows. diff --git a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go index 3954f4d0546..2fa2b3f148c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go @@ -84,7 +84,7 @@ func (c *ReplicaConnector) Close(ctx context.Context) error { } func (c *ReplicaConnector) VStream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - return c.vstreamer.Stream(ctx, startPos, nil, filter, send) + return c.vstreamer.Stream(ctx, startPos, nil, filter, true, send) } // VStreamRows streams rows from query result diff --git a/go/vt/vttablet/tabletserver/binlog_watcher.go b/go/vt/vttablet/tabletserver/binlog_watcher.go index 6c713791e6f..85472fa5d8d 100644 --- a/go/vt/vttablet/tabletserver/binlog_watcher.go +++ b/go/vt/vttablet/tabletserver/binlog_watcher.go @@ -31,7 +31,7 @@ import ( // VStreamer defines the functions of VStreamer // that the BinlogWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error } // BinlogWatcher is a tabletserver service that watches the @@ -91,7 +91,7 @@ func (blw *BinlogWatcher) process(ctx context.Context) { for { // VStreamer will reload the schema when it encounters a DDL. - err := blw.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error { + err := blw.vs.Stream(ctx, "current", nil, filter, false, func(events []*binlogdatapb.VEvent) error { return nil }) log.Infof("ReplicationWatcher VStream ended: %v, retrying in 5 seconds", err) diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go index 2d7fdf2bb82..caed01e3c9b 100644 --- a/go/vt/vttablet/tabletserver/messager/engine.go +++ b/go/vt/vttablet/tabletserver/messager/engine.go @@ -44,7 +44,7 @@ type TabletService interface { // VStreamer defines the functions of VStreamer // that the messager needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 39598169baa..4f9db3e0355 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -684,7 +684,7 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error { var curPos string var fields []*querypb.Field - err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, func(events []*binlogdatapb.VEvent) error { + err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, false, func(events []*binlogdatapb.VEvent) error { // We need to get the flow control lock mm.cacheManagementMu.Lock() defer mm.cacheManagementMu.Unlock() diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index d6da59db065..51cfc881bee 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -888,7 +888,7 @@ func (fv *fakeVStreamer) setPollerResponse(pr []*binlogdatapb.VStreamResultsResp fv.pollerResponse = pr } -func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { fv.streamInvocations.Add(1) for { fv.mu.Lock() diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 490ad59eadf..e0d8b61e9ed 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -40,7 +40,7 @@ import ( // VStreamer defines the functions of VStreamer // that the replicationWatcher needs. type VStreamer interface { - Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error } // Tracker watches the replication and saves the latest schema into the schema_version table when a DDL is encountered. @@ -129,7 +129,7 @@ func (tr *Tracker) process(ctx context.Context) { var gtid string for { - err := tr.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error { + err := tr.vs.Stream(ctx, "current", nil, filter, false, func(events []*binlogdatapb.VEvent) error { for _, event := range events { if event.Type == binlogdatapb.VEventType_GTID { gtid = event.Gtid diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 9822b6bfe5a..50f262169a4 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -137,7 +137,7 @@ type fakeVstreamer struct { events [][]*binlogdatapb.VEvent } -func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { for _, events := range f.events { err := send(events) if err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index ff641fe7198..d86b6b8ab1d 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1144,7 +1144,7 @@ func (tsv *TabletServer) VStream(ctx context.Context, request *binlogdatapb.VStr if err := tsv.sm.VerifyTarget(ctx, request.Target); err != nil { return err } - return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send) + return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, true, send) } // VStreamRows streams rows from the specified starting point. diff --git a/go/vt/vttablet/tabletserver/throttle/client.go b/go/vt/vttablet/tabletserver/throttle/client.go index 10fe910d264..9a3f699b2da 100644 --- a/go/vt/vttablet/tabletserver/throttle/client.go +++ b/go/vt/vttablet/tabletserver/throttle/client.go @@ -140,7 +140,7 @@ func (c *Client) Throttle(ctx context.Context) { return } if c.ThrottleCheckOKOrWait(ctx) { - break + return } } } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 939885601a2..b10100ec01c 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -1055,14 +1055,12 @@ func (throttler *Throttler) checkSelf(ctx context.Context, appName string, remot // CheckByType runs a check by requested check type func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) { - if throttler.IsEnabled() && !flags.SkipRequestHeartbeats { - if appName != vitessAppName { - go throttler.heartbeatWriter.RequestHeartbeats() - // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. - // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back - // to the PRIMARY so that it knows it must renew the heartbeat lease. - atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue)) - } + if throttler.IsEnabled() && !flags.SkipRequestHeartbeats && appName != vitessAppName { + go throttler.heartbeatWriter.RequestHeartbeats() + // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. + // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back + // to the PRIMARY so that it knows it must renew the heartbeat lease. + atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue)) } switch checkType { case ThrottleCheckSelf: diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 62b93cf5063..dcd8dacd121 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -73,7 +73,7 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error { errch := make(chan error, 1) go func() { startPos := mysql.EncodePosition(uvs.pos) - vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.send2, "catchup", uvs.vse) + vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send2, "catchup", uvs.vse) uvs.setVs(vs) errch <- vs.Stream() uvs.setVs(nil) @@ -302,7 +302,7 @@ func (uvs *uvstreamer) fastForward(stopPos string) error { }() log.Infof("starting fastForward from %s upto pos %s", mysql.EncodePosition(uvs.pos), stopPos) uvs.stopPos, _ = mysql.DecodePosition(stopPos) - vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.send2, "fastforward", uvs.vse) + vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send2, "fastforward", uvs.vse) uvs.setVs(vs) return vs.Stream() } diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 829d0b88a5f..9e8074fb4b0 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -226,7 +226,7 @@ func (vse *Engine) validateBinlogRowImage(ctx context.Context, db dbconfigs.Conn // Stream starts a new stream. // This streams events from the binary logs -func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { +func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error { if err := vse.validateBinlogRowImage(ctx, vse.se.GetDBConnector()); err != nil { return err @@ -244,7 +244,7 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl } vse.mu.Lock() defer vse.mu.Unlock() - streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, send) + streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, useThrottler, send) idx := vse.streamIdx vse.streamers[idx] = streamer vse.streamIdx++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go index 16729e8fd24..d271164558a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine_test.go @@ -93,7 +93,7 @@ func TestUpdateVSchema(t *testing.T) { }}, } // Stream should terminate immediately due to invalid pos. - _ = engine.Stream(ctx, "invalid", nil, filter, func(_ []*binlogdatapb.VEvent) error { + _ = engine.Stream(ctx, "invalid", nil, filter, false, func(_ []*binlogdatapb.VEvent) error { return nil }) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index e26b6f2939f..e33ca77e478 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -51,13 +51,14 @@ type uvstreamer struct { cancel func() // input parameters - vse *Engine - send func([]*binlogdatapb.VEvent) error - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - inTablePKs []*binlogdatapb.TableLastPK + vse *Engine + send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + filter *binlogdatapb.Filter + inTablePKs []*binlogdatapb.TableLastPK + useThrottler bool vschema *localVSchema @@ -90,7 +91,7 @@ type uvstreamerConfig struct { CatchupRetryTime time.Duration } -func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *uvstreamer { +func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, useThrottler bool, send func([]*binlogdatapb.VEvent) error) *uvstreamer { ctx, cancel := context.WithCancel(ctx) config := &uvstreamerConfig{ MaxReplicationLag: 1 * time.Nanosecond, @@ -105,17 +106,18 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se return send(evs) } uvs := &uvstreamer{ - ctx: ctx, - cancel: cancel, - vse: vse, - send: send2, - cp: cp, - se: se, - startPos: startPos, - filter: filter, - vschema: vschema, - config: config, - inTablePKs: tablePKs, + ctx: ctx, + cancel: cancel, + vse: vse, + send: send2, + cp: cp, + se: se, + startPos: startPos, + filter: filter, + vschema: vschema, + config: config, + inTablePKs: tablePKs, + useThrottler: useThrottler, } return uvs @@ -418,7 +420,7 @@ func (uvs *uvstreamer) Stream() error { } } vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), - uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) + uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send, "replicate", uvs.vse) uvs.setVs(vs) return vs.Stream() diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 7d22f9c0034..ec2a8d2334d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -440,7 +440,7 @@ func getEventCallback(event *binlogdatapb.VEvent) func() { func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, tablePKs []*binlogdatapb.TableLastPK) { pos := "" go func() { - err := engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error { + err := engine.Stream(ctx, pos, tablePKs, filter, true, func(evs []*binlogdatapb.VEvent) error { //t.Logf("Received events: %v", evs) muAllEvents.Lock() defer muAllEvents.Unlock() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index d9ac730e43e..cf5c5fb350c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -61,11 +61,12 @@ type vstreamer struct { ctx context.Context cancel func() - cp dbconfigs.Connector - se *schema.Engine - startPos string - filter *binlogdatapb.Filter - send func([]*binlogdatapb.VEvent) error + cp dbconfigs.Connector + se *schema.Engine + startPos string + filter *binlogdatapb.Filter + send func([]*binlogdatapb.VEvent) error + useThrottler bool vevents chan *localVSchema vschema *localVSchema @@ -112,22 +113,23 @@ type streamerPlan struct { // // vschema: the current vschema. This value can later be changed through the SetVSchema method. // send: callback function to send events. -func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { +func newVStreamer(ctx context.Context, cp dbconfigs.Connector, se *schema.Engine, startPos string, stopPos string, filter *binlogdatapb.Filter, vschema *localVSchema, useThrottler bool, send func([]*binlogdatapb.VEvent) error, phase string, vse *Engine) *vstreamer { ctx, cancel := context.WithCancel(ctx) return &vstreamer{ - ctx: ctx, - cancel: cancel, - cp: cp, - se: se, - startPos: startPos, - stopPos: stopPos, - filter: filter, - send: send, - vevents: make(chan *localVSchema, 1), - vschema: vschema, - plans: make(map[uint64]*streamerPlan), - phase: phase, - vse: vse, + ctx: ctx, + cancel: cancel, + cp: cp, + se: se, + startPos: startPos, + stopPos: stopPos, + useThrottler: useThrottler, + filter: filter, + send: send, + vevents: make(chan *localVSchema, 1), + vschema: vschema, + plans: make(map[uint64]*streamerPlan), + phase: phase, + vse: vse, } } @@ -301,20 +303,22 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog throttledHeartbeatsRateLimiter := timer.NewRateLimiter(HeartbeatTime) defer throttledHeartbeatsRateLimiter.Stop() for { - // check throttler. - if !vs.vse.throttlerClient.ThrottleCheckOKOrWait(ctx) { - // make sure to leave if context is cancelled - select { - case <-ctx.Done(): - return - default: - // do nothing special + if vs.useThrottler { + // check throttler. + if !vs.vse.throttlerClient.ThrottleCheckOKOrWait(ctx) { + // make sure to leave if context is cancelled + select { + case <-ctx.Done(): + return + default: + // do nothing special + } + throttledHeartbeatsRateLimiter.Do(func() error { + return injectHeartbeat(true) + }) + // we won't process events, until we're no longer throttling + continue } - throttledHeartbeatsRateLimiter.Do(func() error { - return injectHeartbeat(true) - }) - // we won't process events, until we're no longer throttling - continue } select { case ev, ok := <-events: diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go index a1d68b5eb81..1a99e7e9169 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_flaky_test.go @@ -607,7 +607,7 @@ func TestVStreamCopyWithDifferentFilters(t *testing.T) { var errGoroutine error go func() { defer wg.Done() - engine.Stream(ctx2, "", nil, filter, func(evs []*binlogdatapb.VEvent) error { + engine.Stream(ctx2, "", nil, filter, true, func(evs []*binlogdatapb.VEvent) error { for _, ev := range evs { if ev.Type == binlogdatapb.VEventType_HEARTBEAT { continue @@ -1925,7 +1925,7 @@ func TestMinimalMode(t *testing.T) { newEngine(t, "minimal") defer newEngine(t, "full") - err := engine.Stream(context.Background(), "current", nil, nil, func(evs []*binlogdatapb.VEvent) error { return nil }) + err := engine.Stream(context.Background(), "current", nil, nil, false, func(evs []*binlogdatapb.VEvent) error { return nil }) require.Error(t, err, "minimal binlog_row_image is not supported by Vitess VReplication") } @@ -2309,7 +2309,7 @@ func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogda }}, } } - return engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error { + return engine.Stream(ctx, pos, tablePKs, filter, false, func(evs []*binlogdatapb.VEvent) error { timer := time.NewTimer(2 * time.Second) defer timer.Stop() From 94b6f036dae405945aa2579921ce2f4c7391fac6 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 29 May 2023 14:49:54 +0300 Subject: [PATCH 4/4] add app name in test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../endtoend/tabletmanager/throttler_topo/throttler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index fe87262a21f..a3204e0be2f 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -172,12 +172,12 @@ func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody stri } func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*http.Response, error) { - resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats)) + resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test&s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats)) return resp, err } func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) { - return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath)) + return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test", tablet.HTTPPort, checkSelfAPIPath)) } func warmUpHeartbeat(t *testing.T) (respStatus int) {