Skip to content

Commit

Permalink
txthrottler: remove txThrottlerConfig struct, rely on tabletenv (
Browse files Browse the repository at this point in the history
…vitessio#13624)

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Apr 17, 2024
1 parent 02d5c47 commit 3e57711
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 105 deletions.
119 changes: 44 additions & 75 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,7 @@ func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *query
// be executing a method. The only exception is the 'Throttle' method where multiple goroutines are
// allowed to execute it concurrently.
type txThrottler struct {
// config stores the transaction throttler's configuration.
// It is populated in NewTxThrottler and is not modified
// since.
config *txThrottlerConfig
config *tabletenv.TabletConfig

// state holds an open transaction throttler state. It is nil
// if the TransactionThrottler is closed.
Expand All @@ -162,30 +159,6 @@ type txThrottler struct {
requestsThrottled *stats.CountersWithSingleLabel
}

// txThrottlerConfig holds the parameters that need to be
// passed when constructing a TxThrottler object.
type txThrottlerConfig struct {
// enabled is true if the transaction throttler is enabled. All methods
// of a disabled transaction throttler do nothing and Throttle() always
// returns false.
enabled bool

// if dryRun is true, the txThrottler will run only on monitoring mode, meaning that it will increase counters for
// total and actually throttled requests, but it will not actually return that a transaction should be throttled.
dryRun bool

throttlerConfig *throttlerdatapb.Configuration
// healthCheckCells stores the cell names in which running vttablets will be monitored for
// replication lag.
healthCheckCells []string

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool

// rate to refresh topo for cells
topoRefreshInterval time.Duration
}

type txThrottlerState interface {
deallocateResources()
StatsUpdate(tabletStats *discovery.TabletHealth)
Expand All @@ -194,7 +167,7 @@ type txThrottlerState interface {

// txThrottlerStateImpl holds the state of an open TxThrottler object.
type txThrottlerStateImpl struct {
config *txThrottlerConfig
config *tabletenv.TabletConfig
txThrottler *txThrottler

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
Expand All @@ -208,49 +181,38 @@ type txThrottlerStateImpl struct {
healthCheckChan chan *discovery.TabletHealth
healthCheckCells []string
cellsFromTopo bool

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool
}

// NewTxThrottler tries to construct a txThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
// This function calls tryCreateTxThrottler that does the actual creation work
// and returns an error if one occurred.
// NewTxThrottler tries to construct a txThrottler from the relevant
// fields in the tabletenv.Env and topo.Server objects.
func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
throttlerConfig := &txThrottlerConfig{enabled: false}

if env.Config().EnableTxThrottler {
// Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
// is immutable.
healthCheckCells := env.Config().TxThrottlerHealthCheckCells

tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes))
for _, tabletType := range *env.Config().TxThrottlerTabletTypes {
tabletTypes[tabletType] = true
config := env.Config()
if config.EnableTxThrottler {
if len(config.TxThrottlerHealthCheckCells) == 0 {
defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, cellsFromTopo: true, topoRefreshInterval: %s, throttlerConfig: %q",
config.TxThrottlerTabletTypes, config.TxThrottlerTopoRefreshInterval, config.TxThrottlerConfig.Get(),
)
} else {
defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, healthCheckCells: %+v, throttlerConfig: %q",
config.TxThrottlerTabletTypes, config.TxThrottlerHealthCheckCells, config.TxThrottlerConfig.Get(),
)
}

throttlerConfig = &txThrottlerConfig{
enabled: true,
healthCheckCells: healthCheckCells,
dryRun: env.Config().TxThrottlerDryRun,
tabletTypes: tabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
topoRefreshInterval: env.Config().TxThrottlerTopoRefreshInterval,
}

defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig)
}

return &txThrottler{
config: throttlerConfig,
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read",
throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"),
topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read",
[]string{"cell", "DbType"}),
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded",
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded",
[]string{"cell", "DbType"}),
requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"),
requestsTotal: env.Exporter().NewCountersWithSingleLabel(TxThrottlerName+"Requests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel(TxThrottlerName+"Throttled", "transaction throttler requests throttled", "workload"),
}
}

