admission,storage: introduce flush tokens to constrain write admission
In addition to byte tokens for writes computed based on the compaction
rate out of L0, we now compute byte tokens based on how fast the system
can flush memtables into L0. The motivation is that writing to the
memtable, or creating memtables, faster than the system can flush them
results in memtable-induced write stalls, which create a latency hiccup
for all write traffic. We have observed write stalls that lasted
> 100ms.

The approach taken here for flush tokens is straightforward (there is
justification based on experiments, mentioned in code comments):
- Measure and smooth the peak rate at which the flush loop can operate.
  This relies on the recently added pebble.InternalIntervalMetrics.
- The peak rate corresponds to 100% utilization of the single flush
  thread, which is potentially too high to prevent write stalls
  (depending on how long a single flush takes). So we multiply the
  smoothed peak rate by a utilization-target-fraction, which is
  dynamically adjusted and by default constrained to the interval
  [0.5, 1.5]. The fraction is adjusted additively (a sketch of this
  probing logic follows below):
  - High usage of tokens and no write stalls cause an additive increase.
  - Write stalls cause an additive decrease. A small multiplier is used
    if there are multiple write stalls, so that the probing settles
    more in the region where there are no write stalls.

Note that this probing scheme cannot eliminate all write stalls. For
now we are ok with a reduction in write stalls.
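
For illustration, here is a minimal Go sketch of the additive
increase/decrease probing described above. All names and constants
(flushTokenState, additiveStep, the multiplier of 2) are hypothetical
and chosen for clarity; the actual logic lives in pkg/util/admission
and differs in detail.

    package admissionsketch

    const (
        minUtilFraction = 0.5   // lower bound of the target interval
        maxUtilFraction = 1.5   // upper bound of the target interval
        additiveStep    = 0.025 // hypothetical additive adjustment step
    )

    // flushTokenState holds the state used to compute flush-based tokens.
    type flushTokenState struct {
        // utilTargetFraction is the dynamically adjusted multiplier applied
        // to the smoothed peak flush rate.
        utilTargetFraction float64
        // smoothedPeakFlushBytes is the smoothed peak flush rate, in bytes
        // per adjustment interval.
        smoothedPeakFlushBytes float64
    }

    // adjust updates the utilization target fraction based on what was
    // observed over the last adjustment interval.
    func (s *flushTokenState) adjust(numWriteStalls int64, highTokenUsage bool) {
        switch {
        case numWriteStalls > 0:
            // Additive decrease; a small multiplier when there were multiple
            // write stalls pushes the probing toward the stall-free region.
            step := additiveStep
            if numWriteStalls > 1 {
                step *= 2 // hypothetical multiplier
            }
            s.utilTargetFraction -= step
        case highTokenUsage:
            // High token usage and no write stalls: additive increase.
            s.utilTargetFraction += additiveStep
        }
        if s.utilTargetFraction < minUtilFraction {
            s.utilTargetFraction = minUtilFraction
        }
        if s.utilTargetFraction > maxUtilFraction {
            s.utilTargetFraction = maxUtilFraction
        }
    }

    // flushTokens returns the flush-based byte tokens for the next interval.
    func (s *flushTokenState) flushTokens() int64 {
        return int64(s.utilTargetFraction * s.smoothedPeakFlushBytes)
    }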

For convenience, and with some additional justification mentioned in a
code comment, the scheme uses the minimum of the flush and compaction
tokens for writes to L0. This means that sstable ingestion into L0 is
also subject to these tokens. The periodic token computation continues
to be done at 15s intervals. However, instead of handing out these
tokens at 1s intervals, we now hand them out at 250ms intervals, to
reduce burstiness, since bursty admission can cause write stalls.
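
Here is a similarly minimal sketch of how the two budgets could be
combined and then split into 250ms allotments. The function names are
hypothetical; the actual token granting in pkg/util/admission differs
in detail.

    package admissionsketch

    // computeL0WriteTokens returns the byte tokens available for writes
    // (including sstable ingests) into L0 over the next 15s adjustment
    // interval: the minimum of the compaction-based and flush-based budgets.
    func computeL0WriteTokens(compactionTokens, flushTokens int64) int64 {
        if compactionTokens < flushTokens {
            return compactionTokens
        }
        return flushTokens
    }

    // tokensPerTick splits a 15s budget into 250ms allotments (60 ticks) to
    // reduce burstiness; rounding up avoids starving the final tick.
    func tokensPerTick(intervalTokens int64) int64 {
        const ticksPerInterval = 60 // 15s / 250ms
        return (intervalTokens + ticksPerInterval - 1) / ticksPerInterval
    }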

There is a new metric, storage.write-stall-nanos, that measures the
cumulative duration of write stalls; it gives a more intuitive feel for
how the system is behaving than a write stall count.

The scheme can be disabled by increasing the cluster setting
admission.min_flush_util_fraction, which defaults to 0.5 (corresponding
to the 0.5 lower bound mentioned earlier), to a high value, say 10.

The scheme was evaluated using a single-node cluster where the node had
a high CPU count, such that CPU was not a bottleneck even with max
compaction concurrency set to 8. A kv0 workload with high concurrency
and 4KB writes was used to overload the store. Due to the high
compaction concurrency, L0 stayed below the unhealthy thresholds, and
the resource bottleneck became the total bandwidth provisioned for the
disk. This setup was evaluated under both:
- early-life: when the store had 10-20GB of data and the compaction
  backlog was not very heavy, so there was less queueing for the
  limited disk bandwidth (which was still usually saturated).
- later-life: when the store had around 150GB of data.

In both cases, turning off flush tokens increased the duration of
write stalls by > 5x. For the early-life case, ~750ms per second was
spent in a write stall with flush tokens off. The later-life case had
~200ms per second of write stalls with flush tokens off. The lower
value of the latter is paradoxically due to the worse bandwidth
saturation: fsync latency rose from 2-4ms with flush tokens on, to
11-20ms with flush tokens off. This increase imposed a natural
backpressure on writes due to the need to sync the WAL. In contrast,
the fsync latency was low in the early-life case, though it did
increase from 0.125ms to 0.25ms when flush tokens were turned off.

In both cases, the admission throughput did not increase when turning
off flush tokens. That is, the system cannot sustain more throughput,
but by turning on flush tokens, we shift queueing from the disk layer
to the admission control layer (where we have the capability to
reorder work).

Fixes #77357

Release note (ops change): I/O admission control now reduces the
likelihood of storage layer write stalls, which can be caused by
memtable flushes becoming a bottleneck. This is done by limiting write
tokens based on flush throughput. Consequently, write tokens are now
limited both by flush throughput and by compaction throughput out of
L0. This behavior is enabled by default. The new cluster setting
admission.min_flush_util_fraction, defaulting to 0.5, can be used to
disable or tune flush throughput based admission tokens. Setting it to
a value much greater than 1, say 10, will disable flush-based tokens.
Tuning the behavior, without disabling it, should be done only on the
recommendation of a domain expert.
sumeerbhola committed Jul 14, 2022
1 parent 5ded4fc commit 6cad2d5
Showing 11 changed files with 836 additions and 223 deletions.
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -489,6 +489,12 @@ var (
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaRdbWriteStallNanos = metric.Metadata{
Name: "storage.write-stall-nanos",
Help: "Total write stall duration in nanos",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

// Disk health metrics.
metaDiskSlow = metric.Metadata{
@@ -1554,6 +1560,7 @@ type StoreMetrics struct {
RdbL0NumFiles *metric.Gauge
RdbBytesIngested [7]*metric.Gauge // idx = level
RdbWriteStalls *metric.Gauge
RdbWriteStallNanos *metric.Gauge

// Disk health metrics.
DiskSlow *metric.Gauge
@@ -2022,6 +2029,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RdbL0NumFiles: metric.NewGauge(metaRdbL0NumFiles),
RdbBytesIngested: rdbBytesIngested,
RdbWriteStalls: metric.NewGauge(metaRdbWriteStalls),
RdbWriteStallNanos: metric.NewGauge(metaRdbWriteStallNanos),

// Disk health metrics.
DiskSlow: metric.NewGauge(metaDiskSlow),
@@ -2279,6 +2287,7 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
sm.RdbMarkedForCompactionFiles.Update(int64(m.Compact.MarkedFiles))
sm.RdbNumSSTables.Update(m.NumSSTables())
sm.RdbWriteStalls.Update(m.WriteStallCount)
sm.RdbWriteStallNanos.Update(m.WriteStallDuration.Nanoseconds())
sm.DiskSlow.Update(m.DiskSlowCount)
sm.DiskStalled.Update(m.DiskStallCount)

8 changes: 6 additions & 2 deletions pkg/server/node.go
@@ -785,8 +785,12 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
var metrics []admission.StoreMetrics
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
m := store.Engine().GetMetrics()
metrics = append(
metrics, admission.StoreMetrics{StoreID: int32(store.StoreID()), Metrics: m.Metrics})
im := store.Engine().GetInternalIntervalMetrics()
metrics = append(metrics, admission.StoreMetrics{
StoreID: int32(store.StoreID()),
Metrics: m.Metrics,
WriteStallCount: m.WriteStallCount,
InternalIntervalMetrics: im})
return nil
})
return metrics
9 changes: 8 additions & 1 deletion pkg/storage/engine.go
@@ -940,6 +940,12 @@ type Engine interface {
// MinVersionIsAtLeastTargetVersion returns whether the engine's recorded
// storage min version is at least the target version.
MinVersionIsAtLeastTargetVersion(target roachpb.Version) (bool, error)

// GetInternalIntervalMetrics returns low-level metrics from Pebble, that
// are reset at every interval, where an interval is defined over successive
// calls to this method. Hence, this should be used with care, with only one
// caller, which is currently the admission control subsystem.
GetInternalIntervalMetrics() *pebble.InternalIntervalMetrics
}

// Batch is the interface for batch specific operations.
@@ -978,7 +984,8 @@ type Metrics struct {
//
// We do not split this metric across these two reasons, but they can be
// distinguished in the pebble logs.
WriteStallCount int64
WriteStallCount int64
WriteStallDuration time.Duration
// DiskSlowCount counts the number of times Pebble records disk slowness.
DiskSlowCount int64
// DiskStallCount counts the number of times Pebble observes slow writes
39 changes: 32 additions & 7 deletions pkg/storage/pebble.go
@@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
@@ -632,9 +633,11 @@ type Pebble struct {

// Stats updated by pebble.EventListener invocations, and returned in
// GetMetrics. Updated and retrieved atomically.
writeStallCount int64
diskSlowCount int64
diskStallCount int64
writeStallCount int64
writeStallDuration time.Duration
writeStallStartNanos int64
diskSlowCount int64
diskStallCount int64

// Relevant options copied over from pebble.Options.
fs vfs.FS
@@ -905,6 +908,22 @@ func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventListener {
return pebble.EventListener{
WriteStallBegin: func(info pebble.WriteStallBeginInfo) {
atomic.AddInt64(&p.writeStallCount, 1)
startNanos := timeutil.Now().UnixNano()
atomic.StoreInt64(&p.writeStallStartNanos, startNanos)
},
WriteStallEnd: func() {
startNanos := atomic.SwapInt64(&p.writeStallStartNanos, 0)
if startNanos == 0 {
// Should not happen since these callbacks are registered when Pebble
// is opened, but just in case we miss the WriteStallBegin, lets not
// corrupt the metric.
return
}
stallDuration := timeutil.Now().UnixNano() - startNanos
if stallDuration < 0 {
return
}
atomic.AddInt64((*int64)(&p.writeStallDuration), stallDuration)
},
DiskSlow: func(info pebble.DiskSlowInfo) {
maxSyncDuration := maxSyncDurationDefault
@@ -1486,13 +1505,19 @@ func (p *Pebble) Flush() error {
func (p *Pebble) GetMetrics() Metrics {
m := p.db.Metrics()
return Metrics{
Metrics: m,
WriteStallCount: atomic.LoadInt64(&p.writeStallCount),
DiskSlowCount: atomic.LoadInt64(&p.diskSlowCount),
DiskStallCount: atomic.LoadInt64(&p.diskStallCount),
Metrics: m,
WriteStallCount: atomic.LoadInt64(&p.writeStallCount),
WriteStallDuration: time.Duration(atomic.LoadInt64((*int64)(&p.writeStallDuration))),
DiskSlowCount: atomic.LoadInt64(&p.diskSlowCount),
DiskStallCount: atomic.LoadInt64(&p.diskStallCount),
}
}

// GetInternalIntervalMetrics implements the Engine interface.
func (p *Pebble) GetInternalIntervalMetrics() *pebble.InternalIntervalMetrics {
return p.db.InternalIntervalMetrics()
}

// GetEncryptionRegistries implements the Engine interface.
func (p *Pebble) GetEncryptionRegistries() (*EncryptionRegistries, error) {
rv := &EncryptionRegistries{}
5 changes: 5 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
@@ -2823,6 +2823,11 @@ var charts = []sectionDescription{
Title: "Stalls",
Metrics: []string{"storage.write-stalls"},
},
{
Title: "Stall Duration",
Metrics: []string{"storage.write-stall-nanos"},
AxisLabel: "Duration (nanos)",
},
},
},
{
14 changes: 11 additions & 3 deletions pkg/util/admission/admissionpb/io_threshold.go
@@ -19,10 +19,18 @@ import (
)

// Score returns, as the second return value, whether IO admission control is
// considering the Store overloaded. The first return value is a 1-normalized
// float (i.e. 1.0 is the threshold at which the second value flips to true).
// considering the Store overloaded wrt compaction of L0. The first return
// value is a 1-normalized float (i.e. 1.0 is the threshold at which the
// second value flips to true).
//
// The zero value returns (0, false). Use of the nil pointer is not allowed.
//
// TODO(sumeer): consider whether we need to enhance this to incorporate
// overloading via flush bandwidth. I suspect we can get away without
// incorporating flush bandwidth since typically chronic overload will be due
// to compactions falling behind (though that may change if we increase the
// max number of compactions). And we will need to incorporate overload due to
// disk bandwidth bottleneck.
func (iot IOThreshold) Score() (float64, bool) {
if iot == (IOThreshold{}) {
return 0, false
@@ -42,7 +50,7 @@ func (iot IOThreshold) SafeFormat(s interfaces.SafePrinter, _ rune) {
sc, overload := iot.Score()
s.Printf("%.3f", redact.SafeFloat(sc))
if overload {
s.Printf("[overload]")
s.Printf("[L0-overload]")
}
}
