Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tx throttler: healthcheck all cells if --tx-throttler-healthcheck-cells is undefined #12477

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1fecb5d
WIP
timvaillancourt Feb 24, 2023
6beae70
vttablet tx throttler: healthcheck all cells if cells unspecified
timvaillancourt Feb 24, 2023
c8ad379
Fix test typo
timvaillancourt Feb 24, 2023
6ed1bac
Simplify test
timvaillancourt Feb 24, 2023
4c04f24
remove unnecessary 'empty healthCheckCells given' error
timvaillancourt Feb 25, 2023
08c1a1d
last tweak to simplify test
timvaillancourt Feb 25, 2023
0dd51b8
Update go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
timvaillancourt Mar 15, 2023
25bf05b
Merge branch 'main' into tx-throtter-default-all-cells
timvaillancourt Mar 15, 2023
62a9a26
Merge branch 'main' into tx-throtter-default-all-cells
timvaillancourt May 17, 2023
71ea246
Merge branch 'main' into tx-throtter-default-all-cells
timvaillancourt Jun 18, 2023
ed439e4
Move .GetKnownCells() call to .Open()
timvaillancourt Jun 19, 2023
d5c87cc
Update test
timvaillancourt Jun 19, 2023
84e0a4a
Improve err message
timvaillancourt Jun 19, 2023
9887e9f
Add code comment to explain 2nd cell check
timvaillancourt Jun 20, 2023
90d15b0
Add func fetchKnownCells()
timvaillancourt Jun 24, 2023
abc694d
Add test for fetchKnownCells
timvaillancourt Jun 24, 2023
1cf084d
Restart health stream if topo cells change
timvaillancourt Jun 24, 2023
29c496a
Update cell list when changed
timvaillancourt Jun 25, 2023
a0698dd
log.Fatalf -> Errorf
timvaillancourt Jun 25, 2023
8b287b8
Improve test
timvaillancourt Jun 25, 2023
0e8ae6f
use defer to stop ticker
timvaillancourt Jun 25, 2023
ab35c8b
PR suggestion
timvaillancourt Jun 27, 2023
2f5c44d
Add cellsFromTopo bool to txThrottlerState
timvaillancourt Jun 27, 2023
e7fa178
pass context.Context to .fetchKnownCells(...)
timvaillancourt Jun 28, 2023
d88c453
merge main
timvaillancourt Jul 25, 2023
e25e5ef
PR suggestion
timvaillancourt Jul 25, 2023
42bba79
fix defer-cancel() in loop
timvaillancourt Jul 25, 2023
e616a92
Past context to updateHealthCheckCells
timvaillancourt Jul 25, 2023
2a879ad
PR feedback
timvaillancourt Jul 27, 2023
4d8bff0
Dont update cells in config
timvaillancourt Jul 27, 2023
e7ac2c6
Add "the" to cli flag help + force ci to run again
timvaillancourt Jul 27, 2023
b70a1fa
Missing ts.config -> ts.config.healthCheckCells
timvaillancourt Jul 29, 2023
616f4a6
Fix unit test failure
timvaillancourt Jul 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ Usage of vttablet:
--tx-throttler-default-priority int Default priority assigned to queries that lack priority information (default 100)
--tx-throttler-healthcheck-cells strings Synonym to -tx_throttler_healthcheck_cells
--tx-throttler-tablet-types strings A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly. (default replica)
--tx-throttler-topo-refresh-interval duration The rate that the transaction throttler will refresh the topology to find cells. (default 5m0s)
--tx_throttler_config string The configuration of the transaction throttler as a text-formatted throttlerdata.Configuration protocol buffer message. (default "target_replication_lag_sec:2 max_replication_lag_sec:10 initial_rate:100 max_increase:1 emergency_decrease:0.5 min_duration_between_increases_sec:40 max_duration_between_increases_sec:62 min_duration_between_decreases_sec:20 spread_backlog_across_sec:20 age_bad_rate_after_sec:180 bad_rate_increase:0.1 max_rate_approach_threshold:0.9")
--tx_throttler_healthcheck_cells strings A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.
--unhealthy_threshold duration replication lag after which a replica is considered unhealthy (default 2h0m0s)
Expand Down
26 changes: 13 additions & 13 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
flagutil.DualFormatStringListVar(fs, &currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
fs.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information")
fs.Var(currentConfig.TxThrottlerTabletTypes, "tx-throttler-tablet-types", "A comma-separated list of tablet types. Only tablets of this type are monitored for replication lag by the transaction throttler. Supported types are replica and/or rdonly.")
fs.DurationVar(&currentConfig.TxThrottlerTopoRefreshInterval, "tx-throttler-topo-refresh-interval", time.Minute*5, "The rate that the transaction throttler will refresh the topology to find cells.")

fs.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.")
fs.BoolVar(&enableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", false, "If true, hot row protection is not enforced but logs if transactions would have been queued.")
Expand Down Expand Up @@ -359,11 +360,12 @@ type TabletConfig struct {
TwoPCCoordinatorAddress string `json:"-"`
TwoPCAbandonAge Seconds `json:"-"`

EnableTxThrottler bool `json:"-"`
TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"`
EnableTxThrottler bool `json:"-"`
TxThrottlerConfig *TxThrottlerConfigFlag `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`
TxThrottlerTabletTypes *topoproto.TabletTypeListFlag `json:"-"`
TxThrottlerTopoRefreshInterval time.Duration `json:"-"`

EnableTableGC bool `json:"-"` // can be turned off programmatically by tests

Expand Down Expand Up @@ -721,9 +723,6 @@ func (c *TabletConfig) verifyTxThrottlerConfig() error {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "failed to parse throttlerdatapb.Configuration config: %v", err)
}

if len(c.TxThrottlerHealthCheckCells) == 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "empty healthCheckCells given: %+v", c.TxThrottlerHealthCheckCells)
}
if v := c.TxThrottlerDefaultPriority; v > sqlparser.MaxPriorityValue || v < 0 {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "--tx-throttler-default-priority must be > 0 and < 100 (specified value: %d)", v)
}
Expand Down Expand Up @@ -827,11 +826,12 @@ var defaultConfig = TabletConfig{
MessagePostponeParallelism: 4,
SignalWhenSchemaChange: true,

EnableTxThrottler: false,
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA},
EnableTxThrottler: false,
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultPriority: sqlparser.MaxPriorityValue, // This leads to all queries being candidates to throttle
TxThrottlerTabletTypes: &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA},
TxThrottlerTopoRefreshInterval: time.Minute * 5,

