Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txthrottler: remove txThrottlerConfig struct, rely on tabletenv #13624

120 changes: 46 additions & 74 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,7 @@ const TxThrottlerName = "TransactionThrottler"
// be executing a method. The only exception is the 'Throttle' method where multiple goroutines are
// allowed to execute it concurrently.
type txThrottler struct {
// config stores the transaction throttler's configuration.
// It is populated in NewTxThrottler and is not modified
// since.
config *txThrottlerConfig
config *tabletenv.TabletConfig

// state holds an open transaction throttler state. It is nil
// if the TransactionThrottler is closed.
Expand All @@ -150,26 +147,8 @@ type txThrottler struct {
requestsThrottled *stats.Counter
}

// txThrottlerConfig holds the parameters that need to be
// passed when constructing a TxThrottler object.
type txThrottlerConfig struct {
// enabled is true if the transaction throttler is enabled. All methods
// of a disabled transaction throttler do nothing and Throttle() always
// returns false.
enabled bool

throttlerConfig *throttlerdatapb.Configuration
// healthCheckCells stores the cell names in which running vttablets will be monitored for
// replication lag.
healthCheckCells []string

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
config *txThrottlerConfig
txThrottler *txThrottler

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
Expand All @@ -180,47 +159,34 @@ type txThrottlerState struct {

healthCheck discovery.HealthCheck
topologyWatchers map[string]TopologyWatcherInterface

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool
}

// NewTxThrottler tries to construct a txThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
// This function calls tryCreateTxThrottler that does the actual creation work
// and returns an error if one occurred.
// NewTxThrottler tries to construct a txThrottler from the relevant
// fields in the tabletenv.Env and topo.Server objects.
func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
throttlerConfig := &txThrottlerConfig{enabled: false}

if env.Config().EnableTxThrottler {
// Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
// is immutable.
healthCheckCells := env.Config().TxThrottlerHealthCheckCells

tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes))
for _, tabletType := range *env.Config().TxThrottlerTabletTypes {
tabletTypes[tabletType] = true
}

throttlerConfig = &txThrottlerConfig{
enabled: true,
tabletTypes: tabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
healthCheckCells: healthCheckCells,
}

defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig)
config := env.Config()
if config.EnableTxThrottler {
defer log.Infof("Initialized transaction throttler using tabletTypes: %+v, healthCheckCells: %+v, throttlerConfig: %q",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change also fixes the throttler config being printed as a pointer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config.TxThrottlerTabletTypes,
config.TxThrottlerHealthCheckCells,
config.TxThrottlerConfig.Get(),
)
}

return &txThrottler{
config: throttlerConfig,
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read",
throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting that this is not a breaking change because TxThrottlerName == "TransactionThrottler"

topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read",
[]string{"cell", "DbType"}),
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded",
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded",
[]string{"cell", "DbType"}),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
requestsTotal: env.Exporter().NewCounter(TxThrottlerName+"Requests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter(TxThrottlerName+"Throttled", "transaction throttler requests throttled"),
}
}

