diff --git a/CHANGELOG.md b/CHANGELOG.md index b0a9d7a8897..804ce521353 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ * `cortex_frontend_query_result_cache_requests_total{request_type="query_range|cardinality|label_names_and_values"}` * `cortex_frontend_query_result_cache_hits_total{request_type="query_range|cardinality|label_names_and_values"}` * [FEATURE] Added `-.s3.list-objects-version` flag to configure the S3 list objects version. -* [FEATURE] Ingester: Add optional CPU/memory utilization based read request limiting, considered experimental. Disabled by default, enable by configuring limits via both of the following flags: #5012 #5392 #5394 +* [FEATURE] Ingester: Add optional CPU/memory utilization based read request limiting, considered experimental. Disabled by default, enable by configuring limits via both of the following flags: #5012 #5392 #5394 #5526 * `-ingester.read-path-cpu-utilization-limit` * `-ingester.read-path-memory-utilization-limit` * [FEATURE] Ruler: Support filtering results from rule status endpoint by `file`, `rule_group` and `rule_name`. #5291 diff --git a/pkg/util/limiter/utilization.go b/pkg/util/limiter/utilization.go index a28a913e996..06cd67acdd2 100644 --- a/pkg/util/limiter/utilization.go +++ b/pkg/util/limiter/utilization.go @@ -121,13 +121,13 @@ func (l *UtilizationBasedLimiter) starting(_ context.Context) error { } func (l *UtilizationBasedLimiter) update(_ context.Context) error { - l.compute(time.Now()) + l.compute(time.Now) return nil } // compute and return the current CPU and memory utilization. // This function must be called at a regular interval (resourceUtilizationUpdateInterval) to get a predictable behaviour. -func (l *UtilizationBasedLimiter) compute(now time.Time) (currCPUUtil float64, currMemoryUtil uint64) { +func (l *UtilizationBasedLimiter) compute(nowFn func() time.Time) (currCPUUtil float64, currMemoryUtil uint64) { cpuTime, currMemoryUtil, err := l.utilizationScanner.Scan() if err != nil { level.Warn(l.logger).Log("msg", "failed to get CPU and memory stats", "err", err.Error()) @@ -136,6 +136,9 @@ func (l *UtilizationBasedLimiter) compute(now time.Time) (currCPUUtil float64, c return } + // Get wall time after CPU time, in case there's a delay before CPU time is returned, + // which would cause us to compute too high of a CPU load + now := nowFn() l.currMemoryUtil.Store(currMemoryUtil) // Add the instant CPU utilization to the moving average. The instant CPU @@ -155,7 +158,7 @@ func (l *UtilizationBasedLimiter) compute(now time.Time) (currCPUUtil float64, c if l.firstUpdate.IsZero() { l.firstUpdate = now } else if now.Sub(l.firstUpdate) >= resourceUtilizationSlidingWindow { - currCPUUtil = float64(l.cpuMovingAvg.Rate()) / 100 + currCPUUtil = l.cpuMovingAvg.Rate() / 100 l.currCPUUtil.Store(currCPUUtil) } diff --git a/pkg/util/limiter/utilization_test.go b/pkg/util/limiter/utilization_test.go index e560b42a390..e606a0f1e8b 100644 --- a/pkg/util/limiter/utilization_test.go +++ b/pkg/util/limiter/utilization_test.go @@ -30,30 +30,33 @@ func TestUtilizationBasedLimiter(t *testing.T) { } tim := time.Now() + nowFn := func() time.Time { + return tim + } t.Run("CPU based limiting should be enabled if set to a value greater than 0", func(t *testing.T) { lim, _, reg := setup(t, 0.11, gigabyte) // Warmup the CPU utilization. for i := 0; i < int(resourceUtilizationSlidingWindow.Seconds()); i++ { - lim.compute(tim) + lim.compute(nowFn) tim = tim.Add(time.Second) } // The fake utilization scanner linearly increases CPU usage for a minute for i := 0; i < 59; i++ { - lim.compute(tim) + lim.compute(nowFn) tim = tim.Add(time.Second) require.Empty(t, lim.LimitingReason(), "Limiting should be disabled") } - lim.compute(tim) + lim.compute(nowFn) tim = tim.Add(time.Second) require.Equal(t, "cpu", lim.LimitingReason(), "Limiting should be enabled due to CPU") // The fake utilization scanner drops CPU usage again after a minute, so we expect // limiting to be disabled shortly. for i := 0; i < 5; i++ { - lim.compute(tim) + lim.compute(nowFn) tim = tim.Add(time.Second) } require.Empty(t, lim.LimitingReason(), "Limiting should be disabled again") @@ -73,12 +76,12 @@ func TestUtilizationBasedLimiter(t *testing.T) { // Warmup the CPU utilization. for i := 0; i < int(resourceUtilizationSlidingWindow.Seconds()); i++ { - lim.compute(tim) + lim.compute(nowFn) tim = tim.Add(time.Second) } for i := 0; i < 60; i++ { - lim.compute(tim) + lim.compute(nowFn) tim = tim.Add(time.Second) require.Empty(t, lim.LimitingReason(), "Limiting should be disabled") } @@ -97,15 +100,15 @@ func TestUtilizationBasedLimiter(t *testing.T) { lim, fakeScanner, reg := setup(t, 0.11, gigabyte) // Compute the utilization a first time to warm up the limiter. - lim.compute(tim) + lim.compute(nowFn) fakeScanner.memoryUtilization = gigabyte - lim.compute(tim) + lim.compute(nowFn) tim = tim.Add(time.Second) require.Equal(t, "memory", lim.LimitingReason(), "Limiting should be enabled due to memory") fakeScanner.memoryUtilization = gigabyte - 1 - lim.compute(tim) + lim.compute(nowFn) require.Empty(t, lim.LimitingReason(), "Limiting should be disabled again") assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` @@ -122,10 +125,10 @@ func TestUtilizationBasedLimiter(t *testing.T) { lim, fakeScanner, reg := setup(t, 0.11, 0) // Compute the utilization a first time to warm up the limiter. - lim.compute(tim) + lim.compute(nowFn) fakeScanner.memoryUtilization = gigabyte - lim.compute(tim) + lim.compute(nowFn) tim = tim.Add(time.Second) require.Empty(t, lim.LimitingReason(), "Limiting should be disabled") @@ -244,7 +247,9 @@ func TestUtilizationBasedLimiter_CPUUtilizationSensitivity(t *testing.T) { maxCPUUtilization := float64(math.MinInt64) for i, ts := 0, time.Now(); i < len(testData.instantCPUValues); i++ { - currCPUUtilization, _ := lim.compute(ts) + currCPUUtilization, _ := lim.compute(func() time.Time { + return ts + }) ts = ts.Add(time.Second) // Keep track of the max CPU utilization as computed by the limiter.