Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vstreamer: support 'useThrottler' so that clients can choose whther they at all want to involve the throttler #13187

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody stri
}

func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*http.Response, error) {
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats))
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test&s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats))
return resp, err
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, checkSelfAPIPath))
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test", tablet.HTTPPort, checkSelfAPIPath))
}

func warmUpHeartbeat(t *testing.T) (respStatus int) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS
if vstreamHook != nil {
vstreamHook(ctx)
}
return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send)
return vdiffenv.vse.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, false, send)
}

// vstreamRowsHook allows you to do work just before calling VStreamRows.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (c *mysqlConnector) Close(ctx context.Context) error {
}

func (c *mysqlConnector) VStream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, send)
return c.vstreamer.Stream(ctx, startPos, tablePKs, filter, true, send)
}

func (c *mysqlConnector) VStreamRows(ctx context.Context, query string, lastpk *querypb.QueryResult, send func(*binlogdatapb.VStreamRowsResponse) error) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (ftc *fakeTabletConn) VStream(ctx context.Context, request *binlogdatapb.VS
if vstreamHook != nil {
vstreamHook(ctx)
}
return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send)
return streamerEngine.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, false, send)
}

// vstreamRowsHook allows you to do work just before calling VStreamRows.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (c *ReplicaConnector) Close(ctx context.Context) error {
}

func (c *ReplicaConnector) VStream(ctx context.Context, startPos string, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
return c.vstreamer.Stream(ctx, startPos, nil, filter, send)
return c.vstreamer.Stream(ctx, startPos, nil, filter, true, send)
}

// VStreamRows streams rows from query result
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/binlog_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// VStreamer defines the functions of VStreamer
// that the BinlogWatcher needs.
type VStreamer interface {
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error
}