Expand All @@ -231,23 +197,23 @@ func (t *txThrottler) InitDBConfig(target *querypb.Target) {

// Open opens the transaction throttler. It must be called prior to 'Throttle'.
func (t *txThrottler) Open() (err error) {
if !t.config.enabled {
if !t.config.EnableTxThrottler {
return nil
}
if t.state != nil {
return nil
}
log.Info("txThrottler: opening")
t.throttlerRunning.Set(1)
t.state, err = newTxThrottlerState(t, t.config, t.target)
t.state, err = newTxThrottlerState(t.config, t, t.target)
return err
}

// Close closes the txThrottler object and releases resources.
// It should be called after the throttler is no longer needed.
// It's ok to call this method on a closed throttler--in which case the method does nothing.
func (t *txThrottler) Close() {
if !t.config.enabled {
if !t.config.EnableTxThrottler {
return
}
if t.state == nil {
Expand All @@ -264,7 +230,7 @@ func (t *txThrottler) Close() {
// should back off). Throttle requires that Open() was previously called
// successfully.
func (t *txThrottler) Throttle(priority int) (result bool) {
if !t.config.enabled {
if !t.config.EnableTxThrottler {
return false
}
if t.state == nil {
Expand All @@ -283,8 +249,8 @@ func (t *txThrottler) Throttle(priority int) (result bool) {
return result
}

func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}
func newTxThrottlerState(config *tabletenv.TabletConfig, txThrottler *txThrottler, target *querypb.Target) (*txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.TxThrottlerConfig.Get()}

t, err := throttlerFactory(
TxThrottlerName,
Expand All @@ -296,20 +262,26 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
if err != nil {
return nil, err
}
if err := t.UpdateConfiguration(config.throttlerConfig, true /* copyZeroValues */); err != nil {
if err := t.UpdateConfiguration(config.TxThrottlerConfig.Get(), true /* copyZeroValues */); err != nil {
t.Close()
return nil, err
}

tabletTypes := make(map[topodatapb.TabletType]bool, len(*config.TxThrottlerTabletTypes))
for _, tabletType := range *config.TxThrottlerTabletTypes {
tabletTypes[tabletType] = true
}

state := &txThrottlerState{
config: config,
tabletTypes: tabletTypes,
throttler: t,
txThrottler: txThrottler,
}
createTxThrottlerHealthCheck(txThrottler.topoServer, config, state, target.Cell)
createTxThrottlerHealthCheck(config, txThrottler.topoServer, state, target.Cell)

state.topologyWatchers = make(
map[string]TopologyWatcherInterface, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
map[string]TopologyWatcherInterface, len(config.TxThrottlerHealthCheckCells))
for _, cell := range config.TxThrottlerHealthCheckCells {
state.topologyWatchers[cell] = topologyWatcherFactory(
txThrottler.topoServer,
state.healthCheck,
Expand All @@ -324,26 +296,26 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
return state, nil
}

func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) {
func createTxThrottlerHealthCheck(config *tabletenv.TabletConfig, topoServer *topo.Server, state *txThrottlerState, cell string) {
ctx, cancel := context.WithCancel(context.Background())
result.stopHealthCheck = cancel
result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells)
ch := result.healthCheck.Subscribe()
state.stopHealthCheck = cancel
state.healthCheck = healthCheckFactory(topoServer, cell, config.TxThrottlerHealthCheckCells)
ch := state.healthCheck.Subscribe()
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case th := <-ch:
result.StatsUpdate(th)
state.StatsUpdate(th)
}
}
}(ctx)
}

func (ts *txThrottlerState) throttle() bool {
if ts.throttler == nil {
log.Error("throttle called after deallocateResources was called")
log.Error("txThrottler: throttle called after deallocateResources was called")
return false
}
// Serialize calls to ts.throttle.Throttle()
Expand Down Expand Up @@ -373,7 +345,7 @@ func (ts *txThrottlerState) deallocateResources() {

// StatsUpdate updates the health of a tablet with the given healthcheck.
func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {
if ts.config.tabletTypes == nil {
if len(ts.tabletTypes) == 0 {
return
}

Expand All @@ -382,8 +354,8 @@ func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {
ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1)

// Monitor tablets for replication lag if they have a tablet
// type specified by the --tx_throttler_tablet_types flag.
if ts.config.tabletTypes[tabletType] {
// type specified by the --tx-throttler-tablet-types flag.
if ts.tabletTypes[tabletType] {
ts.throttler.RecordReplicationLag(time.Now(), tabletStats)
ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1)
}
Expand Down
28 changes: 1 addition & 27 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func TestEnabledThrottler(t *testing.T) {
})

assert.Nil(t, throttlerImpl.Open())
assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerImpl.state.tabletTypes)
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts())

Expand Down Expand Up @@ -160,30 +161,3 @@ func TestEnabledThrottler(t *testing.T) {
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts())
}

func TestNewTxThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())

{
// disabled
config.EnableTxThrottler = false
throttler := NewTxThrottler(env, nil)
throttlerImpl, _ := throttler.(*txThrottler)
assert.NotNil(t, throttlerImpl)
assert.NotNil(t, throttlerImpl.config)
assert.False(t, throttlerImpl.config.enabled)
}
{
// enabled
config.EnableTxThrottler = true
config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"}
config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}
throttler := NewTxThrottler(env, nil)
throttlerImpl, _ := throttler.(*txThrottler)
assert.NotNil(t, throttlerImpl)
assert.NotNil(t, throttlerImpl.config)
assert.True(t, throttlerImpl.config.enabled)
assert.Equal(t, []string{"cell1", "cell2"}, throttlerImpl.config.healthCheckCells)
}
}