Skip to content

Commit

Permalink
server,admission: automatically infer disk names for stores
Browse files Browse the repository at this point in the history
Part of cockroachdb#86857.

This commit eliminates the need to provide the disk-name in common
environments, e.g. Linux with stores on EBS or GCP PD. To make use of AC's
disk bandwidth tokens, users still need to specify the provisioned
bandwidth, for now. So in a sense this machinery is still "disabled by
default". They can also do this through the
kvadmission.store.provisioned_bandwidth cluster setting.

Next steps:

- add roachtests that make use of these disk bandwidth tokens;
- automatically measure provisioned bandwidth, using something like
  github.com/irfansharif/probe, gate behind envvars or cluster settings;
- roll it out in managed environments;
- roll it out elsewhere.

Release note: None
  • Loading branch information
irfansharif committed Sep 2, 2023
1 parent d9c137d commit 885abad
Show file tree
Hide file tree
Showing 12 changed files with 219 additions and 51 deletions.
41 changes: 18 additions & 23 deletions pkg/base/store_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,23 +162,17 @@ func (ss *SizeSpec) Set(value string) error {
}

// ProvisionedRateSpec is an optional part of the StoreSpec.
//
// TODO(sumeer): We should map the file path specified in the store spec to
// the disk name. df can be used to map paths to names like /dev/nvme1n1 and
// /dev/sdb (these examples are from AWS EBS and GCP PD respectively) and the
// corresponding names produced by disk_counters.go are nvme1n1 and sdb
// respectively. We need to find or write a platform independent library --
// see the discussion on
// https://github.com/cockroachdb/cockroach/pull/86063#pullrequestreview-1074487018.
// With that change, the ProvisionedRateSpec would only be needed to override
// the cluster setting when there are heterogenous bandwidth limits in a
// cluster (there would be no more DiskName field).
type ProvisionedRateSpec struct {
// DiskName is the name of the disk observed by the code in disk_counters.go
// when retrieving stats for this store.
// when retrieving stats for this store. They look like /dev/nvme1n1 or
// /dev/sdb on AWS EBS and GCP PD respectively. (The names in
// disk_counters.go omit the /dev/ prefix.) It's optionally passed in using
// --store=provisioned-rate=disk-name=nvme1n1. When unspecified, the disk
// names are automatically inferred from the store path.
DiskName string
// ProvisionedBandwidth is the bandwidth provisioned for this store in
// bytes/s.
// bytes/s. It's optionally passed in using
// --store=provisioned-rate=bandwidth=250MiB/s.
ProvisionedBandwidth int64
}

Expand Down Expand Up @@ -234,10 +228,6 @@ func newStoreProvisionedRateSpec(
field, subField)
}
}
if len(spec.DiskName) == 0 {
return ProvisionedRateSpec{},
errors.Errorf("%s field did not specify disk-name", field)
}
return spec, nil
}

