Distributor: add config flag to control error code when rate limit reached #5752

Merged: 9 commits, Aug 17, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@
* [CHANGE] The `-shutdown-delay` flag is no longer experimental. #5701
* [CHANGE] The `-validation.create-grace-period` is now enforced in the ingester too, in addition to the distributor and query-frontend. If you've configured `-validation.create-grace-period`, make sure the configuration is applied to ingesters too. #5712
* [CHANGE] The `-validation.create-grace-period` is now enforced for exemplars too in the distributor. If an exemplar has a timestamp greater than "now + grace_period", the exemplar is dropped and the metric `cortex_discarded_exemplars_total{reason="exemplar_too_far_in_future",user="..."}` is increased. #5761
* [FEATURE] Introduced the `-distributor.service-overload-status-code-on-rate-limit-enabled` flag to report HTTP status code 529 (Service is overloaded) instead of 429 (Too Many Requests) when a tenant exhausts its request rate limit. #5752
* [FEATURE] Cardinality API: Added a new `count_method` parameter that enables counting active series. #5136
* [FEATURE] Query-frontend: added experimental support to cache cardinality, label names and label values query responses. The cache will be used when `-query-frontend.cache-results` is enabled, and `-query-frontend.results-cache-ttl-for-cardinality-query` or `-query-frontend.results-cache-ttl-for-labels-query` is set to a value greater than 0. The following metrics have been added to track the query results cache hit ratio per `request_type`: #5212 #5235 #5426 #5524
* `cortex_frontend_query_result_cache_requests_total{request_type="query_range|cardinality|label_names_and_values"}`
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -3020,6 +3020,17 @@
"fieldType": "relabel_config...",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "service_overload_status_code_on_rate_limit_enabled",
"required": false,
"desc": "If enabled, rate limit errors will be reported to the client with HTTP status code 529 (Service is overloaded). If disabled, status code 429 (Too Many Requests) is used.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "distributor.service-overload-status-code-on-rate-limit-enabled",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_global_series_per_user",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -1187,6 +1187,8 @@ Usage of ./cmd/mimir/mimir:
The prefix for the keys in the store. Should end with a /. (default "collectors/")
-distributor.ring.store string
Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi. (default "memberlist")
-distributor.service-overload-status-code-on-rate-limit-enabled
[experimental] If enabled, rate limit errors will be reported to the client with HTTP status code 529 (Service is overloaded). If disabled, status code 429 (Too Many Requests) is used.
-enable-go-runtime-metrics
Set to true to enable all Go runtime metrics, such as go_sched_* and go_memstats_*.
-flusher.exit-after-flush
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
@@ -60,6 +60,8 @@ The following features are currently experimental:
- Distributor
- Metrics relabeling
- OTLP ingestion path
  - Using status code 529 instead of 429 upon rate limit exhaustion
- `distributor.service-overload-status-code-on-rate-limit-enabled`
- Hash ring
- Disabling ring heartbeat timeouts
- `-distributor.ring.heartbeat-timeout=0`
@@ -2754,6 +2754,12 @@ The `limits` block configures default and per-tenant limits imposed by component
# during the relabeling phase and cleaned afterwards: __meta_tenant_id
[metric_relabel_configs: <relabel_config...> | default = ]

# (experimental) If enabled, rate limit errors will be reported to the client
# with HTTP status code 529 (Service is overloaded). If disabled, status code
# 429 (Too Many Requests) is used.
# CLI flag: -distributor.service-overload-status-code-on-rate-limit-enabled
[service_overload_status_code_on_rate_limit_enabled: <boolean> | default = false]

# The maximum number of in-memory series per tenant, across the cluster before
# replication. 0 to disable.
# CLI flag: -ingester.max-global-series-per-user
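Since the new option is part of the `limits` block, it can be set globally or overridden per tenant at runtime. A minimal sketch of both placements (the tenant ID `tenant-a` and the surrounding file layout are illustrative, not part of this PR):

```yaml
# In the main Mimir configuration file: enable 529 for all tenants.
limits:
  service_overload_status_code_on_rate_limit_enabled: true
```

```yaml
# Or in the runtime overrides file: enable 529 for a single tenant only.
overrides:
  tenant-a:
    service_overload_status_code_on_rate_limit_enabled: true
```

Because the default is `false`, existing deployments keep returning 429 unless the option is explicitly enabled.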
8 changes: 7 additions & 1 deletion pkg/distributor/distributor.go
@@ -77,6 +77,9 @@ const (
// Size of "slab" when using pooled buffers for marshaling write requests. When handling single Push request
// buffers for multiple write requests sent to ingesters will be allocated from single "slab", if there is enough space.
writeRequestSlabPoolSize = 512 * 1024

// 529 is a non-standard status code used by some services to signal that "The service is overloaded".
statusServiceOverload = 529
)

// Distributor forwards appends and queries to individual ingesters.
@@ -1041,9 +1044,12 @@ func (d *Distributor) limitsMiddleware(next push.Func) push.Func {
if !d.requestRateLimiter.AllowN(now, userID, 1) {
d.discardedRequestsRateLimited.WithLabelValues(userID).Add(1)

// Return a 429 here to tell the client it is going too fast.
// Return a 429 or a 529 here depending on configuration to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
if d.limits.ServiceOverloadStatusCodeOnRateLimitEnabled(userID) {
return nil, httpgrpc.Errorf(statusServiceOverload, validation.NewRequestRateLimitedError(d.limits.RequestRate(userID), d.limits.RequestBurstSize(userID)).Error())
}
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(d.limits.RequestRate(userID), d.limits.RequestBurstSize(userID)).Error())
}

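The comment above mentions Prometheus' `retry_on_http_429` remote-write option, which is why the choice of status code matters to clients. A hedged sketch of the client side (the URL is a placeholder; this assumes the option lives under `queue_config`, as documented for Prometheus v2.26+):

```yaml
remote_write:
  - url: http://mimir.example/api/v1/push  # placeholder endpoint
    queue_config:
      # Retry with backoff on 429 instead of dropping the batch.
      retry_on_http_429: true
```

Since 529 falls in the 5xx range, senders that treat server errors as recoverable will typically keep retrying it regardless of this option, which is the practical difference between the two codes.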
21 changes: 17 additions & 4 deletions pkg/distributor/distributor_test.go
@@ -472,10 +472,11 @@ func TestDistributor_PushRequestRateLimiter(t *testing.T) {
}
ctx := user.InjectOrgID(context.Background(), "user")
tests := map[string]struct {
distributors int
requestRate float64
requestBurstSize int
pushes []testPush
distributors int
requestRate float64
requestBurstSize int
pushes []testPush
enableServiceOverloadError bool
}{
"request limit should be evenly shared across distributors": {
distributors: 2,
@@ -508,6 +509,17 @@
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, validation.NewRequestRateLimitedError(2, 3).Error())},
},
},
"request limit is reached return 529 when enable service overload error set to true": {
distributors: 2,
requestRate: 4,
requestBurstSize: 2,
enableServiceOverloadError: true,
pushes: []testPush{
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(statusServiceOverload, validation.NewRequestRateLimitedError(4, 2).Error())},
},
},
}

for testName, testData := range tests {
@@ -518,6 +530,7 @@
flagext.DefaultValues(limits)
limits.RequestRate = testData.requestRate
limits.RequestBurstSize = testData.requestBurstSize
limits.ServiceOverloadStatusCodeOnRateLimitEnabled = testData.enableServiceOverloadError

// Start all expected distributors
distributors, _, _ := prepare(t, prepConfig{
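For context on how the test's `expectedError` values carry a status code, here is a hedged, standalone sketch of round-tripping a status through an `httpgrpc` error (the import path is an assumption, not taken from this PR; older code used `weaveworks/common/httpgrpc` with the same API):

```go
package main

import (
	"fmt"
	"net/http"

	"github.com/grafana/dskit/httpgrpc" // assumed import path
)

func main() {
	// httpgrpc.Errorf embeds an HTTP status code in a gRPC error, as the
	// distributor does when a push is rate limited.
	err := httpgrpc.Errorf(529, "request rate limit exceeded")

	// Callers and tests can recover the embedded status from the error.
	if resp, ok := httpgrpc.HTTPResponseFromError(err); ok {
		switch int(resp.Code) {
		case http.StatusTooManyRequests:
			fmt.Println("429: client may drop or back off")
		default:
			fmt.Println("status:", resp.Code) // 529 when the flag is enabled
		}
	}
}
```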
44 changes: 25 additions & 19 deletions pkg/util/validation/limits.go
@@ -66,25 +66,25 @@ func (e LimitError) Error() string {
// limits via flags, or per-user limits via yaml config.
type Limits struct {
// Distributor enforced limits.
RequestRate float64 `yaml:"request_rate" json:"request_rate"`
RequestBurstSize int `yaml:"request_burst_size" json:"request_burst_size"`
IngestionRate float64 `yaml:"ingestion_rate" json:"ingestion_rate"`
IngestionBurstSize int `yaml:"ingestion_burst_size" json:"ingestion_burst_size"`
AcceptHASamples bool `yaml:"accept_ha_samples" json:"accept_ha_samples"`
HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"`
HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"`
HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"`
DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels" category:"advanced"`
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"`
MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"`
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period" category:"advanced"`
EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name" category:"advanced"`
IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"`
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs. Labels available during the relabeling phase and cleaned afterwards: __meta_tenant_id" category:"experimental"`

RequestRate float64 `yaml:"request_rate" json:"request_rate"`
RequestBurstSize int `yaml:"request_burst_size" json:"request_burst_size"`
IngestionRate float64 `yaml:"ingestion_rate" json:"ingestion_rate"`
IngestionBurstSize int `yaml:"ingestion_burst_size" json:"ingestion_burst_size"`
AcceptHASamples bool `yaml:"accept_ha_samples" json:"accept_ha_samples"`
HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"`
HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"`
HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"`
DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels" category:"advanced"`
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
MaxLabelNamesPerSeries int `yaml:"max_label_names_per_series" json:"max_label_names_per_series"`
MaxMetadataLength int `yaml:"max_metadata_length" json:"max_metadata_length"`
MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"`
CreationGracePeriod model.Duration `yaml:"creation_grace_period" json:"creation_grace_period" category:"advanced"`
EnforceMetadataMetricName bool `yaml:"enforce_metadata_metric_name" json:"enforce_metadata_metric_name" category:"advanced"`
IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"`
MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs. Labels available during the relabeling phase and cleaned afterwards: __meta_tenant_id" category:"experimental"`
ServiceOverloadStatusCodeOnRateLimitEnabled bool `yaml:"service_overload_status_code_on_rate_limit_enabled" json:"service_overload_status_code_on_rate_limit_enabled" category:"experimental"`
// Ingester enforced limits.
// Series
MaxGlobalSeriesPerUser int `yaml:"max_global_series_per_user" json:"max_global_series_per_user"`
@@ -185,6 +185,7 @@ type Limits struct {
func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.IngestionTenantShardSize, "distributor.ingestion-tenant-shard-size", 0, "The tenant's shard size used by shuffle-sharding. This value is the total size of the shard (ie. it is not the number of ingesters in the shard per zone, but the number of ingesters in the shard across all zones, if zone-awareness is enabled). Must be set both on ingesters and distributors. 0 disables shuffle sharding.")
f.Float64Var(&l.RequestRate, requestRateFlag, 0, "Per-tenant push request rate limit in requests per second. 0 to disable.")
f.BoolVar(&l.ServiceOverloadStatusCodeOnRateLimitEnabled, "distributor.service-overload-status-code-on-rate-limit-enabled", false, "If enabled, rate limit errors will be reported to the client with HTTP status code 529 (Service is overloaded). If disabled, status code 429 (Too Many Requests) is used.")
f.IntVar(&l.RequestBurstSize, requestBurstSizeFlag, 0, "Per-tenant allowed push request burst size. 0 to disable.")
f.Float64Var(&l.IngestionRate, ingestionRateFlag, 10000, "Per-tenant ingestion rate limit in samples per second.")
f.IntVar(&l.IngestionBurstSize, ingestionBurstSizeFlag, 200000, "Per-tenant allowed ingestion burst size (in number of samples).")
@@ -435,6 +436,11 @@ func (o *Overrides) AcceptHASamples(userID string) bool {
return o.getOverridesForUser(userID).AcceptHASamples
}

// ServiceOverloadStatusCodeOnRateLimitEnabled returns whether the distributor uses status code 529 instead of 429 when the rate limit is exceeded.
func (o *Overrides) ServiceOverloadStatusCodeOnRateLimitEnabled(userID string) bool {
return o.getOverridesForUser(userID).ServiceOverloadStatusCodeOnRateLimitEnabled
}

// HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica.
func (o *Overrides) HAClusterLabel(userID string) string {
return o.getOverridesForUser(userID).HAClusterLabel