TransactionLimitConfig: defaultTransactionLimitConfig(),

Expand Down
7 changes: 0 additions & 7 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,6 @@ func TestVerifyTxThrottlerConfig(t *testing.T) {
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{invalidMaxReplicationLagModuleConfig},
},
{
// enabled without cells defined
Name: "enabled without cells",
ExpectedErrorCode: vtrpcpb.Code_FAILED_PRECONDITION,
EnableTxThrottler: true,
TxThrottlerConfig: &TxThrottlerConfigFlag{defaultMaxReplicationLagModuleConfig},
},
{
// enabled with good config (default/replica tablet type)
Name: "enabled",
Expand Down
148 changes: 103 additions & 45 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package txthrottler
import (
"context"
"math/rand"
"reflect"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -101,6 +102,17 @@ type TopologyWatcherInterface interface {
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// fetchKnownCells asks the topology server for its list of known cells and
// returns it. If the lookup fails, the error is logged and a single-element
// list holding only the cell of the given target (the local tablet) is
// returned instead, so callers always receive a usable cell list.
func fetchKnownCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) []string {
	knownCells, fetchErr := topoServer.GetKnownCells(ctx)
	if fetchErr == nil {
		return knownCells
	}

	// Topology lookup failed: degrade to monitoring the local cell only.
	log.Errorf("txThrottler: falling back to local cell due to error fetching cells from topology: %+v", fetchErr)
	return []string{target.Cell}
}

// txThrottler implements TxThrottle for throttling transactions based on replication lag.
// It's a thin wrapper around the throttler found in vitess/go/vt/throttler.
// It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler.
Expand Down Expand Up @@ -165,6 +177,9 @@ type txThrottlerConfig struct {

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool

// rate to refresh topo for cells
topoRefreshInterval time.Duration
}

// txThrottlerState holds the state of an open TxThrottler object.
Expand All @@ -174,12 +189,15 @@ type txThrottlerState struct {

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
throttleMu sync.Mutex
throttler ThrottlerInterface
stopHealthCheck context.CancelFunc
throttleMu sync.Mutex
throttler ThrottlerInterface
stopHealthCheck context.CancelFunc
topologyWatchers map[string]TopologyWatcherInterface

healthCheck discovery.HealthCheck
topologyWatchers map[string]TopologyWatcherInterface
healthCheckChan chan *discovery.TabletHealth
healthCheckCells []string
cellsFromTopo bool
}

// NewTxThrottler tries to construct a txThrottler from the
Expand All @@ -201,10 +219,11 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
}

throttlerConfig = &txThrottlerConfig{
enabled: true,
tabletTypes: tabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
healthCheckCells: healthCheckCells,
enabled: true,
healthCheckCells: healthCheckCells,
tabletTypes: tabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
topoRefreshInterval: env.Config().TxThrottlerTopoRefreshInterval,
}

defer log.Infof("Initialized transaction throttler with config: %+v", throttlerConfig)
Expand Down Expand Up @@ -301,44 +320,91 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta
return nil, err
}
state := &txThrottlerState{
config: config,
throttler: t,
txThrottler: txThrottler,
config: config,
healthCheckCells: config.healthCheckCells,
throttler: t,
txThrottler: txThrottler,
}
createTxThrottlerHealthCheck(txThrottler.topoServer, config, state, target.Cell)

