Skip to content

Commit

Permalink
UtilizationBasedLimiter: Get wall time after CPU time (#5526)
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
(cherry picked from commit 07f689c)
  • Loading branch information
aknuds1 authored and grafanabot committed Jul 17, 2023
1 parent 698c650 commit cf4b37e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 16 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* `cortex_frontend_query_result_cache_requests_total{request_type="query_range|cardinality|label_names_and_values"}`
* `cortex_frontend_query_result_cache_hits_total{request_type="query_range|cardinality|label_names_and_values"}`
* [FEATURE] Added `-<prefix>.s3.list-objects-version` flag to configure the S3 list objects version.
* [FEATURE] Ingester: Add optional CPU/memory utilization based read request limiting, considered experimental. Disabled by default, enable by configuring limits via both of the following flags: #5012 #5392 #5394
* [FEATURE] Ingester: Add optional CPU/memory utilization based read request limiting, considered experimental. Disabled by default, enable by configuring limits via both of the following flags: #5012 #5392 #5394 #5526
* `-ingester.read-path-cpu-utilization-limit`
* `-ingester.read-path-memory-utilization-limit`
* [FEATURE] Ruler: Support filtering results from rule status endpoint by `file`, `rule_group` and `rule_name`. #5291
Expand Down
9 changes: 6 additions & 3 deletions pkg/util/limiter/utilization.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,13 @@ func (l *UtilizationBasedLimiter) starting(_ context.Context) error {
}

func (l *UtilizationBasedLimiter) update(_ context.Context) error {
l.compute(time.Now())
l.compute(time.Now)
return nil
}

// compute and return the current CPU and memory utilization.
// This function must be called at a regular interval (resourceUtilizationUpdateInterval) to get a predictable behaviour.
func (l *UtilizationBasedLimiter) compute(now time.Time) (currCPUUtil float64, currMemoryUtil uint64) {
func (l *UtilizationBasedLimiter) compute(nowFn func() time.Time) (currCPUUtil float64, currMemoryUtil uint64) {
cpuTime, currMemoryUtil, err := l.utilizationScanner.Scan()
if err != nil {
level.Warn(l.logger).Log("msg", "failed to get CPU and memory stats", "err", err.Error())
Expand All @@ -136,6 +136,9 @@ func (l *UtilizationBasedLimiter) compute(now time.Time) (currCPUUtil float64, c
return
}

// Get wall time after CPU time, in case there's a delay before CPU time is returned,
// which would cause us to compute too high of a CPU load
now := nowFn()
l.currMemoryUtil.Store(currMemoryUtil)

// Add the instant CPU utilization to the moving average. The instant CPU
Expand All @@ -155,7 +158,7 @@ func (l *UtilizationBasedLimiter) compute(now time.Time) (currCPUUtil float64, c
if l.firstUpdate.IsZero() {
l.firstUpdate = now
} else if now.Sub(l.firstUpdate) >= resourceUtilizationSlidingWindow {
currCPUUtil = float64(l.cpuMovingAvg.Rate()) / 100
currCPUUtil = l.cpuMovingAvg.Rate() / 100
l.currCPUUtil.Store(currCPUUtil)
}

Expand Down
29 changes: 17 additions & 12 deletions pkg/util/limiter/utilization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,33 @@ func TestUtilizationBasedLimiter(t *testing.T) {
}

tim := time.Now()
nowFn := func() time.Time {
return tim
}

t.Run("CPU based limiting should be enabled if set to a value greater than 0", func(t *testing.T) {
lim, _, reg := setup(t, 0.11, gigabyte)

// Warmup the CPU utilization.
for i := 0; i < int(resourceUtilizationSlidingWindow.Seconds()); i++ {
lim.compute(tim)
lim.compute(nowFn)
tim = tim.Add(time.Second)
}

// The fake utilization scanner linearly increases CPU usage for a minute
for i := 0; i < 59; i++ {
lim.compute(tim)
lim.compute(nowFn)
tim = tim.Add(time.Second)
require.Empty(t, lim.LimitingReason(), "Limiting should be disabled")
}
lim.compute(tim)
lim.compute(nowFn)
tim = tim.Add(time.Second)
require.Equal(t, "cpu", lim.LimitingReason(), "Limiting should be enabled due to CPU")

// The fake utilization scanner drops CPU usage again after a minute, so we expect
// limiting to be disabled shortly.
for i := 0; i < 5; i++ {
lim.compute(tim)
lim.compute(nowFn)
tim = tim.Add(time.Second)
}
require.Empty(t, lim.LimitingReason(), "Limiting should be disabled again")
Expand All @@ -73,12 +76,12 @@ func TestUtilizationBasedLimiter(t *testing.T) {

// Warmup the CPU utilization.
for i := 0; i < int(resourceUtilizationSlidingWindow.Seconds()); i++ {
lim.compute(tim)
lim.compute(nowFn)
tim = tim.Add(time.Second)
}

for i := 0; i < 60; i++ {
lim.compute(tim)
lim.compute(nowFn)
tim = tim.Add(time.Second)
require.Empty(t, lim.LimitingReason(), "Limiting should be disabled")
}
Expand All @@ -97,15 +100,15 @@ func TestUtilizationBasedLimiter(t *testing.T) {
lim, fakeScanner, reg := setup(t, 0.11, gigabyte)

// Compute the utilization a first time to warm up the limiter.
lim.compute(tim)
lim.compute(nowFn)

fakeScanner.memoryUtilization = gigabyte
lim.compute(tim)
lim.compute(nowFn)
tim = tim.Add(time.Second)
require.Equal(t, "memory", lim.LimitingReason(), "Limiting should be enabled due to memory")

fakeScanner.memoryUtilization = gigabyte - 1
lim.compute(tim)
lim.compute(nowFn)
require.Empty(t, lim.LimitingReason(), "Limiting should be disabled again")

assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
Expand All @@ -122,10 +125,10 @@ func TestUtilizationBasedLimiter(t *testing.T) {
lim, fakeScanner, reg := setup(t, 0.11, 0)

// Compute the utilization a first time to warm up the limiter.
lim.compute(tim)
lim.compute(nowFn)

fakeScanner.memoryUtilization = gigabyte
lim.compute(tim)
lim.compute(nowFn)
tim = tim.Add(time.Second)
require.Empty(t, lim.LimitingReason(), "Limiting should be disabled")

Expand Down Expand Up @@ -244,7 +247,9 @@ func TestUtilizationBasedLimiter_CPUUtilizationSensitivity(t *testing.T) {
maxCPUUtilization := float64(math.MinInt64)

for i, ts := 0, time.Now(); i < len(testData.instantCPUValues); i++ {
currCPUUtilization, _ := lim.compute(ts)
currCPUUtilization, _ := lim.compute(func() time.Time {
return ts
})
ts = ts.Add(time.Second)

// Keep track of the max CPU utilization as computed by the limiter.
Expand Down

0 comments on commit cf4b37e

Please sign in to comment.