Expand All @@ -261,7 +223,7 @@ func (t *txThrottler) InitDBConfig(target *querypb.Target) {

// Open opens the transaction throttler. It must be called prior to 'Throttle'.
func (t *txThrottler) Open() (err error) {
if !t.config.enabled {
if !t.config.EnableTxThrottler {
return nil
}
if t.state != nil {
Expand All @@ -277,7 +239,7 @@ func (t *txThrottler) Open() (err error) {
// It should be called after the throttler is no longer needed.
// It's ok to call this method on a closed throttler--in which case the method does nothing.
func (t *txThrottler) Close() {
if !t.config.enabled {
if !t.config.EnableTxThrottler {
return
}
if t.state == nil {
Expand All @@ -294,7 +256,7 @@ func (t *txThrottler) Close() {
// should back off). Throttle requires that Open() was previously called
// successfully.
func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
if !t.config.enabled {
if !t.config.EnableTxThrottler {
return false
}
if t.state == nil {
Expand All @@ -310,11 +272,11 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
t.requestsThrottled.Add(workload, 1)
}

return result && !t.config.dryRun
return result && !t.config.TxThrottlerDryRun
}

func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}
func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfig, target *querypb.Target) (txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.TxThrottlerConfig.Get()}

t, err := throttlerFactory(
TxThrottlerName,
Expand All @@ -326,13 +288,20 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
if err != nil {
return nil, err
}
if err := t.UpdateConfiguration(config.throttlerConfig, true /* copyZeroValues */); err != nil {
if err := t.UpdateConfiguration(config.TxThrottlerConfig.Get(), true /* copyZeroValues */); err != nil {
t.Close()
return nil, err
}

tabletTypes := make(map[topodatapb.TabletType]bool, len(*config.TxThrottlerTabletTypes))
for _, tabletType := range *config.TxThrottlerTabletTypes {
tabletTypes[tabletType] = true
}

state := &txThrottlerStateImpl{
config: config,
healthCheckCells: config.healthCheckCells,
healthCheckCells: config.TxThrottlerHealthCheckCells,
tabletTypes: tabletTypes,
throttler: t,
txThrottler: txThrottler,
}
Expand Down Expand Up @@ -402,7 +371,7 @@ func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topo
func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
var cellsUpdateTicks <-chan time.Time
if ts.cellsFromTopo {
ticker := time.NewTicker(ts.config.topoRefreshInterval)
ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval)
cellsUpdateTicks = ticker.C
defer ticker.Stop()
}
Expand All @@ -420,7 +389,7 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoS

func (ts *txThrottlerStateImpl) throttle() bool {
if ts.throttler == nil {
log.Error("throttle called after deallocateResources was called")
log.Error("txThrottler: throttle called after deallocateResources was called")
return false
}
// Serialize calls to ts.throttle.Throttle()
Expand All @@ -442,7 +411,7 @@ func (ts *txThrottlerStateImpl) deallocateResources() {

// StatsUpdate updates the health of a tablet with the given healthcheck.
func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth) {
if ts.config.tabletTypes == nil {
if len(ts.tabletTypes) == 0 {
return
}

Expand All @@ -451,8 +420,8 @@ func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.TabletHealth)
ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1)

// Monitor tablets for replication lag if they have a tablet
// type specified by the --tx_throttler_tablet_types flag.
if ts.config.tabletTypes[tabletType] {
// type specified by the --tx-throttler-tablet-types flag.
if ts.tabletTypes[tabletType] {
ts.throttler.RecordReplicationLag(time.Now(), tabletStats)
ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1)
}
Expand Down
35 changes: 5 additions & 30 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func TestEnabledThrottler(t *testing.T) {
})

assert.Nil(t, throttlerImpl.Open())
throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl)
assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes)
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts())

Expand Down Expand Up @@ -174,33 +176,6 @@ func TestFetchKnownCells(t *testing.T) {
}
}

func TestNewTxThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())

{
// disabled
config.EnableTxThrottler = false
throttler := NewTxThrottler(env, nil)
throttlerImpl, _ := throttler.(*txThrottler)
assert.NotNil(t, throttlerImpl)
assert.NotNil(t, throttlerImpl.config)
assert.False(t, throttlerImpl.config.enabled)
}
{
// enabled
config.EnableTxThrottler = true
config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"}
config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}
throttler := NewTxThrottler(env, nil)
throttlerImpl, _ := throttler.(*txThrottler)
assert.NotNil(t, throttlerImpl)
assert.NotNil(t, throttlerImpl.config)
assert.True(t, throttlerImpl.config.enabled)
assert.Equal(t, []string{"cell1", "cell2"}, throttlerImpl.config.healthCheckCells)
}
}

func TestDryRunThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())
Expand All @@ -222,9 +197,9 @@ func TestDryRunThrottler(t *testing.T) {

t.Run(theTestCase.Name, func(t *testing.T) {
aTxThrottler := &txThrottler{
config: &txThrottlerConfig{
enabled: true,
dryRun: theTestCase.throttlerDryRun,
config: &tabletenv.TabletConfig{
EnableTxThrottler: true,
TxThrottlerDryRun: theTestCase.throttlerDryRun,
},
state: &mockTxThrottlerState{shouldThrottle: theTestCase.txThrottlerStateShouldThrottle},
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
Expand Down

0 comments on commit 3e57711

Please sign in to comment.