state.topologyWatchers = make(
map[string]TopologyWatcherInterface, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
state.topologyWatchers[cell] = topologyWatcherFactory(
txThrottler.topoServer,
state.healthCheck,

// get cells from topo if none defined in tabletenv config
if len(state.healthCheckCells) == 0 {
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target)
state.cellsFromTopo = true
}

ctx, cancel := context.WithCancel(context.Background())
state.stopHealthCheck = cancel
state.initHealthCheckStream(txThrottler.topoServer, target)
go state.healthChecksProcessor(ctx, txThrottler.topoServer, target)

return state, nil
}

// initHealthCheckStream (re)creates the healthcheck and topology watchers for
// the cells currently held in ts.healthCheckCells.
//
// It builds a healthcheck through healthCheckFactory for the target's cell,
// subscribes to it and stores the subscription channel in ts.healthCheckChan,
// then creates one topology watcher per cell (scoped to the target's keyspace
// and shard) through topologyWatcherFactory, incrementing the per-cell
// topoWatchers counter for every watcher created.
func (ts *txThrottlerState) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) {
ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells)
ts.healthCheckChan = ts.healthCheck.Subscribe()

// Any previously stored watcher map is replaced wholesale, sized for the
// current cell list.
ts.topologyWatchers = make(
map[string]TopologyWatcherInterface, len(ts.healthCheckCells))
for _, cell := range ts.healthCheckCells {
ts.topologyWatchers[cell] = topologyWatcherFactory(
topoServer,
ts.healthCheck,
cell,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency,
)
// NOTE(review): the next line (no `ts.` receiver) and the `return state, nil`
// further down look like removed-side lines of the diff this text was
// captured from — confirm against the real file before relying on them.
txThrottler.topoWatchers.Add(cell, 1)
ts.txThrottler.topoWatchers.Add(cell, 1)
}
return state, nil
}

func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) {
ctx, cancel := context.WithCancel(context.Background())
result.stopHealthCheck = cancel
result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells)
ch := result.healthCheck.Subscribe()
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case th := <-ch:
result.StatsUpdate(th)
}
// closeHealthCheckStream releases everything set up for the health stream:
// it stops every topology watcher and resets its per-cell counter, drops the
// watcher map, invokes ts.stopHealthCheck, and finally closes the healthcheck.
// It is a no-op when no healthcheck is currently set.
func (ts *txThrottlerState) closeHealthCheckStream() {
	// Nothing was opened (or it was already released): nothing to close.
	if ts.healthCheck == nil {
		return
	}

	// Watchers feed the healthcheck, so they are stopped before it is closed.
	for cellName, topoWatcher := range ts.topologyWatchers {
		topoWatcher.Stop()
		ts.txThrottler.topoWatchers.Reset(cellName)
	}
	ts.topologyWatchers = nil

	ts.stopHealthCheck()
	ts.healthCheck.Close()
}