Expand Down Expand Up @@ -314,9 +304,12 @@ func (ss StoreSpec) String() string {
fmt.Fprint(&buffer, optsStr)
fmt.Fprint(&buffer, ",")
}
if len(ss.ProvisionedRateSpec.DiskName) > 0 {
fmt.Fprintf(&buffer, "provisioned-rate=disk-name=%s",
ss.ProvisionedRateSpec.DiskName)
if ss.ProvisionedRateSpec.ProvisionedBandwidth > 0 {
diskName := "<omitted>"
if len(ss.ProvisionedRateSpec.DiskName) > 0 {
diskName = ss.ProvisionedRateSpec.DiskName
}
fmt.Fprintf(&buffer, "provisioned-rate=disk-name=%s", diskName)
if ss.ProvisionedRateSpec.ProvisionedBandwidth > 0 {
fmt.Fprintf(&buffer, ":bandwidth=%s/s,",
humanizeutil.IBytes(ss.ProvisionedRateSpec.ProvisionedBandwidth))
Expand Down Expand Up @@ -366,10 +359,12 @@ var fractionRegex = regexp.MustCompile(`^([-]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+|[0-
// - 20% -> 20% of the available space
// - 0.2 -> 20% of the available space
// - attrs=xxx:yyy:zzz A colon separated list of optional attributes.
// - provisioned-rate=disk-name=<disk-name>[:bandwidth=<bandwidth-bytes/s>] The
// - provisioned-rate=[disk-name=<disk-name>][:][bandwidth=<bandwidth-bytes/s>] The
// provisioned-rate can be used for admission control for operations on the
// store. The bandwidth is optional, and if unspecified, a cluster setting
// (kvadmission.store.provisioned_bandwidth) will be used.
// store. The disk-name and bandwidth are optional. If disk-name is
// unspecified, it's automatically inferred. If the bandwidth is
// unspecified, a cluster setting (kvadmission.store.provisioned_bandwidth),
// if non-zero, is used.
//
// Note that commas are forbidden within any field name or value.
func NewStoreSpec(value string) (StoreSpec, error) {
Expand Down
16 changes: 10 additions & 6 deletions pkg/cli/cliflags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,18 +998,22 @@ memory that the store may consume, for example:
</PRE>
Optionally, to configure admission control enforcement to prevent disk
bandwidth saturation, the "provisioned-rate" field can be specified with
the "disk-name" and an optional "bandwidth". The bandwidth is used to override
the value of the cluster setting, kvadmission.store.provisioned_bandwidth.
For example:
the optional "disk-name" and "bandwidth" parameters. If the "bandwidth"
parameter is specified for a given store, admission control enforcement is
enabled. Using the kvadmission.store.provisioned_bandwidth cluster setting also
enables admission control enforcement. The "bandwidth" parameter takes
precedence over the cluster setting. Example usage:
<PRE>
--store=provisioned-rate=disk-name=nvme1n1
--store=provisioned-rate=disk-name=sdb:bandwidth=250MiB/s
--store=provisioned-rate=disk-name=nvme1n1
--store=provisioned-rate=bandwidth=250MiB/s
</PRE>
Commas are forbidden in all values, since they are used to separate fields.
Also, if you use equal signs in the file path to a store, you must use the
"path" field label.
If you use equal signs in the file path to a store, you must use the "path"
field label. If the "disk-name" field is omitted, CockroachDB will attempt to
automatically infer the disk name from the given store path.
(default is 'cockroach-data' in current directory except for mt commands
which use 'cockroach-data-tenant-X' for tenant 'X')
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting(

// ProvisionedBandwidth sets the value of the provisioned
// bandwidth for each store in the cluster.
//
// TODO(irfansharif): Now that we automatically infer disk names, if we're
// unable to for whatever reason and this bandwidth setting is non-zero, we
// don't have an easy way to communicate to the user that admission control
// enforcement for disk bandwidth is not happening, despite expectations. Maybe
// we surface this through SQL hints somehow?
var ProvisionedBandwidth = settings.RegisterByteSizeSetting(
settings.SystemOnly, "kvadmission.store.provisioned_bandwidth",
"if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+
Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/strutil",
"//pkg/util/syncutil",
"//pkg/util/sysutil",
"//pkg/util/timeutil",
"//pkg/util/timeutil/ptp",
"//pkg/util/tracing",
Expand Down
75 changes: 64 additions & 11 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/startup"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/grpcinterceptor"
Expand Down Expand Up @@ -1009,8 +1010,16 @@ func (n *Node) UpdateIOThreshold(id roachpb.StoreID, threshold *admissionpb.IOTh
// diskStatsMap encapsulates all the logic for populating DiskStats for
// admission.StoreMetrics.
type diskStatsMap struct {
provisionedRate map[roachpb.StoreID]base.ProvisionedRateSpec
// diskNameToStoreID maps from attached disk names (/dev/sdb and
// /dev/nvme1n1 for example, from GCP PD and AWS EBS respectively) to
// corresponding store IDs.
diskNameToStoreID map[string]roachpb.StoreID
// provisionedRate is passed through using
// --store=provisioned-rate=disk-name=sdb:bandwidth=250MiB/s.
//
// TODO(irfansharif): Use something like github.com/irfansharif/probe to
// measure this empirically on process start, if not explicitly passed in.
provisionedRate map[roachpb.StoreID]base.ProvisionedRateSpec
}

func (dsm *diskStatsMap) tryPopulateAdmissionDiskStats(
Expand All @@ -1026,9 +1035,9 @@ func (dsm *diskStatsMap) tryPopulateAdmissionDiskStats(
return stats, err
}
stats = make(map[roachpb.StoreID]admission.DiskStats)
for id, spec := range dsm.provisionedRate {
for _, id := range dsm.diskNameToStoreID {
s := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
if spec.ProvisionedBandwidth > 0 {
if spec, ok := dsm.provisionedRate[id]; ok && spec.ProvisionedBandwidth > 0 {
s.ProvisionedBandwidth = spec.ProvisionedBandwidth
}
stats[id] = s
Expand All @@ -1045,31 +1054,72 @@ func (dsm *diskStatsMap) tryPopulateAdmissionDiskStats(
}

func (dsm *diskStatsMap) empty() bool {
return len(dsm.provisionedRate) == 0
return len(dsm.diskNameToStoreID) == 0
}

func (dsm *diskStatsMap) initDiskStatsMap(specs []base.StoreSpec, engines []storage.Engine) error {
func (dsm *diskStatsMap) maybeInitDiskStatsMap(
ctx context.Context, specs []base.StoreSpec, engines []storage.Engine,
) error {
*dsm = diskStatsMap{
provisionedRate: make(map[roachpb.StoreID]base.ProvisionedRateSpec),
diskNameToStoreID: make(map[string]roachpb.StoreID),
}

deviceIDToName := make(map[uint64]string)
if mapping, err := sysutil.GetDeviceMap(); err != nil {
log.Errorf(ctx, "unable to retrieve mapping between device ID to name: %v", err)
return nil // nolint:returnerrcheck
} else {
deviceIDToName = mapping
}

for i := range engines {
id, err := kvstorage.ReadStoreIdent(context.Background(), engines[i])
storeIdent, err := kvstorage.ReadStoreIdent(ctx, engines[i])
if err != nil {
return err
}

// STORAGE_MIN_VERSION is guaranteed to exist for initialized stores.
// We'll look at its file metadata to see what device contains the file
// (st_dev specifically, see https://linux.die.net/man/2/stat),
// and match it against the list of device IDs retrieved earlier.
fi, err := engines[i].Stat(engines[i].PathJoin(specs[i].Path, storage.MinVersionFilename))
if err != nil {
return err
}

if deviceID, ok := sysutil.GetDeviceID(fi); ok {
if name, ok := deviceIDToName[deviceID]; ok {
dsm.diskNameToStoreID[name] = storeIdent.StoreID
}
}

if len(specs[i].ProvisionedRateSpec.DiskName) > 0 {
dsm.provisionedRate[id.StoreID] = specs[i].ProvisionedRateSpec
dsm.diskNameToStoreID[specs[i].ProvisionedRateSpec.DiskName] = id.StoreID
// If users specify disk names manually, compare the automatically
// observed data against what they spelled out.
dsm.provisionedRate[storeIdent.StoreID] = specs[i].ProvisionedRateSpec

diskName := specs[i].ProvisionedRateSpec.DiskName
if storeID, found := dsm.diskNameToStoreID[diskName]; found {
if storeID != storeIdent.StoreID {
return errors.Newf("mismatched store ID for disk %s, expected %d got %d",
diskName, storeID, storeIdent.StoreID)
}
} else {
dsm.diskNameToStoreID[diskName] = storeIdent.StoreID
}
}
}
for diskName, storeID := range dsm.diskNameToStoreID {
log.Infof(ctx, "s%d mapped to disk %s", storeID, diskName)
}
return nil
}

func (n *Node) registerEnginesForDiskStatsMap(
specs []base.StoreSpec, engines []storage.Engine,
ctx context.Context, specs []base.StoreSpec, engines []storage.Engine,
) error {
return n.diskStatsMap.initDiskStatsMap(specs, engines)
return n.diskStatsMap.maybeInitDiskStatsMap(ctx, specs, engines)
}

// GetPebbleMetrics implements admission.PebbleMetricsProvider.
Expand All @@ -1081,6 +1131,7 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
if err != nil {
log.Warningf(context.Background(), "%v",
errors.Wrapf(err, "unable to populate disk stats"))
return nil
}
var metrics []admission.StoreMetrics
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
Expand All @@ -1093,9 +1144,11 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
StoreID: store.StoreID(),
Metrics: m.Metrics,
WriteStallCount: m.WriteStallCount,
DiskStats: diskStats})
DiskStats: diskStats,
})
return nil
})

return metrics
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ func TestDiskStatsMap(t *testing.T) {
require.Equal(t, 0, len(stats))

// diskStatsMap initialized with these two stores.
require.NoError(t, dsm.initDiskStatsMap(specs, engines))
require.NoError(t, dsm.maybeInitDiskStatsMap(ctx, specs, engines))

// diskStatsFunc returns stats for these two stores, and an unknown store.
diskStatsFunc := func(context.Context) ([]status.DiskStats, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1946,7 +1946,7 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
// wholly initialized stores (it reads the StoreIdentKeys). It also needs
// to come before the call into SetPebbleMetricsProvider, which internally
// uses the disk stats map we're initializing.
if err := s.node.registerEnginesForDiskStatsMap(s.cfg.Stores.Specs, s.engines); err != nil {
if err := s.node.registerEnginesForDiskStatsMap(ctx, s.cfg.Stores.Specs, s.engines); err != nil {
return errors.Wrapf(err, "failed to register engines for the disk stats map")
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/util/admission/disk_bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (d *diskLoadWatcher) getLoadLevel() diskLoadLevel {
}

func (d diskLoadWatcher) SafeFormat(p redact.SafePrinter, _ rune) {
p.Printf("disk bandwidth: read: %s/s, write: %s/s, provisioned: %s/s, util: %.2f",
p.Printf("read-bw %s/s write-bw %s/s provisioned-bw %s/s, util: %.2f",
humanizeutil.IBytes(d.lastInterval.readBandwidth),
humanizeutil.IBytes(d.lastInterval.writeBandwidth),
humanizeutil.IBytes(d.lastInterval.provisionedBandwidth), d.lastUtil)
Expand Down Expand Up @@ -359,9 +359,9 @@ func (d *diskBandwidthLimiter) computeElasticTokens(
func (d *diskBandwidthLimiter) SafeFormat(p redact.SafePrinter, _ rune) {
ib := humanizeutil.IBytes
level := d.diskLoadWatcher.getLoadLevel()
p.Printf("diskBandwidthLimiter %s (%v): elastic-frac: %.2f, incoming: %s, "+
"elastic-tokens (used %s): %s",
p.Printf("disk bandwidth load level: %s (%v); elastic-frac%.2f, incoming%s, "+
"elastic-disk-bw-tokens %s (used %s)",
diskLoadLevelString(level), d.diskLoadWatcher, d.state.smoothedElasticFraction,
ib(int64(d.state.smoothedIncomingBytes)), ib(d.state.prevElasticTokensUsed),
ib(d.state.elasticTokens))
ib(int64(d.state.smoothedIncomingBytes)), ib(d.state.elasticTokens),
ib(d.state.prevElasticTokensUsed))
}
9 changes: 5 additions & 4 deletions pkg/util/admission/io_load_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,13 +1059,14 @@ func (res adjustTokensResult) SafeFormat(p redact.SafePrinter, _ rune) {
p.SafeString("all")
}
if res.elasticDiskBWTokens != unlimitedTokens {
p.Printf("; elastic-disk-bw tokens %s (used %s, regular used %s): "+
"write model %.2fx+%s ingest model %.2fx+%s, ",
ib(res.elasticDiskBWTokens), ib(res.aux.diskBW.intervalLSMInfo.elasticTokensUsed),
p.Printf("; elastic-disk-bw-tokens %s (used %s regular-used %s): "+
"write-model %.2fx+%s ingest-model %.2fx+%s, ",
ib(res.elasticDiskBWTokens),
ib(res.aux.diskBW.intervalLSMInfo.elasticTokensUsed),
ib(res.aux.diskBW.intervalLSMInfo.regularTokensUsed),
res.l0WriteLM.multiplier, ib(res.l0WriteLM.constant),
res.ingestLM.multiplier, ib(res.ingestLM.constant))
p.Printf("disk bw read %s write %s provisioned %s",
p.Printf("read-bw %s/s write-bw %s/s provisioned-bw %s/s",
ib(res.aux.diskBW.intervalDiskLoadInfo.readBandwidth),
ib(res.aux.diskBW.intervalDiskLoadInfo.writeBandwidth),
ib(res.aux.diskBW.intervalDiskLoadInfo.provisionedBandwidth))
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/sysutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ go_library(
"acl_unix.go",
"acl_windows.go",
"aclinfo.go",
"device_unix.go",
"device_windows.go",
"large_file.go",
"large_file_linux.go",
"large_file_nonlinux.go",
Expand Down
Loading

0 comments on commit 885abad

Please sign in to comment.