From fe67c368bd6bf169ca55ca9dca31798547fb3d6a Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 26 Jul 2023 02:30:33 +0200 Subject: [PATCH 1/8] `txthrottler`: remove `txThrottlerConfig` struct, rely on `tabletenv.Env` Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 90 +++++++------------ .../txthrottler/tx_throttler_test.go | 7 +- 2 files changed, 35 insertions(+), 62 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index e25e4c0da89..fee6524621a 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -129,10 +129,7 @@ const TxThrottlerName = "TransactionThrottler" // be executing a method. The only exception is the 'Throttle' method where multiple goroutines are // allowed to execute it concurrently. type txThrottler struct { - // config stores the transaction throttler's configuration. - // It is populated in NewTxThrottler and is not modified - // since. - config *txThrottlerConfig + env tabletenv.Env // state holds an open transaction throttler state. It is nil // if the TransactionThrottler is closed. @@ -141,6 +138,10 @@ type txThrottler struct { target *querypb.Target topoServer *topo.Server + // healthCheckCells stores the cell names in which running vttablets will be monitored for + // replication lag. + healthCheckCells []string + // stats throttlerRunning *stats.Gauge topoWatchers *stats.GaugesWithSingleLabel @@ -150,26 +151,9 @@ type txThrottler struct { requestsThrottled *stats.Counter } -// txThrottlerConfig holds the parameters that need to be -// passed when constructing a TxThrottler object. -type txThrottlerConfig struct { - // enabled is true if the transaction throttler is enabled. All methods - // of a disabled transaction throttler do nothing and Throttle() always - // returns false. - enabled bool - - throttlerConfig *throttlerdatapb.Configuration - // healthCheckCells stores the cell names in which running vttablets will be monitored for - // replication lag. - healthCheckCells []string - - // tabletTypes stores the tablet types for throttling - tabletTypes map[topodatapb.TabletType]bool -} - // txThrottlerState holds the state of an open TxThrottler object. type txThrottlerState struct { - config *txThrottlerConfig + env tabletenv.Env txThrottler *txThrottler // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). @@ -180,6 +164,9 @@ type txThrottlerState struct { healthCheck discovery.HealthCheck topologyWatchers map[string]TopologyWatcherInterface + + // tabletTypes stores the tablet types for throttling + tabletTypes map[topodatapb.TabletType]bool } // NewTxThrottler tries to construct a txThrottler from the @@ -188,31 +175,14 @@ type txThrottlerState struct { // This function calls tryCreateTxThrottler that does the actual creation work // and returns an error if one occurred. func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { - throttlerConfig := &txThrottlerConfig{enabled: false} - if env.Config().EnableTxThrottler { - // Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells - // is immutable. - healthCheckCells := env.Config().TxThrottlerHealthCheckCells - - tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes)) - for _, tabletType := range *env.Config().TxThrottlerTabletTypes { - tabletTypes[tabletType] = true - } - - throttlerConfig = &txThrottlerConfig{ - enabled: true, - tabletTypes: tabletTypes, - throttlerConfig: env.Config().TxThrottlerConfig.Get(), - healthCheckCells: healthCheckCells, - } - - defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig) + defer log.Infof("Initialized transaction throttler with config: %+v", env.Config().TxThrottlerConfig.Get()) } return &txThrottler{ - config: throttlerConfig, + env: env, topoServer: topoServer, + healthCheckCells: env.Config().TxThrottlerHealthCheckCells, throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"), healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read", @@ -231,7 +201,7 @@ func (t *txThrottler) InitDBConfig(target *querypb.Target) { // Open opens the transaction throttler. It must be called prior to 'Throttle'. func (t *txThrottler) Open() (err error) { - if !t.config.enabled { + if !t.env.Config().EnableTxThrottler { return nil } if t.state != nil { @@ -239,7 +209,7 @@ func (t *txThrottler) Open() (err error) { } log.Info("txThrottler: opening") t.throttlerRunning.Set(1) - t.state, err = newTxThrottlerState(t, t.config, t.target) + t.state, err = newTxThrottlerState(t.env, t, t.target) return err } @@ -247,7 +217,7 @@ func (t *txThrottler) Open() (err error) { // It should be called after the throttler is no longer needed. // It's ok to call this method on a closed throttler--in which case the method does nothing. func (t *txThrottler) Close() { - if !t.config.enabled { + if !t.env.Config().EnableTxThrottler { return } if t.state == nil { @@ -264,7 +234,7 @@ func (t *txThrottler) Close() { // should back off). Throttle requires that Open() was previously called // successfully. func (t *txThrottler) Throttle(priority int) (result bool) { - if !t.config.enabled { + if !t.env.Config().EnableTxThrottler { return false } if t.state == nil { @@ -283,8 +253,8 @@ func (t *txThrottler) Throttle(priority int) (result bool) { return result } -func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) { - maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig} +func newTxThrottlerState(env tabletenv.Env, txThrottler *txThrottler, target *querypb.Target) (*txThrottlerState, error) { + maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: env.Config().TxThrottlerConfig.Get()} t, err := throttlerFactory( TxThrottlerName, @@ -296,20 +266,26 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta if err != nil { return nil, err } - if err := t.UpdateConfiguration(config.throttlerConfig, true /* copyZeroValues */); err != nil { + if err := t.UpdateConfiguration(env.Config().TxThrottlerConfig.Get(), true /* copyZeroValues */); err != nil { t.Close() return nil, err } + + tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes)) + for _, tabletType := range *env.Config().TxThrottlerTabletTypes { + tabletTypes[tabletType] = true + } + state := &txThrottlerState{ - config: config, + tabletTypes: tabletTypes, throttler: t, txThrottler: txThrottler, } - createTxThrottlerHealthCheck(txThrottler.topoServer, config, state, target.Cell) + createTxThrottlerHealthCheck(txThrottler, state, target.Cell) state.topologyWatchers = make( - map[string]TopologyWatcherInterface, len(config.healthCheckCells)) - for _, cell := range config.healthCheckCells { + map[string]TopologyWatcherInterface, len(txThrottler.healthCheckCells)) + for _, cell := range txThrottler.healthCheckCells { state.topologyWatchers[cell] = topologyWatcherFactory( txThrottler.topoServer, state.healthCheck, @@ -324,10 +300,10 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta return state, nil } -func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) { +func createTxThrottlerHealthCheck(txThrottler *txThrottler, result *txThrottlerState, cell string) { ctx, cancel := context.WithCancel(context.Background()) result.stopHealthCheck = cancel - result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells) + result.healthCheck = healthCheckFactory(txThrottler.topoServer, cell, txThrottler.healthCheckCells) ch := result.healthCheck.Subscribe() go func(ctx context.Context) { for { @@ -373,7 +349,7 @@ func (ts *txThrottlerState) deallocateResources() { // StatsUpdate updates the health of a tablet with the given healthcheck. func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { - if ts.config.tabletTypes == nil { + if len(ts.tabletTypes) == 0 { return } @@ -383,7 +359,7 @@ func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { // Monitor tablets for replication lag if they have a tablet // type specified by the --tx_throttler_tablet_types flag. - if ts.config.tabletTypes[tabletType] { + if ts.tabletTypes[tabletType] { ts.throttler.RecordReplicationLag(time.Now(), tabletStats) ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 77f311719b4..6a7c9256e6a 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -171,8 +171,6 @@ func TestNewTxThrottler(t *testing.T) { throttler := NewTxThrottler(env, nil) throttlerImpl, _ := throttler.(*txThrottler) assert.NotNil(t, throttlerImpl) - assert.NotNil(t, throttlerImpl.config) - assert.False(t, throttlerImpl.config.enabled) } { // enabled @@ -182,8 +180,7 @@ func TestNewTxThrottler(t *testing.T) { throttler := NewTxThrottler(env, nil) throttlerImpl, _ := throttler.(*txThrottler) assert.NotNil(t, throttlerImpl) - assert.NotNil(t, throttlerImpl.config) - assert.True(t, throttlerImpl.config.enabled) - assert.Equal(t, []string{"cell1", "cell2"}, throttlerImpl.config.healthCheckCells) + assert.Equal(t, []string{"cell1", "cell2"}, throttlerImpl.healthCheckCells) + assert.Nil(t, throttlerImpl.state) } } From 0d97d95430c18eec206858d03abf5bdbcf799651 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 26 Jul 2023 02:35:04 +0200 Subject: [PATCH 2/8] remove healthCheckCells, use env Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 15 ++++-------- .../txthrottler/tx_throttler_test.go | 24 ------------------- 2 files changed, 5 insertions(+), 34 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index fee6524621a..dc2b676cb24 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -138,10 +138,6 @@ type txThrottler struct { target *querypb.Target topoServer *topo.Server - // healthCheckCells stores the cell names in which running vttablets will be monitored for - // replication lag. - healthCheckCells []string - // stats throttlerRunning *stats.Gauge topoWatchers *stats.GaugesWithSingleLabel @@ -182,7 +178,6 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { return &txThrottler{ env: env, topoServer: topoServer, - healthCheckCells: env.Config().TxThrottlerHealthCheckCells, throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"), healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read", @@ -281,11 +276,11 @@ func newTxThrottlerState(env tabletenv.Env, txThrottler *txThrottler, target *qu throttler: t, txThrottler: txThrottler, } - createTxThrottlerHealthCheck(txThrottler, state, target.Cell) + createTxThrottlerHealthCheck(env, txThrottler.topoServer, state, target.Cell) state.topologyWatchers = make( - map[string]TopologyWatcherInterface, len(txThrottler.healthCheckCells)) - for _, cell := range txThrottler.healthCheckCells { + map[string]TopologyWatcherInterface, len(env.Config().TxThrottlerHealthCheckCells)) + for _, cell := range env.Config().TxThrottlerHealthCheckCells { state.topologyWatchers[cell] = topologyWatcherFactory( txThrottler.topoServer, state.healthCheck, @@ -300,10 +295,10 @@ func newTxThrottlerState(env tabletenv.Env, txThrottler *txThrottler, target *qu return state, nil } -func createTxThrottlerHealthCheck(txThrottler *txThrottler, result *txThrottlerState, cell string) { +func createTxThrottlerHealthCheck(env tabletenv.Env, topoServer *topo.Server, result *txThrottlerState, cell string) { ctx, cancel := context.WithCancel(context.Background()) result.stopHealthCheck = cancel - result.healthCheck = healthCheckFactory(txThrottler.topoServer, cell, txThrottler.healthCheckCells) + result.healthCheck = healthCheckFactory(topoServer, cell, env.Config().TxThrottlerHealthCheckCells) ch := result.healthCheck.Subscribe() go func(ctx context.Context) { for { diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 6a7c9256e6a..038d66945dd 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -160,27 +160,3 @@ func TestEnabledThrottler(t *testing.T) { assert.Zero(t, throttlerImpl.throttlerRunning.Get()) assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts()) } - -func TestNewTxThrottler(t *testing.T) { - config := tabletenv.NewDefaultConfig() - env := tabletenv.NewEnv(config, t.Name()) - - { - // disabled - config.EnableTxThrottler = false - throttler := NewTxThrottler(env, nil) - throttlerImpl, _ := throttler.(*txThrottler) - assert.NotNil(t, throttlerImpl) - } - { - // enabled - config.EnableTxThrottler = true - config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"} - config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA} - throttler := NewTxThrottler(env, nil) - throttlerImpl, _ := throttler.(*txThrottler) - assert.NotNil(t, throttlerImpl) - assert.Equal(t, []string{"cell1", "cell2"}, throttlerImpl.healthCheckCells) - assert.Nil(t, throttlerImpl.state) - } -} From d7e10a64cb4c2ed8c31caf16b625d9a68beb17d8 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 26 Jul 2023 11:58:41 +0200 Subject: [PATCH 3/8] Use tabletenv.TabletConfig Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 71 ++++++++++--------- .../txthrottler/tx_throttler_test.go | 1 + 2 files changed, 37 insertions(+), 35 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index dc2b676cb24..a025be973e4 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -129,7 +129,7 @@ const TxThrottlerName = "TransactionThrottler" // be executing a method. The only exception is the 'Throttle' method where multiple goroutines are // allowed to execute it concurrently. type txThrottler struct { - env tabletenv.Env + config *tabletenv.TabletConfig // state holds an open transaction throttler state. It is nil // if the TransactionThrottler is closed. @@ -149,7 +149,6 @@ type txThrottler struct { // txThrottlerState holds the state of an open TxThrottler object. type txThrottlerState struct { - env tabletenv.Env txThrottler *txThrottler // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). @@ -165,27 +164,29 @@ type txThrottlerState struct { tabletTypes map[topodatapb.TabletType]bool } -// NewTxThrottler tries to construct a txThrottler from the -// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if -// any error occurs. -// This function calls tryCreateTxThrottler that does the actual creation work -// and returns an error if one occurred. +// NewTxThrottler tries to construct a txThrottler from the relevant +// fields in the tabletenv.Env and topo.Server objects. func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { - if env.Config().EnableTxThrottler { - defer log.Infof("Initialized transaction throttler with config: %+v", env.Config().TxThrottlerConfig.Get()) + config := env.Config() + if config.EnableTxThrottler { + defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, healthCheckCells: %+v, throttlerConfig: %q", + config.TxThrottlerTabletTypes, + config.TxThrottlerHealthCheckCells, + config.TxThrottlerConfig.Get(), + ) } return &txThrottler{ - env: env, + config: config, topoServer: topoServer, - throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), - topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"), - healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read", + throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"), + topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "transaction throttler topology watchers", "cell"), + healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read", []string{"cell", "DbType"}), - healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded", + healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded", []string{"cell", "DbType"}), - requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"), - requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"), + requestsTotal: env.Exporter().NewCounter(TxThrottlerName+"Requests", "transaction throttler requests"), + requestsThrottled: env.Exporter().NewCounter(TxThrottlerName+"Throttled", "transaction throttler requests throttled"), } } @@ -196,7 +197,7 @@ func (t *txThrottler) InitDBConfig(target *querypb.Target) { // Open opens the transaction throttler. It must be called prior to 'Throttle'. func (t *txThrottler) Open() (err error) { - if !t.env.Config().EnableTxThrottler { + if !t.config.EnableTxThrottler { return nil } if t.state != nil { @@ -204,7 +205,7 @@ func (t *txThrottler) Open() (err error) { } log.Info("txThrottler: opening") t.throttlerRunning.Set(1) - t.state, err = newTxThrottlerState(t.env, t, t.target) + t.state, err = newTxThrottlerState(t.config, t, t.target) return err } @@ -212,7 +213,7 @@ func (t *txThrottler) Open() (err error) { // It should be called after the throttler is no longer needed. // It's ok to call this method on a closed throttler--in which case the method does nothing. func (t *txThrottler) Close() { - if !t.env.Config().EnableTxThrottler { + if !t.config.EnableTxThrottler { return } if t.state == nil { @@ -229,7 +230,7 @@ func (t *txThrottler) Close() { // should back off). Throttle requires that Open() was previously called // successfully. func (t *txThrottler) Throttle(priority int) (result bool) { - if !t.env.Config().EnableTxThrottler { + if !t.config.EnableTxThrottler { return false } if t.state == nil { @@ -248,8 +249,8 @@ func (t *txThrottler) Throttle(priority int) (result bool) { return result } -func newTxThrottlerState(env tabletenv.Env, txThrottler *txThrottler, target *querypb.Target) (*txThrottlerState, error) { - maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: env.Config().TxThrottlerConfig.Get()} +func newTxThrottlerState(config *tabletenv.TabletConfig, txThrottler *txThrottler, target *querypb.Target) (*txThrottlerState, error) { + maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.TxThrottlerConfig.Get()} t, err := throttlerFactory( TxThrottlerName, @@ -261,13 +262,13 @@ func newTxThrottlerState(env tabletenv.Env, txThrottler *txThrottler, target *qu if err != nil { return nil, err } - if err := t.UpdateConfiguration(env.Config().TxThrottlerConfig.Get(), true /* copyZeroValues */); err != nil { + if err := t.UpdateConfiguration(config.TxThrottlerConfig.Get(), true /* copyZeroValues */); err != nil { t.Close() return nil, err } - tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes)) - for _, tabletType := range *env.Config().TxThrottlerTabletTypes { + tabletTypes := make(map[topodatapb.TabletType]bool, len(*config.TxThrottlerTabletTypes)) + for _, tabletType := range *config.TxThrottlerTabletTypes { tabletTypes[tabletType] = true } @@ -276,11 +277,11 @@ func newTxThrottlerState(env tabletenv.Env, txThrottler *txThrottler, target *qu throttler: t, txThrottler: txThrottler, } - createTxThrottlerHealthCheck(env, txThrottler.topoServer, state, target.Cell) + createTxThrottlerHealthCheck(config, txThrottler.topoServer, state, target.Cell) state.topologyWatchers = make( - map[string]TopologyWatcherInterface, len(env.Config().TxThrottlerHealthCheckCells)) - for _, cell := range env.Config().TxThrottlerHealthCheckCells { + map[string]TopologyWatcherInterface, len(config.TxThrottlerHealthCheckCells)) + for _, cell := range config.TxThrottlerHealthCheckCells { state.topologyWatchers[cell] = topologyWatcherFactory( txThrottler.topoServer, state.healthCheck, @@ -295,18 +296,18 @@ func newTxThrottlerState(env tabletenv.Env, txThrottler *txThrottler, target *qu return state, nil } -func createTxThrottlerHealthCheck(env tabletenv.Env, topoServer *topo.Server, result *txThrottlerState, cell string) { +func createTxThrottlerHealthCheck(config *tabletenv.TabletConfig, topoServer *topo.Server, state *txThrottlerState, cell string) { ctx, cancel := context.WithCancel(context.Background()) - result.stopHealthCheck = cancel - result.healthCheck = healthCheckFactory(topoServer, cell, env.Config().TxThrottlerHealthCheckCells) - ch := result.healthCheck.Subscribe() + state.stopHealthCheck = cancel + state.healthCheck = healthCheckFactory(topoServer, cell, config.TxThrottlerHealthCheckCells) + ch := state.healthCheck.Subscribe() go func(ctx context.Context) { for { select { case <-ctx.Done(): return case th := <-ch: - result.StatsUpdate(th) + state.StatsUpdate(th) } } }(ctx) @@ -314,7 +315,7 @@ func createTxThrottlerHealthCheck(env tabletenv.Env, topoServer *topo.Server, re func (ts *txThrottlerState) throttle() bool { if ts.throttler == nil { - log.Error("throttle called after deallocateResources was called") + log.Error("txThrottler: throttle called after deallocateResources was called") return false } // Serialize calls to ts.throttle.Throttle() @@ -353,7 +354,7 @@ func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1) // Monitor tablets for replication lag if they have a tablet - // type specified by the --tx_throttler_tablet_types flag. + // type specified by the --tx-throttler-tablet-types flag. if ts.tabletTypes[tabletType] { ts.throttler.RecordReplicationLag(time.Now(), tabletStats) ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 038d66945dd..8eb76be2dc2 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -126,6 +126,7 @@ func TestEnabledThrottler(t *testing.T) { }) assert.Nil(t, throttlerImpl.Open()) + assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerImpl.state.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts()) From 52e2ed14af15fbfe3fee2b3b3667bf88dc2adecc Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 1 Aug 2023 01:10:13 +0200 Subject: [PATCH 4/8] Fix merge conflict Signed-off-by: Tim Vaillancourt --- .../vttablet/tabletserver/txthrottler/tx_throttler.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 7de611d5cbe..8380e934762 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -161,6 +161,7 @@ type txThrottler struct { // txThrottlerState holds the state of an open TxThrottler object. type txThrottlerState struct { + config *tabletenv.TabletConfig txThrottler *txThrottler // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). @@ -174,8 +175,8 @@ type txThrottlerState struct { healthCheckChan chan *discovery.TabletHealth healthCheckCells []string cellsFromTopo bool - - // tabletTypes stores the tablet types for throttling + + // tabletTypes stores the tablet types for throttling tabletTypes map[topodatapb.TabletType]bool } @@ -290,10 +291,10 @@ func newTxThrottlerState(config *tabletenv.TabletConfig, txThrottler *txThrottle state := &txThrottlerState{ config: config, - healthCheckCells: config.healthCheckCells, + healthCheckCells: config.TxThrottlerHealthCheckCells, throttler: t, txThrottler: txThrottler, - tabletTypes: tabletTypes, + tabletTypes: tabletTypes, } // get cells from topo if none defined in tabletenv config @@ -361,7 +362,7 @@ func (ts *txThrottlerState) updateHealthCheckCells(ctx context.Context, topoServ func (ts *txThrottlerState) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) { var cellsUpdateTicks <-chan time.Time if ts.cellsFromTopo { - ticker := time.NewTicker(ts.config.topoRefreshInterval) + ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval) cellsUpdateTicks = ticker.C defer ticker.Stop() } From adb5daf98de17c9578e519f4823b06c0e7734288 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 1 Aug 2023 01:16:56 +0200 Subject: [PATCH 5/8] Cleanup Signed-off-by: Tim Vaillancourt --- go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go | 6 +++--- .../vttablet/tabletserver/txthrottler/tx_throttler_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 8380e934762..ddbed4ee89c 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -222,7 +222,7 @@ func (t *txThrottler) Open() (err error) { } log.Info("txThrottler: opening") t.throttlerRunning.Set(1) - t.state, err = newTxThrottlerState(t.config, t, t.target) + t.state, err = newTxThrottlerState(t, t.config, t.target) return err } @@ -266,7 +266,7 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) { return result } -func newTxThrottlerState(config *tabletenv.TabletConfig, txThrottler *txThrottler, target *querypb.Target) (*txThrottlerState, error) { +func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfig, target *querypb.Target) (*txThrottlerState, error) { maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.TxThrottlerConfig.Get()} t, err := throttlerFactory( @@ -292,9 +292,9 @@ func newTxThrottlerState(config *tabletenv.TabletConfig, txThrottler *txThrottle state := &txThrottlerState{ config: config, healthCheckCells: config.TxThrottlerHealthCheckCells, + tabletTypes: tabletTypes, throttler: t, txThrottler: txThrottler, - tabletTypes: tabletTypes, } // get cells from topo if none defined in tabletenv config diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 3a2dff9d604..e203dca101b 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -173,4 +173,4 @@ func TestFetchKnownCells(t *testing.T) { cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"}) assert.Equal(t, []string{"cell1"}, cells) } -} \ No newline at end of file +} From 5347417b99b2f6dbfe2af7c86101f3f4c067c643 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 1 Aug 2023 01:21:54 +0200 Subject: [PATCH 6/8] Update init log msg Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index ddbed4ee89c..4b4600a9a89 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -185,12 +185,15 @@ type txThrottlerState struct { func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { config := env.Config() if config.EnableTxThrottler { - defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, healthCheckCells: %+v, throttlerConfig: %q", - config.TxThrottlerTabletTypes, - config.TxThrottlerHealthCheckCells, - config.TxThrottlerConfig.Get(), - ) - + if len(config.TxThrottlerHealthCheckCells) == 0 { + defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, cellsFromTopo: true, topoRefreshInterval: %s, throttlerConfig: %q", + config.TxThrottlerTabletTypes, config.TxThrottlerTopoRefreshInterval, config.TxThrottlerConfig.Get(), + ) + } else { + defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, healthCheckCells: %+v, throttlerConfig: %q", + config.TxThrottlerTabletTypes, config.TxThrottlerHealthCheckCells, config.TxThrottlerConfig.Get(), + ) + } } return &txThrottler{ From c9dc684d5cb77741304eb7506120b98c24d60872 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 2 Aug 2023 13:55:19 +0200 Subject: [PATCH 7/8] fix test Signed-off-by: Tim Vaillancourt --- go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go | 2 +- go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index f22ec66e673..923c860d184 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -425,4 +425,4 @@ func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) ts.throttler.RecordReplicationLag(time.Now(), tabletStats) ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1) } -} \ No newline at end of file +} diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index d08b6eb80f9..b0d15a5b059 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -126,7 +126,8 @@ func TestEnabledThrottler(t *testing.T) { }) assert.Nil(t, throttlerImpl.Open()) - assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerImpl.state.tabletTypes) + throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl) + assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts()) @@ -224,4 +225,4 @@ func (t *mockTxThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) func (t *mockTxThrottlerState) throttle() bool { return t.shouldThrottle -} \ No newline at end of file +} From c1afe5121c90cb878b3cc9a6680f354589aff6f2 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 2 Aug 2023 14:17:31 +0200 Subject: [PATCH 8/8] empty commit Signed-off-by: Tim Vaillancourt