// updateHealthCheckCells re-reads the list of known cells from the topology
// and, when it differs from ts.healthCheckCells, stores the new list and
// rebuilds the healthcheck stream and per-cell topology watchers for it.
//
// The topology fetch is bounded by topo.RemoteOperationTimeout, derived from
// ctx. ctx is expected to be the context healthChecksProcessor runs on.
//
// The teardown here deliberately does NOT go through closeHealthCheckStream:
// that helper calls ts.stopHealthCheck, which cancels the very context the
// calling healthChecksProcessor loop selects on. Cancelling it during a cell
// refresh would make the processor goroutine exit right after the new stream
// is created, leaving ts.healthCheckChan undrained and ending all further
// topology refreshes. Only the watchers and the healthcheck are recycled; the
// processor context stays alive until the state itself is deallocated.
func (ts *txThrottlerState) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
	fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
	defer cancel()

	knownCells := fetchKnownCells(fetchCtx, topoServer, target)
	if reflect.DeepEqual(knownCells, ts.healthCheckCells) {
		// Cell list unchanged: keep the current stream.
		return
	}

	log.Info("txThrottler: restarting healthcheck stream due to topology cells update")
	ts.healthCheckCells = knownCells

	// Release the old watchers and healthcheck, keeping the processor's
	// context (ts.stopHealthCheck) untouched so the caller keeps running.
	if ts.healthCheck != nil {
		for cell, watcher := range ts.topologyWatchers {
			watcher.Stop()
			ts.txThrottler.topoWatchers.Reset(cell)
		}
		ts.topologyWatchers = nil
		ts.healthCheck.Close()
	}

	ts.initHealthCheckStream(topoServer, target)
}

// healthChecksProcessor is the event loop of an open txThrottlerState. Until
// ctx is done it forwards every tablet health update received on
// ts.healthCheckChan to ts.StatsUpdate and, when the cell list came from the
// topology (ts.cellsFromTopo), calls ts.updateHealthCheckCells on every tick of
// ts.config.topoRefreshInterval.
func (ts *txThrottlerState) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
// cellsUpdateTicks stays nil when the cells were configured explicitly; a
// receive from a nil channel never proceeds, so that select case is
// effectively disabled and no periodic topology refresh happens.
var cellsUpdateTicks <-chan time.Time
if ts.cellsFromTopo {
ticker := time.NewTicker(ts.config.topoRefreshInterval)
cellsUpdateTicks = ticker.C
defer ticker.Stop()
}
for {
select {
case <-ctx.Done():
return
case <-cellsUpdateTicks:
ts.updateHealthCheckCells(ctx, topoServer, target)
case th := <-ts.healthCheckChan:
ts.StatsUpdate(th)
}
// NOTE(review): the `}(ctx)` below looks like a removed-side line of the diff
// this text was captured from (the tail of the old goroutine closure) —
// confirm against the real file.
}(ctx)
}
}

func (ts *txThrottlerState) throttle() bool {
Expand All @@ -353,16 +419,8 @@ func (ts *txThrottlerState) throttle() bool {
}

func (ts *txThrottlerState) deallocateResources() {
// We don't really need to nil out the fields here
// as deallocateResources is not expected to be called
// more than once, but it doesn't hurt to do so.
for cell, watcher := range ts.topologyWatchers {
watcher.Stop()
ts.txThrottler.topoWatchers.Reset(cell)
}
ts.topologyWatchers = nil

ts.healthCheck.Close()
// Close healthcheck and topo watchers
ts.closeHealthCheckStream()
ts.healthCheck = nil

// After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not
Expand Down
15 changes: 14 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package txthrottler
//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -112,7 +113,6 @@ func TestEnabledThrottler(t *testing.T) {

config := tabletenv.NewDefaultConfig()
config.EnableTxThrottler = true
config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"}
config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}

env := tabletenv.NewEnv(config, t.Name())
Expand Down Expand Up @@ -161,6 +161,19 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts())
}

func TestFetchKnownCells(t *testing.T) {
{
ts := memorytopo.NewServer("cell1", "cell2")
cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"})
assert.Equal(t, []string{"cell1", "cell2"}, cells)
}
{
ts := memorytopo.NewServer()
cells := fetchKnownCells(context.Background(), ts, &querypb.Target{Cell: "cell1"})
assert.Equal(t, []string{"cell1"}, cells)
}
}

func TestNewTxThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
env := tabletenv.NewEnv(config, t.Name())
Expand Down