
Distributor: added per-tenant request limit #1843

Merged — 13 commits, merged May 30, 2022
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -12,6 +12,12 @@
- `-querier.query-ingesters-within`
- `-querier.query-store-after`
* [CHANGE] Config flag category overrides can be set dynamically at runtime. #1934
* [CHANGE] Distributor: Added limit to prevent tenants from sending an excessive number of requests. #1843
* The following CLI flags (and their respective YAML config options) have been added:
* `-distributor.request-rate-limit`
  * `-distributor.request-burst-size`
* The following metric is exposed to tell how many requests have been rejected:
* `cortex_discarded_requests_total`
* [ENHANCEMENT] Store-gateway: Add the experimental ability to run requests in a dedicated OS thread pool. This feature can be configured using `-store-gateway.thread-pool-size` and is disabled by default. Replaces the ability to run index header operations in a dedicated thread pool. #1660 #1812
* [ENHANCEMENT] Improved error messages to make them easier to understand, and to reference a unique global identifier that can be looked up in the runbooks. #1907 #1919 #1888
* [ENHANCEMENT] Memberlist KV: incoming messages are now processed in a per-key goroutine. This may reduce loss of "maintenance" packets in busy memberlist installations, but uses more CPU. The new `memberlist_client_received_broadcasts_dropped_total` counter tracks the number of dropped per-key messages. #1912
20 changes: 20 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -2411,6 +2411,26 @@
"required": false,
"desc": "",
"blockEntries": [
{
"kind": "field",
"name": "request_rate",
"required": false,
"desc": "Per-tenant request rate limit in requests per second. 0 to disable",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "distributor.request-rate-limit",
"fieldType": "float"
},
{
"kind": "field",
"name": "request_burst_size",
"required": false,
"desc": "Per-tenant allowed request burst size. 0 to disable",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "distributor.request-burst-size",
"fieldType": "int"
},
{
"kind": "field",
"name": "ingestion_rate",
4 changes: 4 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -723,6 +723,10 @@ Usage of ./cmd/mimir/mimir:
remote_write API max receive message size (bytes). (default 104857600)
-distributor.remote-timeout duration
Timeout for downstream ingesters. (default 20s)
-distributor.request-burst-size int
Per-tenant allowed request burst size. 0 to disable
-distributor.request-rate-limit float
Per-tenant request rate limit in requests per second. 0 to disable
-distributor.ring.consul.acl-token string
ACL Token used to interact with Consul.
-distributor.ring.consul.client-timeout duration
4 changes: 4 additions & 0 deletions cmd/mimir/help.txt.tmpl
@@ -253,6 +253,10 @@ Usage of ./cmd/mimir/mimir:
Per-tenant ingestion rate limit in samples per second. (default 10000)
-distributor.ingestion-tenant-shard-size int
The tenant's shard size used by shuffle-sharding. Must be set both on ingesters and distributors. 0 disables shuffle sharding.
-distributor.request-burst-size int
Per-tenant allowed request burst size. 0 to disable
-distributor.request-rate-limit float
Per-tenant request rate limit in requests per second. 0 to disable
-distributor.ring.consul.hostname string
Hostname and port of Consul. (default "localhost:8500")
-distributor.ring.etcd.endpoints value
@@ -33,21 +33,29 @@ The distributor validation includes the following checks:

## Rate limiting

The distributor includes a built-in rate limiter that it applies to each tenant.
The rate limit is the maximum ingestion rate for each tenant across the Grafana Mimir cluster.
If the rate exceeds the maximum number of samples per second, the distributor drops the request and returns an HTTP 429 response code.
The distributor includes two different types of rate limiters that apply to each tenant.

Internally, the limit is implemented using a per-distributor local rate limiter.
The local rate limiter for each distributor is configured with a limit of `ingestion rate limit / N`, where `N` is the number of healthy distributor replicas.
The distributor automatically adjusts the ingestion rate limit if the number of distributor replicas change.
Because the rate limit is implemented using a per-distributor local rate limiter, the ingestion rate limit requires that write requests are [evenly distributed across the pool of distributors]({{< relref "#load-balancing-across-distributors" >}}).
- **Request rate**<br />
  The maximum number of requests per second that can be served across the Grafana Mimir cluster for each tenant.

Use the following flags to configure the rate limit:
- **Ingestion rate**<br />
  The maximum number of samples per second that can be ingested across the Grafana Mimir cluster for each tenant.

If either of these limits is exceeded, the distributor drops the request and returns an HTTP 429 response code.

Internally, these limits are implemented using a per-distributor local rate limiter.
The local rate limiter for each distributor is configured with a limit of `limit / N`, where `N` is the number of healthy distributor replicas.
The distributor automatically adjusts the request and ingestion rate limits if the number of distributor replicas changes.
Because these rate limits are implemented using a per-distributor local rate limiter, they require that write requests are [evenly distributed across the pool of distributors]({{< relref "#load-balancing-across-distributors" >}}).

Use the following flags to configure the rate limits:

- `-distributor.request-rate-limit`: Per-tenant request rate limit, in requests per second
- `-distributor.request-burst-size`: Per-tenant allowed request burst size, in number of requests
- `-distributor.ingestion-rate-limit`: Per-tenant ingestion rate limit, in samples per second
- `-distributor.ingestion-burst-size`: Per-tenant allowed ingestion burst size, in number of samples
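
The `limit / N` sharing described above can be sketched as follows. This is a simplified illustration, not Mimir's actual implementation: the function name `localLimit` and the example numbers are made up here, and the real code derives `N` from the distributor ring.

```go
package main

import "fmt"

// localLimit returns the per-distributor share of a global rate limit,
// given the number of healthy distributor replicas. With no known healthy
// replicas, it falls back to the full global limit.
func localLimit(globalLimit float64, healthyReplicas int) float64 {
	if healthyReplicas <= 0 {
		return globalLimit
	}
	return globalLimit / float64(healthyReplicas)
}

func main() {
	// A global request rate limit of 100 req/s shared by 4 healthy
	// distributors gives each distributor a local limit of 25 req/s.
	fmt.Println(localLimit(100, 4)) // → 25
}
```

This is why the limits only behave as true cluster-wide limits when writes are evenly load-balanced across distributors: each replica enforces only its own share.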

> **Note:** You can override rate limiting on a per-tenant basis by setting `ingestion_rate` and `ingestion_burst_size` in the overrides section of the runtime configuration.
> **Note:** You can override rate limiting on a per-tenant basis by setting `request_rate`, `ingestion_rate`, `request_burst_size`, and `ingestion_burst_size` in the overrides section of the runtime configuration.
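
For instance, a per-tenant override in the runtime configuration might look like the following sketch; the tenant ID `tenant-a` and all values are illustrative:

```yaml
overrides:
  tenant-a:
    request_rate: 10
    request_burst_size: 20
    ingestion_rate: 50000
    ingestion_burst_size: 100000
```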

> **Note:** By default, Prometheus remote write doesn't retry requests on 429 HTTP response status code. To modify this behavior, use `retry_on_http_429: true` in the Prometheus [`remote_write` configuration](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write).
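
As an example, a Prometheus `remote_write` entry with 429 retries enabled could look like the following; the URL is a placeholder, and `retry_on_http_429` sits under `queue_config`:

```yaml
remote_write:
  - url: http://<mimir-host>/api/v1/push # placeholder endpoint
    queue_config:
      retry_on_http_429: true
```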

@@ -2600,6 +2600,14 @@ The `memberlist` block configures the Gossip memberlist.
The `limits` block configures default and per-tenant limits imposed by components.

```yaml
# Per-tenant request rate limit in requests per second. 0 to disable
# CLI flag: -distributor.request-rate-limit
[request_rate: <float> | default = 0]

# Per-tenant allowed request burst size. 0 to disable
# CLI flag: -distributor.request-burst-size
[request_burst_size: <int> | default = 0]

# Per-tenant ingestion rate limit in samples per second.
# CLI flag: -distributor.ingestion-rate-limit
[ingestion_rate: <float> | default = 10000]
21 changes: 17 additions & 4 deletions pkg/distributor/distributor.go
@@ -87,7 +87,8 @@ type Distributor struct {
// For handling HA replicas.
HATracker *haTracker

// Per-user rate limiter.
// Per-user rate limiters.
requestRateLimiter *limiter.RateLimiter
ingestionRateLimiter *limiter.RateLimiter

// Manager for subservices (HA Tracker, distributor ring and client pool)
@@ -202,12 +203,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
// Create the configured ingestion rate limit strategy (local or global). In case
// it's an internal dependency and can't join the distributors ring, we skip rate
// limiting.
var ingestionRateStrategy limiter.RateLimiterStrategy
var ingestionRateStrategy, requestRateStrategy limiter.RateLimiterStrategy
var distributorsLifeCycler *ring.Lifecycler
var distributorsRing *ring.Ring

if !canJoinDistributorsRing {
ingestionRateStrategy = newInfiniteIngestionRateStrategy()
requestRateStrategy = newInfiniteRateStrategy()
ingestionRateStrategy = newInfiniteRateStrategy()
} else {
distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", DistributorRingKey, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
if err != nil {
@@ -220,7 +222,8 @@
}
subservices = append(subservices, distributorsLifeCycler, distributorsRing)

ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler)
requestRateStrategy = newGlobalRateStrategy(newRequestRateStrategy(limits), distributorsLifeCycler)
ingestionRateStrategy = newGlobalRateStrategy(newIngestionRateStrategy(limits), distributorsLifeCycler)
}

d := &Distributor{
Expand All @@ -231,6 +234,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
distributorsLifeCycler: distributorsLifeCycler,
distributorsRing: distributorsRing,
limits: limits,
requestRateLimiter: limiter.NewRateLimiter(requestRateStrategy, 10*time.Second),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
HATracker: haTracker,
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
@@ -582,6 +586,15 @@ func (d *Distributor) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReq
}

now := mtime.Now()
if !d.requestRateLimiter.AllowN(now, userID, 1) {
validation.DiscardedRequests.WithLabelValues(userID).Add(1)

// Return a 429 here to tell the client it is going too fast.
// Client may discard the data or slow down and re-send.
// Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (%v) exceeded", d.requestRateLimiter.Limit(now, userID))
}

d.activeUsers.UpdateUserTimestamp(userID, now)

source := util.GetSourceIPsFromOutgoingCtx(ctx)
72 changes: 70 additions & 2 deletions pkg/distributor/distributor_test.go
@@ -413,6 +413,74 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
`), metrics...))
}

func TestDistributor_PushRequestRateLimiter(t *testing.T) {
type testPush struct {
expectedError error
}
ctx := user.InjectOrgID(context.Background(), "user")
tests := map[string]struct {
distributors int
requestRate float64
requestBurstSize int
pushes []testPush
}{
"request limit should be evenly shared across distributors": {
distributors: 2,
requestRate: 4,
requestBurstSize: 2,
pushes: []testPush{
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (2) exceeded")},
},
},
		"request burst should be applied to each distributor": {
distributors: 2,
requestRate: 2,
requestBurstSize: 3,
pushes: []testPush{
{expectedError: nil},
{expectedError: nil},
{expectedError: nil},
{expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "request rate limit (1) exceeded")},
},
},
}

for testName, testData := range tests {
testData := testData

t.Run(testName, func(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
limits.RequestRate = testData.requestRate
limits.RequestBurstSize = testData.requestBurstSize

// Start all expected distributors
distributors, _, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: testData.distributors,
limits: limits,
})

// Send multiple requests to the first distributor
for _, push := range testData.pushes {
request := makeWriteRequest(0, 1, 1, false)
response, err := distributors[0].Push(ctx, request)

if push.expectedError == nil {
assert.Equal(t, emptyResponse, response)
assert.Nil(t, err)
} else {
assert.Nil(t, response)
assert.Equal(t, push.expectedError, err)
}
}
})
}
}

func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
type testPush struct {
samples int
@@ -427,7 +495,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
ingestionBurstSize int
pushes []testPush
}{
"limit should be evenly shared across distributors": {
"evenly share the ingestion limit across distributors": {
distributors: 2,
ingestionRate: 10,
ingestionBurstSize: 5,
@@ -440,7 +508,7 @@
{metadata: 1, expectedError: httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (5) exceeded while adding 0 samples and 1 metadata")},
},
},
"burst should set to each distributor": {
		"for each distributor, set an ingestion burst limit": {
distributors: 2,
ingestionRate: 10,
ingestionBurstSize: 20,
61 changes: 0 additions & 61 deletions pkg/distributor/ingestion_rate_strategy.go

This file was deleted.
