Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttler: exempt apps via UpdateThrottlerConfig --throttle-app-exempt #13666

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/cmd/vtctldclient/command/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func init() {
UpdateThrottlerConfig.Flags().StringVar(&throttledAppRule.Name, "throttle-app", "", "an app name to throttle")
UpdateThrottlerConfig.Flags().Float64Var(&throttledAppRule.Ratio, "throttle-app-ratio", throttle.DefaultThrottleRatio, "ratio to throttle app (app specififed in --throttled-app)")
UpdateThrottlerConfig.Flags().DurationVar(&throttledAppDuration, "throttle-app-duration", throttle.DefaultAppThrottleDuration, "duration after which throttled app rule expires (app specififed in --throttled-app)")
UpdateThrottlerConfig.Flags().BoolVar(&throttledAppRule.Exempt, "throttle-app-exempt", throttledAppRule.Exempt, "exempt this app from being at all throttled. WARNING: use with extreme care, as this is likely to push metrics beyond the throttler's threshold, and starve other apps")

Root.AddCommand(UpdateThrottlerConfig)
}
2 changes: 1 addition & 1 deletion go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestSchemaChange(t *testing.T) {
}
})
t.Run("updating throttler config", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, noCustomQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, noCustomQuery, nil)
require.NoError(t, err)
})

Expand Down
44 changes: 33 additions & 11 deletions go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/logutil"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -45,6 +46,7 @@ const (
onDemandHeartbeatDuration = 5 * time.Second
throttlerEnabledTimeout = 60 * time.Second
useDefaultQuery = ""
testAppName = "test"
)

var (
Expand Down Expand Up @@ -170,12 +172,12 @@ func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody stri
}

func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*http.Response, error) {
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test&s=%t", tablet.HTTPPort, checkAPIPath, skipRequestHeartbeats))
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=%s&s=%t", tablet.HTTPPort, checkAPIPath, testAppName, skipRequestHeartbeats))
return resp, err
}

func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=test", tablet.HTTPPort, checkSelfAPIPath))
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=%s", tablet.HTTPPort, checkSelfAPIPath, testAppName))
}

func warmUpHeartbeat(t *testing.T) (respStatus int) {
Expand Down Expand Up @@ -245,7 +247,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK)
})
t.Run("enabling throttler with very low threshold", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, unreasonablyLowThreshold.Seconds(), useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be enabled everywhere with the new config.
Expand All @@ -257,7 +259,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests)
})
t.Run("disabling throttler", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, true, unreasonablyLowThreshold.Seconds(), useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be disabled everywhere.
Expand All @@ -271,7 +273,7 @@ func TestInitialThrottler(t *testing.T) {
t.Run("enabling throttler, again", func(t *testing.T) {
// Enable the throttler again with the default query which also moves us back
// to the default threshold.
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be enabled everywhere again with the default config.
Expand All @@ -283,7 +285,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests)
})
t.Run("setting high threshold", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, extremelyHighThreshold.Seconds(), useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be enabled everywhere with new config.
Expand All @@ -295,7 +297,7 @@ func TestInitialThrottler(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK)
})
t.Run("setting low threshold", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be enabled everywhere with new config.
Expand Down Expand Up @@ -392,6 +394,26 @@ func TestLag(t *testing.T) {
defer resp.Body.Close()
assert.Equalf(t, http.StatusTooManyRequests, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
})
t.Run("exempting test app", func(t *testing.T) {
appRule := &topodatapb.ThrottledAppRule{
Name: testAppName,
ExpiresAt: logutil.TimeToProto(time.Now().Add(time.Hour)),
Exempt: true,
}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, appRule)
assert.NoError(t, err)
waitForThrottleCheckStatus(t, primaryTablet, http.StatusOK)
})
t.Run("unexempting test app", func(t *testing.T) {
appRule := &topodatapb.ThrottledAppRule{
Name: testAppName,
ExpiresAt: logutil.TimeToProto(time.Now()),
}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, false, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, appRule)
assert.NoError(t, err)
waitForThrottleCheckStatus(t, primaryTablet, http.StatusTooManyRequests)
})

