Skip to content

Commit

Permalink
admission: add support for disk bandwidth as a bottleneck resource
Browse files Browse the repository at this point in the history
We assume that:
- There is a provisioned known limit on the sum of read and write
  bandwidth. This limit is allowed to change.
- Admission control can only shape the rate of admission of writes. Writes
  also cause reads, since compactions do reads and writes.

There are multiple challenges:
- We are unable to precisely track the causes of disk read bandwidth, since
  we do not have observability into what reads missed the OS page cache.
  That is, we don't know how much of the reads were due to incoming reads
  (that we don't shape) and how much due to compaction read bandwidth.
- We don't shape incoming reads.
- There can be a large time lag between the shaping of incoming writes, and when
  it affects actual writes in the system, since compaction backlog can
  build up in various levels of the LSM store.
- Signals of overload are coarse, since we cannot view all the internal
  queues that can build up due to resource overload. For instance,
  different examples of bandwidth saturation exhibit different
  latency effects, presumably because the queue buildup is different. So it
  is non-trivial to approach full utilization without risking high latency.

Due to these challenges, and previous design attempts that were quite
complicated (and incomplete), we adopt a goal of simplicity of design, and strong
abstraction boundaries.
- The disk load is abstracted using an enum. The diskLoadWatcher can be
  evolved independently.
- The approach uses easy to understand additive increase and multiplicative
  decrease, (unlike what we do for flush and compaction tokens, where we
  try to more precisely calculate the sustainable rates).

Since we are using a simple approach that is somewhat coarse in its behavior,
we start by limiting its application to two kinds of writes:
- Incoming writes that are deemed "elastic": This can be done by
  introducing a work-class (in addition to admissionpb.WorkPriority), or by
  implying a work-class from the priority (e.g. priorities < NormalPri are
  deemed elastic). This prototype does the latter.
- Optional compactions: We assume that the LSM store is configured with a
  ceiling on number of regular concurrent compactions, and if it needs more
  it can request resources for additional (optional) compactions. These
  latter compactions can be limited by this approach. See
  cockroachdb/pebble/issues/1329 for motivation. This control on compactions
  is not currently implemented and is future work (though the prototype
  in cockroachdb#82813 had code for
  it).

The reader should start with disk_bandwidth.go, consisting of
- diskLoadWatcher: which computes load levels.
- diskBandwidthLimiter: It used the load level computed by diskLoadWatcher
  to limit write tokens for elastic writes and in the future will also
  limit compactions.

There is significant refactoring and changes in granter.go and
work_queue.go. This is driven by the fact that:
- Previously the tokens were for L0 and now we need to support tokens for
  bytes into L0 and tokens for bytes into the LSM (the former being a subset
  of the latter).
- Elastic work is in a different WorkQueue than regular work, but they
  are competing for the same tokens.

The latter is handled by allowing kvSlotGranter to multiplex across
multiple requesters, via multiple child granters. A number of interfaces
are adjusted to make this viable. In general, the GrantCoordinator
is now slightly dumber and some of that logic is moved into the granters.

For the former (handling two kinds of tokens), I considered adding multiple
resource dimensions to the granter-requester interaction but found it
too complicated. Instead we rely on the observation that we request
tokens based on the total incoming bytes of the request (not just L0),
and when the request is completed, tell the granter how many bytes
went into L0. The latter allows us to return tokens to L0. So at the
time the request is completed, we can account separately for the L0
tokens and these new tokens for all incoming bytes (which we are calling
disk bandwidth tokens, since they are constrained based on disk bandwidth).

This is a cleaned up version of the prototype in
cockroachdb#82813 which contains the
experimental results. The plumbing from the KV layer to populate the
disk reads, writes and provisioned bandwidth is absent in this PR,
and will be added in a subsequent PR.

Disk bandwidth bottlenecks are considered only if both the following
are true:
- DiskStats.ProvisionedBandwidth is non-zero.
- The cluster setting admission.disk_bandwidth_tokens.elastic.enabled
  is true (defaults to true).

Informs cockroachdb#82898

Release note: None (the cluster setting mentioned earlier is useless
since the integration with CockroachDB will be in a future PR).
  • Loading branch information
sumeerbhola committed Aug 11, 2022
1 parent 6c24f35 commit 502cbe1
Show file tree
Hide file tree
Showing 18 changed files with 2,082 additions and 596 deletions.
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
<table>
<thead><tr><th>Setting</th><th>Type</th><th>Default</th><th>Description</th></tr></thead>
<tbody>
<tr><td><code>admission.disk_bandwidth_tokens.elastic.enabled</code></td><td>boolean</td><td><code>true</code></td><td>when true, and provisioned bandwidth for the disk corresponding to a store is configured, tokens for elastic work will be limited if disk bandwidth becomes a bottleneck</td></tr>
<tr><td><code>admission.epoch_lifo.enabled</code></td><td>boolean</td><td><code>false</code></td><td>when true, epoch-LIFO behavior is enabled when there is significant delay in admission</td></tr>
<tr><td><code>admission.epoch_lifo.epoch_closing_delta_duration</code></td><td>duration</td><td><code>5ms</code></td><td>the delta duration before closing an epoch, for epoch-LIFO admission control ordering</td></tr>
<tr><td><code>admission.epoch_lifo.epoch_duration</code></td><td>duration</td><td><code>100ms</code></td><td>the duration of an epoch, for epoch-LIFO admission control ordering</td></tr>
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/admission/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "admission",
srcs = [
"disk_bandwidth.go",
"doc.go",
"granter.go",
"store_token_estimation.go",
Expand Down Expand Up @@ -32,6 +33,7 @@ go_library(
go_test(
name = "admission_test",
srcs = [
"disk_bandwidth_test.go",
"granter_test.go",
"store_token_estimation_test.go",
"work_queue_test.go",
Expand Down
340 changes: 340 additions & 0 deletions pkg/util/admission/disk_bandwidth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,340 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package admission

import (
"context"
"math"

"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/redact"
)

// The functionality in this file is geared towards preventing chronic overload
// of disk bandwidth which typically results in severely high latency for all work.
//
// For now, we assume that:
// - There is a provisioned limit on the sum of read and write bandwidth. This
// limit is allowed to change. This is true for block devices of major cloud
// providers.
// - Admission control can only shape the rate of admission of writes. Writes
// also cause reads, since compactions do reads and writes.
//
// There are multiple challenges:
// - We are unable to precisely track the causes of disk read bandwidth, since
// we do not have observability into what reads missed the OS page cache.
// That is we don't know how much of the reads were due to incoming reads
// (that we don't shape) and how much due to compaction read bandwidth.
//
// - We don't shape incoming reads.
//
// - There can be a large lag (1+min) between the shaping of incoming writes,
// and when it affects actual writes in the system, since compaction backlog
// can build up in various levels of the LSM store.
//
// - Signals of overload are coarse, since we cannot view all the internal
// queues that can build up due to resource overload. For instance,
// different examples of bandwidth saturation exhibit very different
// latency effects, presumably because the queue buildup is different. So it
// is non-trivial to approach full utilization without risking high latency.
//
// Due to these challenges, we adopt a goal of simplicity of design, and
// strong abstraction boundaries.
//
// - The disk load is abstracted using an enum, diskLoadLevel. The
// diskLoadWatcher, that maps load signals to this enum, can be evolved
// independently.
//
// - The approach uses easy to understand additive increase and multiplicative
// decrease, (unlike what we do for flush and compaction tokens, where we
// try to more precisely calculate the sustainable rates).
//
// Since we are using a simple approach that is somewhat coarse in its behavior,
// we start by limiting its application to two kinds of writes (the second one
// is future work, and not yet implemented):
//
// - Incoming writes that are deemed "elastic": This can be done by
// introducing a work-class (in addition to admissionpb.WorkPriority), or by
// implying a work-class from the priority (e.g. priorities < NormalPri are
// deemed elastic).
//
// - Optional compactions: We assume that the LSM store is configured with a
// ceiling on number of regular concurrent compactions, and if it needs more
// it can request resources for additional (optional) compactions. These
// latter compactions can be limited by this approach. See
// https://github.com/cockroachdb/pebble/issues/1329 for motivation.
// TODO(sumeer): this compaction control is not yet done, though how to do
// it is included in the prototype in
// https://github.com/cockroachdb/cockroach/pull/82813
//
// Extending this to all incoming writes is future work.

// The load level of a disk.
type diskLoadLevel int8

const (
// diskLoadLow implies no need to shape anything.
diskLoadLow diskLoadLevel = iota
// diskLoadModerate implies shaping and additive increase.
diskLoadModerate
// diskLoadHigh implies shaping and hold steady.
diskLoadHigh
// diskLoadOverload implies shaping and multiplicative decrease.
diskLoadOverload
)

func diskLoadLevelString(level diskLoadLevel) redact.SafeString {
switch level {
case diskLoadLow:
return "low"
case diskLoadModerate:
return "moderate"
case diskLoadHigh:
return "high"
case diskLoadOverload:
return "overload"
}
return ""
}

// diskLoadWatcher computes the diskLoadLevel based on provided stats.
type diskLoadWatcher struct {
lastInterval intervalDiskLoadInfo
lastUtil float64
loadLevel diskLoadLevel
}

// intervalDiskLoadInfo provides disk stats over an adjustmentInterval.
type intervalDiskLoadInfo struct {
// readBandwidth is the measure disk read bandwidth in bytes/s.
readBandwidth int64
// writeBandwidth is the measured disk write bandwidth in bytes/s.
writeBandwidth int64
// provisionedBandwidth is the aggregate (read+write) provisioned bandwidth
// in bytes/s.
provisionedBandwidth int64
}

// setIntervalInfo is called at the same time as ioLoadListener.pebbleMetricsTick.
func (d *diskLoadWatcher) setIntervalInfo(load intervalDiskLoadInfo) {
lastInterval := load
util := float64(load.readBandwidth+load.writeBandwidth) / float64(load.provisionedBandwidth)
// The constants and other heuristics in the following logic can seem
// extremely arbitrary: they were subject to some tuning and evolution based
// on the experiments in https://github.com/cockroachdb/cockroach/pull/82813
// that used (a) an artificial provisioned bandwidth limit lower than the
// actual, to see how well the system stayed within that limit, (b) an
// actual provisioned bandwidth limit. The difficulty in general is that
// small changes can have outsize influence if a higher number of
// compactions start happening.
var loadLevel diskLoadLevel
const lowUtilThreshold = 0.3
const moderateUtilThreshold = 0.7
const highUtilThreshold = 0.95
const highlyOverUtilizedThreshold = 2.0
const smallDelta = 0.05
if util < lowUtilThreshold {
// Were at moderate or lower and have not increased significantly and the
// lastUtil was also low, then we can afford to stop limiting tokens. We
// are trying to carefully narrow this case since not limiting tokens can
// blow up bandwidth.
if d.loadLevel <= diskLoadModerate && util < d.lastUtil+smallDelta &&
d.lastUtil < lowUtilThreshold {
loadLevel = diskLoadLow
} else {
// util is increasing, or we just dropped from something higher than
// moderate. Give it more time at moderate, where we will gradually
// increase tokens.
loadLevel = diskLoadModerate
}
} else if util < moderateUtilThreshold {
// Wide band from [0.3,0.7) where we gradually increase tokens. Also, 0.7
// is deliberately a lowish fraction since the effect on compactions can
// lag and kick in later. We are ok with accepting a lower utilization for
// elastic work to make progress.
loadLevel = diskLoadModerate
} else if util < highUtilThreshold ||
(util < highlyOverUtilizedThreshold && util < d.lastUtil-smallDelta) {
// Wide band from [0.7,0.95) where we will hold the number of tokens
// steady. We don't want to overreact and decrease too early since
// compaction bandwidth usage can be lumpy. For this same reason, if we
// are trending downward, we want to hold. Note that util < 2 will always
// be true in typical configurations where one cannot actually exceed
// provisioned bandwidth -- but we also run experiments where we
// artificially constrain the provisioned bandwidth, where this is useful.
// And it is possible that some production settings may set a slightly
// lower value of provisioned bandwidth, if they want to further reduce
// the probability of hitting the real provisioned bandwidth due to
// elastic work.
loadLevel = diskLoadHigh
} else {
// Overloaded. We will reduce tokens.
loadLevel = diskLoadOverload
}
*d = diskLoadWatcher{
lastInterval: lastInterval,
lastUtil: util,
loadLevel: loadLevel,
}
// TODO(sumeer): Use the history of fsync latency and the value in the
// current interval, and if high, increase the load level computed earlier.
// We shouldn't rely fully on syncLatencyMicros since (a) sync latency could
// arise due to an external unrelated outage, (b) some customers may set
// fsync to be a noop. As an alternative to sync latency, we could also
// consider looking at fluctuations of peak-rate that the WAL writer can
// sustain.
}

func (d *diskLoadWatcher) getLoadLevel() diskLoadLevel {
return d.loadLevel
}

func (d diskLoadWatcher) SafeFormat(p redact.SafePrinter, _ rune) {
p.Printf("disk bandwidth: read: %s/s, write: %s/s, provisioned: %s/s, util: %.2f",
humanizeutil.IBytes(d.lastInterval.readBandwidth),
humanizeutil.IBytes(d.lastInterval.writeBandwidth),
humanizeutil.IBytes(d.lastInterval.provisionedBandwidth), d.lastUtil)
}

// intervalLSMInfo provides stats about the LSM over an adjustmentInterval.
type intervalLSMInfo struct {
// Flushed bytes + Ingested bytes seen by the LSM. Ingested bytes incur the
// cost of writing a sstable, even though that is done outside Pebble, so
// ingestion is similar in cost to flushing. Ingested bytes don't cause WAL
// writes, but we ignore that difference for simplicity, and just work with
// the sum of flushed and ingested bytes.
incomingBytes int64
// regularTokensUsed and elasticTokensUsed are the byte tokens used for
// regular and elastic work respectively. Each of these includes both
// writes that will get flushed and ingested bytes. The
// regularTokensUsed+elasticTokensUsed do not need to sum up to
// incomingBytes, since these stats are produced by different sources.
regularTokensUsed int64
elasticTokensUsed int64
}

// diskBandwidthLimiter produces tokens for elastic work.
type diskBandwidthLimiter struct {
diskLoadWatcher diskLoadWatcher

smoothedIncomingBytes float64
smoothedElasticFraction float64
elasticTokens int64

prevElasticTokensUsed int64
}

func makeDiskBandwidthLimiter() diskBandwidthLimiter {
return diskBandwidthLimiter{
elasticTokens: math.MaxInt64,
}
}

// computeElasticTokens is called every adjustmentInterval.
func (d *diskBandwidthLimiter) computeElasticTokens(
ctx context.Context, id intervalDiskLoadInfo, il intervalLSMInfo,
) (elasticTokens int64) {
d.diskLoadWatcher.setIntervalInfo(id)
const alpha = 0.5
d.smoothedIncomingBytes = alpha*float64(il.incomingBytes) + (1-alpha)*d.smoothedIncomingBytes
var intElasticFraction float64
if il.regularTokensUsed+il.elasticTokensUsed > 0 {
intElasticFraction =
float64(il.elasticTokensUsed) / float64(il.regularTokensUsed+il.elasticTokensUsed)
d.smoothedElasticFraction = alpha*intElasticFraction + (1-alpha)*d.smoothedElasticFraction
}
intElasticBytes := int64(float64(il.incomingBytes) * intElasticFraction)
d.prevElasticTokensUsed = il.elasticTokensUsed
ll := d.diskLoadWatcher.getLoadLevel()

// The constants and other heuristics in the following logic can seem
// arbitrary: they were subject to some tuning and evolution based on the
// experiments in https://github.com/cockroachdb/cockroach/pull/82813 that
// used (a) an artificial provisioned bandwidth limit lower than the actual,
// to see how well the system stayed within that limit, (b) an actual
// provisioned bandwidth limit. The difficulty in general is that small
// changes can have outsize influence if a higher number of compactions
// start happening, or the compaction backlog is cleared.
//
// TODO(sumeer): experiment with a PID controller.
doLog := true
switch ll {
case diskLoadLow:
lastElasticTokens := d.elasticTokens
d.elasticTokens = math.MaxInt64
if d.elasticTokens == lastElasticTokens {
doLog = false
}
// else we stay in the common case of low bandwidth usage.
case diskLoadModerate:
tokensFullyUtilized :=
// elasticTokens == MaxInt64 is also considered fully utilized since we
// can never fully utilize unlimited tokens.
d.elasticTokens == math.MaxInt64 ||
(d.elasticTokens > 0 && float64(il.elasticTokensUsed)/float64(d.elasticTokens) >= 0.8)

if tokensFullyUtilized {
// Smoothed elastic bytes plus 10% of smoothedIncomingBytes is given to
// elastic work. That is, we are increasing the total incoming bytes by
// 10% (not just the elastic bytes by 10%).
elasticBytes := (d.smoothedElasticFraction + 0.1) * d.smoothedIncomingBytes
// Sometimes we see the tokens not increasing even though we are staying
// for multiple intervals at moderate. This is because the smoothed
// fraction and incoming bytes can be decreasing. We do want to increase
// tokens since we know there is spare capacity, so we try many ways
// (that don't look at smoothed numbers only). Also, we sometimes come
// here due to an overload=>moderate transition because compaction
// bandwidth usage can be lumpy (high when there is a backlog and then
// dropping severely) -- in that case we want to start increasing
// immediately, since we have likely decreased too much.
intBasedElasticTokens := (d.smoothedElasticFraction + 0.1) * float64(il.incomingBytes)
elasticBytes = math.Max(elasticBytes, intBasedElasticTokens)
elasticBytes = math.Max(elasticBytes, 1.1*float64(il.elasticTokensUsed))
d.elasticTokens = int64(elasticBytes)
if d.elasticTokens == 0 {
// Don't get stuck in a situation where smoothedIncomingBytes are 0.
d.elasticTokens = math.MaxInt64
}
}
// else no change.
case diskLoadHigh:
// No change.
case diskLoadOverload:
// Sometimes we come here after a low => overload transition. The
// intElasticBytes will be very high because tokens were unlimited. We
// don't want to use that as the starting point of the decrease if the
// smoothed value is lower. Hence, the min logic below, to try to dampen
// the increase quickly.
d.elasticTokens = int64(0.5 * math.Min(float64(intElasticBytes),
d.smoothedElasticFraction*d.smoothedIncomingBytes))
}
// We can end up with 0 elastic tokens here -- e.g. if intElasticBytes was 0
// but we were still overloaded because of compactions. The trouble with 0
// elastic tokens is that if we don't admit anything, we cannot correct an
// occasional poor estimate of the per-request bytes. So we decide to give
// out at least 1 token. A single elastic request should not be too big for
// this to matter.
d.elasticTokens = max(1, d.elasticTokens)
if doLog {
log.Infof(ctx, "%v", d)
}
return d.elasticTokens
}

func (d *diskBandwidthLimiter) SafeFormat(p redact.SafePrinter, _ rune) {
ib := humanizeutil.IBytes
level := d.diskLoadWatcher.getLoadLevel()
p.Printf("diskBandwidthLimiter %s (%v): elastic-fr: %.2f, incoming: %s, "+
"elastic-tokens (used %s): %s",
diskLoadLevelString(level), d.diskLoadWatcher, d.smoothedElasticFraction,
ib(int64(d.smoothedIncomingBytes)), ib(d.prevElasticTokensUsed), ib(d.elasticTokens))
}
Loading

0 comments on commit 502cbe1

Please sign in to comment.