// BinlogWatcher is a tabletserver service that watches the
Expand Down Expand Up @@ -91,7 +91,7 @@ func (blw *BinlogWatcher) process(ctx context.Context) {

for {
// VStreamer will reload the schema when it encounters a DDL.
err := blw.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error {
err := blw.vs.Stream(ctx, "current", nil, filter, false, func(events []*binlogdatapb.VEvent) error {
return nil
})
log.Infof("ReplicationWatcher VStream ended: %v, retrying in 5 seconds", err)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/messager/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type TabletService interface {
// VStreamer defines the functions of VStreamer
// that the messager needs.
type VStreamer interface {
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error
StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/messager/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func (mm *messageManager) runOneVStream(ctx context.Context) error {
var curPos string
var fields []*querypb.Field

err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, func(events []*binlogdatapb.VEvent) error {
err := mm.vs.Stream(ctx, "current", nil, mm.vsFilter, false, func(events []*binlogdatapb.VEvent) error {
// We need to get the flow control lock
mm.cacheManagementMu.Lock()
defer mm.cacheManagementMu.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ func (fv *fakeVStreamer) setPollerResponse(pr []*binlogdatapb.VStreamResultsResp
fv.pollerResponse = pr
}

func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (fv *fakeVStreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error {
fv.streamInvocations.Add(1)
for {
fv.mu.Lock()
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
// VStreamer defines the functions of VStreamer
// that the replicationWatcher needs.
type VStreamer interface {
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error
Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error
}

// Tracker watches the replication and saves the latest schema into the schema_version table when a DDL is encountered.
Expand Down Expand Up @@ -129,7 +129,7 @@ func (tr *Tracker) process(ctx context.Context) {

var gtid string
for {
err := tr.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error {
err := tr.vs.Stream(ctx, "current", nil, filter, false, func(events []*binlogdatapb.VEvent) error {
for _, event := range events {
if event.Type == binlogdatapb.VEventType_GTID {
gtid = event.Gtid
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ type fakeVstreamer struct {
events [][]*binlogdatapb.VEvent
}

func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error {
for _, events := range f.events {
err := send(events)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1144,7 +1144,7 @@ func (tsv *TabletServer) VStream(ctx context.Context, request *binlogdatapb.VStr
if err := tsv.sm.VerifyTarget(ctx, request.Target); err != nil {
return err
}
return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, send)
return tsv.vstreamer.Stream(ctx, request.Position, request.TableLastPKs, request.Filter, true, send)
}

// VStreamRows streams rows from the specified starting point.
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vttablet/tabletserver/throttle/check_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (

// CheckResult is the result for an app inquiring on a metric. It also exports as JSON via the API
type CheckResult struct {
StatusCode int `json:"StatusCode"`
Value float64 `json:"Value"`
Threshold float64 `json:"Threshold"`
Error error `json:"-"`
Message string `json:"Message"`
StatusCode int `json:"StatusCode"`
Value float64 `json:"Value"`
Threshold float64 `json:"Threshold"`
Error error `json:"-"`
Message string `json:"Message"`
RecentlyChecked bool `json:"RecentlyChecked"`
}

// NewCheckResult returns a CheckResult
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/throttle/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (c *Client) Throttle(ctx context.Context) {
return
}
if c.ThrottleCheckOKOrWait(ctx) {
break
return
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (metric *MySQLThrottleMetric) Get() (float64, error) {
}

// ReadThrottleMetric returns a metric for the given probe. Either by explicit query
// or via SHOW SLAVE STATUS
// or via SHOW REPLICA STATUS
func ReadThrottleMetric(probe *Probe, clusterName string, overrideGetMetricFunc func() *MySQLThrottleMetric) (mySQLThrottleMetric *MySQLThrottleMetric) {
if mySQLThrottleMetric := getCachedMySQLThrottleMetric(probe); mySQLThrottleMetric != nil {
return mySQLThrottleMetric
Expand Down
35 changes: 32 additions & 3 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ type Throttler struct {
srvTopoServer srvtopo.Server
heartbeatWriter heartbeat.HeartbeatWriter

// recentCheckTickerValue is an ever increasing number, incrementing once per second.
recentCheckTickerValue int64
// recentCheckValue is set to match or exceed recentCheckTickerValue whenever a "check" was made (other than by the throttler itself).
// when recentCheckValue < recentCheckTickerValue that means there hasn't been a recent check.
recentCheckValue int64

throttleTabletTypesMap map[topodatapb.TabletType]bool

mysqlThrottleMetricChan chan *mysql.MySQLThrottleMetric
Expand Down Expand Up @@ -576,6 +582,7 @@ func (throttler *Throttler) Operate(ctx context.Context) {
mysqlRefreshTicker := addTicker(mysqlRefreshInterval)
mysqlAggregateTicker := addTicker(mysqlAggregateInterval)
throttledAppsTicker := addTicker(throttledAppsSnapshotInterval)
recentCheckTicker := addTicker(time.Second)

go func() {
defer log.Infof("Throttler: Operate terminated, tickers stopped")
Expand Down Expand Up @@ -670,6 +677,9 @@ func (throttler *Throttler) Operate(ctx context.Context) {
}
case throttlerConfig := <-throttler.throttlerConfigChan:
throttler.applyThrottlerConfig(ctx, throttlerConfig)
case <-recentCheckTicker.C:
// Increment recentCheckTickerValue by one.
atomic.AddInt64(&throttler.recentCheckTickerValue, 1)
}
}
}()
Expand All @@ -682,7 +692,7 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context,
mySQLThrottleMetric.ClusterName = clusterName
mySQLThrottleMetric.Key = probe.Key

tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=vitess", probe.TabletHost, probe.TabletPort)
tabletCheckSelfURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", probe.TabletHost, probe.TabletPort, vitessAppName)
resp, err := throttler.httpClient.Get(tabletCheckSelfURL)
if err != nil {
mySQLThrottleMetric.Err = err
Expand All @@ -704,6 +714,11 @@ func (throttler *Throttler) generateTabletHTTPProbeFunction(ctx context.Context,
if checkResult.StatusCode == http.StatusInternalServerError {
mySQLThrottleMetric.Err = fmt.Errorf("Status code: %d", checkResult.StatusCode)
}
if checkResult.RecentlyChecked {
// We have just probed a tablet, and it reported back that someone just recently "check"ed it.
// We therefore renew the heartbeats lease.
go throttler.heartbeatWriter.RequestHeartbeats()
}
return mySQLThrottleMetric
}
}
Expand Down Expand Up @@ -1015,7 +1030,17 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor
if !throttler.IsEnabled() {
return okMetricCheckResult
}
return throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags)
checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags)

if atomic.LoadInt64(&throttler.recentCheckValue) >= atomic.LoadInt64(&throttler.recentCheckTickerValue) {
// This indicates someone, who is not "vitess" ie not internal to the throttling logic, did a _recent_ `check`.
// This could be online-ddl, or vreplication or whoever else.
// If this tablet is a REPLICA or RDONLY, we want to advertise to the PRIMARY that someone did a recent check,
// so that the PRIMARY knows it must renew the heartbeat lease.
checkResult.RecentlyChecked = true
}

return checkResult
}

// checkShard checks the health of the shard, and runs on the primary tablet only
Expand All @@ -1030,8 +1055,12 @@ func (throttler *Throttler) checkSelf(ctx context.Context, appName string, remot

// CheckByType runs a check by requested check type
func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) {
if throttler.IsEnabled() && !flags.SkipRequestHeartbeats {
if throttler.IsEnabled() && !flags.SkipRequestHeartbeats && appName != vitessAppName {
go throttler.heartbeatWriter.RequestHeartbeats()
// This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other.
// We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back
// to the PRIMARY so that it knows it must renew the heartbeat lease.
atomic.StoreInt64(&throttler.recentCheckValue, 1+atomic.LoadInt64(&throttler.recentCheckTickerValue))
}
switch checkType {
case ThrottleCheckSelf:
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error {
errch := make(chan error, 1)
go func() {
startPos := mysql.EncodePosition(uvs.pos)
vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.send2, "catchup", uvs.vse)
vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send2, "catchup", uvs.vse)
uvs.setVs(vs)
errch <- vs.Stream()
uvs.setVs(nil)
Expand Down Expand Up @@ -302,7 +302,7 @@ func (uvs *uvstreamer) fastForward(stopPos string) error {
}()
log.Infof("starting fastForward from %s upto pos %s", mysql.EncodePosition(uvs.pos), stopPos)
uvs.stopPos, _ = mysql.DecodePosition(stopPos)
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.send2, "fastforward", uvs.vse)
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send2, "fastforward", uvs.vse)
uvs.setVs(vs)
return vs.Stream()
}
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (vse *Engine) validateBinlogRowImage(ctx context.Context, db dbconfigs.Conn

// Stream starts a new stream.
// This streams events from the binary logs
func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error {
func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, useThrottler bool, send func([]*binlogdatapb.VEvent) error) error {

if err := vse.validateBinlogRowImage(ctx, vse.se.GetDBConnector()); err != nil {
return err
Expand All @@ -244,7 +244,7 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl
}
vse.mu.Lock()
defer vse.mu.Unlock()
streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, send)
streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, useThrottler, send)
idx := vse.streamIdx
vse.streamers[idx] = streamer
vse.streamIdx++
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/vstreamer/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestUpdateVSchema(t *testing.T) {
}},
}
// Stream should terminate immediately due to invalid pos.
_ = engine.Stream(ctx, "invalid", nil, filter, func(_ []*binlogdatapb.VEvent) error {
_ = engine.Stream(ctx, "invalid", nil, filter, false, func(_ []*binlogdatapb.VEvent) error {
return nil
})

Expand Down
42 changes: 22 additions & 20 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,14 @@ type uvstreamer struct {
cancel func()

// input parameters
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK
vse *Engine
send func([]*binlogdatapb.VEvent) error
cp dbconfigs.Connector
se *schema.Engine
startPos string
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK
useThrottler bool

vschema *localVSchema

Expand Down Expand Up @@ -90,7 +91,7 @@ type uvstreamerConfig struct {
CatchupRetryTime time.Duration
}

func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *uvstreamer {
func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se *schema.Engine, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, vschema *localVSchema, useThrottler bool, send func([]*binlogdatapb.VEvent) error) *uvstreamer {
ctx, cancel := context.WithCancel(ctx)
config := &uvstreamerConfig{
MaxReplicationLag: 1 * time.Nanosecond,
Expand All @@ -105,17 +106,18 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se
return send(evs)
}
uvs := &uvstreamer{
ctx: ctx,
cancel: cancel,
vse: vse,
send: send2,
cp: cp,
se: se,
startPos: startPos,
filter: filter,
vschema: vschema,
config: config,
inTablePKs: tablePKs,
ctx: ctx,
cancel: cancel,
vse: vse,
send: send2,
cp: cp,
se: se,
startPos: startPos,
filter: filter,
vschema: vschema,
config: config,
inTablePKs: tablePKs,
useThrottler: useThrottler,
}

return uvs
Expand Down Expand Up @@ -418,7 +420,7 @@ func (uvs *uvstreamer) Stream() error {
}
}
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos),
uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)
uvs.filter, uvs.getVSchema(), uvs.useThrottler, uvs.send, "replicate", uvs.vse)

uvs.setVs(vs)
return vs.Stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func getEventCallback(event *binlogdatapb.VEvent) func() {
func startVStreamCopy(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, tablePKs []*binlogdatapb.TableLastPK) {
pos := ""
go func() {
err := engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error {
err := engine.Stream(ctx, pos, tablePKs, filter, true, func(evs []*binlogdatapb.VEvent) error {
//t.Logf("Received events: %v", evs)
muAllEvents.Lock()
defer muAllEvents.Unlock()
Expand Down
Loading