t.Run("starting replication", func(t *testing.T) {
err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias)
assert.NoError(t, err)
Expand Down Expand Up @@ -436,7 +458,7 @@ func TestCustomQuery(t *testing.T) {
defer cluster.PanicHandler(t)

t.Run("enabling throttler with custom query and threshold", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, customQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, customQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be enabled everywhere with new custom config.
Expand Down Expand Up @@ -504,7 +526,7 @@ func TestRestoreDefaultQuery(t *testing.T) {

// Validate going back from custom-query to default-query (replication lag) still works.
t.Run("enabling throttler with default query and threshold", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery)
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, throttler.DefaultThreshold.Seconds(), useDefaultQuery, nil)
assert.NoError(t, err)

// Wait for the throttler to be up and running everywhere again with the default config.
Expand Down
18 changes: 14 additions & 4 deletions go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)
Expand All @@ -57,7 +59,7 @@ var DefaultConfig = &Config{
// This retries the command until it succeeds or times out as the
// SrvKeyspace record may not yet exist for a newly created
// Keyspace that is still initializing before it becomes serving.
func UpdateThrottlerTopoConfigRaw(vtctldProcess *cluster.VtctldClientProcess, keyspaceName string, enable bool, disable bool, threshold float64, metricsQuery string) (result string, err error) {
func UpdateThrottlerTopoConfigRaw(vtctldProcess *cluster.VtctldClientProcess, keyspaceName string, enable bool, disable bool, threshold float64, metricsQuery string, appRule *topodatapb.ThrottledAppRule) (result string, err error) {
args := []string{}
args = append(args, "UpdateThrottlerConfig")
if enable {
Expand All @@ -75,6 +77,14 @@ func UpdateThrottlerTopoConfigRaw(vtctldProcess *cluster.VtctldClientProcess, ke
} else {
args = append(args, "--check-as-check-shard")
}
if appRule != nil {
args = append(args, "--throttle-app", appRule.Name)
args = append(args, "--throttle-app-duration", logutil.ProtoToTime(appRule.ExpiresAt).Sub(time.Now()).String())
args = append(args, "--throttle-app-ratio", fmt.Sprintf("%f", appRule.Ratio))
if appRule.Exempt {
args = append(args, "--throttle-app-exempt")
}
}
args = append(args, keyspaceName)

ctx, cancel := context.WithTimeout(context.Background(), ConfigTimeout)
Expand All @@ -100,14 +110,14 @@ func UpdateThrottlerTopoConfigRaw(vtctldProcess *cluster.VtctldClientProcess, ke
// This retries the command until it succeeds or times out as the
// SrvKeyspace record may not yet exist for a newly created
// Keyspace that is still initializing before it becomes serving.
func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string) (string, error) {
func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string, appRule *topodatapb.ThrottledAppRule) (string, error) {
rec := concurrency.AllErrorRecorder{}
var (
err error
res strings.Builder
)
for _, ks := range clusterInstance.Keyspaces {
ires, err := UpdateThrottlerTopoConfigRaw(&clusterInstance.VtctldClientProcess, ks.Name, enable, disable, threshold, metricsQuery)
ires, err := UpdateThrottlerTopoConfigRaw(&clusterInstance.VtctldClientProcess, ks.Name, enable, disable, threshold, metricsQuery, appRule)
if err != nil {
rec.RecordError(err)
}
Expand Down Expand Up @@ -337,7 +347,7 @@ func WaitForThrottledApp(t *testing.T, tablet *cluster.Vttablet, throttlerApp th
// The throttler is configued to use the standard replication lag metric. The function waits until the throttler is confirmed
// to be running on all tablets.
func EnableLagThrottlerAndWaitForStatus(t *testing.T, clusterInstance *cluster.LocalProcessCluster, lag time.Duration) {
_, err := UpdateThrottlerTopoConfig(clusterInstance, true, false, lag.Seconds(), "")
_, err := UpdateThrottlerTopoConfig(clusterInstance, true, false, lag.Seconds(), "", nil)
require.NoError(t, err)

for _, ks := range clusterInstance.Keyspaces {
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (vc *VitessCluster) AddKeyspace(t *testing.T, cells []*Cell, ksName string,
}

log.Infof("Applying throttler config for keyspace %s", keyspace.Name)
res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, keyspace.Name, true, false, throttlerConfig.Threshold, throttlerConfig.Query)
res, err := throttler.UpdateThrottlerTopoConfigRaw(vc.VtctldClient, keyspace.Name, true, false, throttlerConfig.Threshold, throttlerConfig.Query, nil)
require.NoError(t, err, res)

cellsToWatch := ""
Expand Down
Loading