diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 322cf53ac390..c420e3e73441 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -630,6 +630,15 @@
STORAGE | storage.compactions.keys.pinned.count | Cumulative count of storage engine KVs written to sstables during flushes and compactions due to open LSM snapshots.
Various subsystems of CockroachDB take LSM snapshots to maintain a consistent view of the database over an extended duration. In order to maintain the consistent view, flushes and compactions within the storage engine must preserve keys that otherwise would have been dropped. This increases write amplification, and introduces keys that must be skipped during iteration. This metric records the cumulative count of KVs preserved during flushes and compactions over the lifetime of the process.
| Keys | GAUGE | COUNT | AVG | NONE |
STORAGE | storage.disk-slow | Number of instances of disk operations taking longer than 10s | Events | GAUGE | COUNT | AVG | NONE |
STORAGE | storage.disk-stalled | Number of instances of disk operations taking longer than 20s | Events | GAUGE | COUNT | AVG | NONE |
+STORAGE | storage.disk.io.time | Time spent reading from or writing to the store's disk since this process started (as reported by the OS) | Time | GAUGE | NANOSECONDS | AVG | NONE |
+STORAGE | storage.disk.iopsinprogress | IO operations currently in progress on the store's disk (as reported by the OS) | Operations | GAUGE | COUNT | AVG | NONE |
+STORAGE | storage.disk.read.bytes | Bytes read from the store's disk since this process started (as reported by the OS) | Bytes | GAUGE | BYTES | AVG | NONE |
+STORAGE | storage.disk.read.count | Disk read operations on the store's disk since this process started (as reported by the OS) | Operations | GAUGE | COUNT | AVG | NONE |
+STORAGE | storage.disk.read.time | Time spent reading from the store's disk since this process started (as reported by the OS) | Time | GAUGE | NANOSECONDS | AVG | NONE |
+STORAGE | storage.disk.weightedio.time | Weighted time spent reading from or writing to the store's disk since this process started (as reported by the OS) | Time | GAUGE | NANOSECONDS | AVG | NONE |
+STORAGE | storage.disk.write.bytes | Bytes written to the store's disk since this process started (as reported by the OS) | Bytes | GAUGE | BYTES | AVG | NONE |
+STORAGE | storage.disk.write.count | Disk write operations on the store's disk since this process started (as reported by the OS) | Operations | GAUGE | COUNT | AVG | NONE |
+STORAGE | storage.disk.write.time | Time spent writing to the store&#39;s disk since this process started (as reported by the OS) | Time | GAUGE | NANOSECONDS | AVG | NONE |
STORAGE | storage.flush.ingest.count | Flushes performing an ingest (flushable ingestions) | Flushes | GAUGE | COUNT | AVG | NONE |
STORAGE | storage.flush.ingest.table.bytes | Bytes ingested via flushes (flushable ingestions) | Bytes | GAUGE | BYTES | AVG | NONE |
STORAGE | storage.flush.ingest.table.count | Tables ingested via flushes (flushable ingestions) | Tables | GAUGE | COUNT | AVG | NONE |
@@ -1595,7 +1604,7 @@
SERVER | sys.host.disk.read.bytes | Bytes read from all disks since this process started (as reported by the OS) | Bytes | GAUGE | BYTES | AVG | NONE |
SERVER | sys.host.disk.read.count | Disk read operations across all disks since this process started (as reported by the OS) | Operations | GAUGE | COUNT | AVG | NONE |
SERVER | sys.host.disk.read.time | Time spent reading from all disks since this process started (as reported by the OS) | Time | GAUGE | NANOSECONDS | AVG | NONE |
-SERVER | sys.host.disk.weightedio.time | Weighted time spent reading from or writing to to all disks since this process started (as reported by the OS) | Time | GAUGE | NANOSECONDS | AVG | NONE |
+SERVER | sys.host.disk.weightedio.time | Weighted time spent reading from or writing to all disks since this process started (as reported by the OS) | Time | GAUGE | NANOSECONDS | AVG | NONE |
SERVER | sys.host.disk.write.bytes | Bytes written to all disks since this process started (as reported by the OS) | Bytes | GAUGE | BYTES | AVG | NONE |
SERVER | sys.host.disk.write.count | Disk write operations across all disks since this process started (as reported by the OS) | Operations | GAUGE | COUNT | AVG | NONE |
SERVER | sys.host.disk.write.time | Time spent writing to all disks since this process started (as reported by the OS) | Time | GAUGE | NANOSECONDS | AVG | NONE |
diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel
index 8ab1da50f192..71e8bfd04ac8 100644
--- a/pkg/BUILD.bazel
+++ b/pkg/BUILD.bazel
@@ -613,6 +613,7 @@ ALL_TESTS = [
"//pkg/sql/types:types_test",
"//pkg/sql:sql_disallowed_imports_test",
"//pkg/sql:sql_test",
+ "//pkg/storage/disk:disk_test",
"//pkg/storage/enginepb:enginepb_test",
"//pkg/storage/fs:fs_test",
"//pkg/storage/metamorphic:metamorphic_test",
@@ -2225,6 +2226,8 @@ GO_TARGETS = [
"//pkg/sql/vtable:vtable",
"//pkg/sql:sql",
"//pkg/sql:sql_test",
+ "//pkg/storage/disk:disk",
+ "//pkg/storage/disk:disk_test",
"//pkg/storage/enginepb:enginepb",
"//pkg/storage/enginepb:enginepb_test",
"//pkg/storage/fs:fs",
diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index 0fa3d9dafb0f..1c5ec6c8f074 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -299,6 +299,7 @@ go_test(
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
"//pkg/util/hlc",
+ "//pkg/util/intsets",
"//pkg/util/json",
"//pkg/util/leaktest",
"//pkg/util/log",
diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go
index c25b5bcacbd4..db8ca1edba7f 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist.go
@@ -10,12 +10,14 @@ package changefeedccl
import (
"context"
+ "math"
"sort"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
+ "github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -35,7 +37,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
+ "github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+ "github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
@@ -390,7 +394,7 @@ func makePlan(
return nil, nil, err
}
if log.ExpensiveLogEnabled(ctx, 2) {
- log.Infof(ctx, "spans returned by DistSQL: %s", spanPartitions)
+ log.Infof(ctx, "spans returned by DistSQL: %v", spanPartitions)
}
switch {
case distMode == sql.LocalDistribution || rangeDistribution == int64(defaultDistribution):
@@ -400,14 +404,14 @@ func makePlan(
}
sender := execCtx.ExecCfg().DB.NonTransactionalSender()
distSender := sender.(*kv.CrossRangeTxnWrapperSender).Wrapped().(*kvcoord.DistSender)
-
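+	// Walk ranges with an iterator during rebalancing instead of materializing
+	// every range span up front.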
+ ri := kvcoord.MakeRangeIterator(distSender)
spanPartitions, err = rebalanceSpanPartitions(
- ctx, &distResolver{distSender}, rebalanceThreshold.Get(sv), spanPartitions)
+ ctx, &ri, rebalanceThreshold.Get(sv), spanPartitions)
if err != nil {
return nil, nil, err
}
if log.ExpensiveLogEnabled(ctx, 2) {
- log.Infof(ctx, "spans after balanced simple distribution rebalancing: %s", spanPartitions)
+ log.Infof(ctx, "spans after balanced simple distribution rebalancing: %v", spanPartitions)
}
default:
return nil, nil, errors.AssertionFailedf("unsupported dist strategy %d and dist mode %d",
@@ -440,7 +444,7 @@ func makePlan(
aggregatorSpecs := make([]*execinfrapb.ChangeAggregatorSpec, len(spanPartitions))
for i, sp := range spanPartitions {
if log.ExpensiveLogEnabled(ctx, 2) {
- log.Infof(ctx, "watched spans for node %d: %s", sp.SQLInstanceID, sp)
+ log.Infof(ctx, "watched spans for node %d: %v", sp.SQLInstanceID, sp)
}
watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
for watchIdx, nodeSpan := range sp.Spans {
@@ -549,6 +553,7 @@ func (w *changefeedResultWriter) Err() error {
return w.err
}
+// TODO(#120427): improve this to be more useful.
var rebalanceThreshold = settings.RegisterFloatSetting(
settings.ApplicationLevel,
"changefeed.balance_range_distribution.sensitivity",
@@ -557,80 +562,178 @@ var rebalanceThreshold = settings.RegisterFloatSetting(
settings.PositiveFloat,
)
-type rangeResolver interface {
- getRangesForSpans(ctx context.Context, spans []roachpb.Span) ([]roachpb.Span, error)
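+// rangeIterator abstracts kvcoord.RangeIterator so that rebalancing can be
+// exercised with a mock iterator in tests.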
+type rangeIterator interface {
+ Desc() *roachpb.RangeDescriptor
+ NeedAnother(rs roachpb.RSpan) bool
+ Valid() bool
+ Error() error
+ Next(ctx context.Context)
+ Seek(ctx context.Context, key roachpb.RKey, scanDir kvcoord.ScanDirection)
}
-type distResolver struct {
- *kvcoord.DistSender
+// rebalancingPartition is a container used to store a partition undergoing
+// rebalancing.
+type rebalancingPartition struct {
+	// numRanges and group track the current number of ranges and the set of
+	// spans in this partition. They are initialized from the sql.SpanPartition
+	// below and mutated during rebalancing.
+ numRanges int
+ group roachpb.SpanGroup
+
+ // The original span partition corresponding to this bucket and its
+ // index in the original []sql.SpanPartition.
+ part sql.SpanPartition
+ pIdx int
}
-func (r *distResolver) getRangesForSpans(
- ctx context.Context, spans []roachpb.Span,
-) ([]roachpb.Span, error) {
- spans, _, err := r.DistSender.AllRangeSpans(ctx, spans)
- return spans, err
-}
+// Setting expensiveRebalanceChecksEnabled = true will cause rebalancing to
+// return an error if the output list of partitions does not cover the same
+// keys as the input list of partitions.
+var expensiveRebalanceChecksEnabled = buildutil.CrdbTestBuild || envutil.EnvOrDefaultBool(
+ "COCKROACH_CHANGEFEED_TESTING_REBALANCING_CHECKS", false)
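+// rebalanceSpanPartitions moves ranges out of partitions which exceed the
+// target range count (ceil((1+sensitivity)*totalRanges/numPartitions)) and
+// into partitions with fewer ranges, preserving the overall set of keys.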
func rebalanceSpanPartitions(
- ctx context.Context, r rangeResolver, sensitivity float64, p []sql.SpanPartition,
+ ctx context.Context, ri rangeIterator, sensitivity float64, partitions []sql.SpanPartition,
) ([]sql.SpanPartition, error) {
- if len(p) <= 1 {
- return p, nil
+ if len(partitions) <= 1 {
+ return partitions, nil
}
- // Explode set of spans into set of ranges.
- // TODO(yevgeniy): This might not be great if the tables are huge.
- numRanges := 0
- for i := range p {
- spans, err := r.getRangesForSpans(ctx, p[i].Spans)
- if err != nil {
- return nil, err
+ // Create partition builder structs for the partitions array above.
+ var builders = make([]rebalancingPartition, len(partitions))
+ var totalRanges int
+ for i, p := range partitions {
+ builders[i].part = p
+ builders[i].pIdx = i
+ nRanges, ok := p.NumRanges()
+ // We cannot rebalance if we're missing range information.
+ if !ok {
+ log.Warning(ctx, "skipping rebalance due to missing range info")
+ return partitions, nil
}
- p[i].Spans = spans
- numRanges += len(spans)
+ builders[i].numRanges = nRanges
+ totalRanges += nRanges
+ builders[i].group.Add(p.Spans...)
}
// Sort descending based on the number of ranges.
- sort.Slice(p, func(i, j int) bool {
- return len(p[i].Spans) > len(p[j].Spans)
+ sort.Slice(builders, func(i, j int) bool {
+ return builders[i].numRanges > builders[j].numRanges
})
- targetRanges := int((1 + sensitivity) * float64(numRanges) / float64(len(p)))
-
- for i, j := 0, len(p)-1; i < j && len(p[i].Spans) > targetRanges && len(p[j].Spans) < targetRanges; {
- from, to := i, j
-
- // Figure out how many ranges we can move.
- numToMove := len(p[from].Spans) - targetRanges
- canMove := targetRanges - len(p[to].Spans)
- if numToMove <= canMove {
- i++
- }
- if canMove <= numToMove {
- numToMove = canMove
- j--
+ targetRanges := int(math.Ceil((1 + sensitivity) * float64(totalRanges) / float64(len(partitions))))
+ to := len(builders) - 1
+ from := 0
+
+ // In each iteration of the outer loop, check if `from` has too many ranges.
+ // If so, move them to other partitions which need more ranges
+ // starting from `to` and moving down. Otherwise, increment `from` and check
+ // again.
+ for ; from < to && builders[from].numRanges > targetRanges; from++ {
+ // numToMove is the number of ranges which need to be moved out of `from`
+ // to other partitions.
+ numToMove := builders[from].numRanges - targetRanges
+ count := 0
+ needMore := func() bool {
+ return count < numToMove
}
- if numToMove == 0 {
- break
+ // Iterate over all the spans in `from`.
+ for spanIdx := 0; from < to && needMore() && spanIdx < len(builders[from].part.Spans); spanIdx++ {
+ sp := builders[from].part.Spans[spanIdx]
+ rSpan, err := keys.SpanAddr(sp)
+ if err != nil {
+ return nil, err
+ }
+ // Iterate over the ranges in the current span.
+ for ri.Seek(ctx, rSpan.Key, kvcoord.Ascending); from < to && needMore(); ri.Next(ctx) {
+ // Error check.
+ if !ri.Valid() {
+ return nil, ri.Error()
+ }
+
+ // Move one range from `from` to `to`.
+ count += 1
+ builders[from].numRanges -= 1
+ builders[to].numRanges += 1
+ // If the range boundaries are outside the original span, trim
+ // the range.
+ startKey := ri.Desc().StartKey
+ if startKey.Compare(rSpan.Key) == -1 {
+ startKey = rSpan.Key
+ }
+ endKey := ri.Desc().EndKey
+ if endKey.Compare(rSpan.EndKey) == 1 {
+ endKey = rSpan.EndKey
+ }
+ diff := roachpb.Span{
+ Key: startKey.AsRawKey(), EndKey: endKey.AsRawKey(),
+ }
+ builders[from].group.Sub(diff)
+ builders[to].group.Add(diff)
+
+ // Since we moved a range, `to` may have enough ranges.
+ // Decrement `to` until we find a new partition which needs more
+ // ranges.
+ for from < to && builders[to].numRanges >= targetRanges {
+ to--
+ }
+ // No more ranges in this span.
+ if !ri.NeedAnother(rSpan) {
+ break
+ }
+ }
}
+ }
- // Move numToMove spans from 'from' to 'to'.
- idx := len(p[from].Spans) - numToMove
- p[to].Spans = append(p[to].Spans, p[from].Spans[idx:]...)
- p[from].Spans = p[from].Spans[:idx]
+ // Overwrite the original partitions slice with the balanced partitions.
+ for _, b := range builders {
+ partitions[b.pIdx] = sql.MakeSpanPartitionWithRangeCount(
+ b.part.SQLInstanceID, b.group.Slice(), b.numRanges)
}
- // Collapse ranges into nice set of contiguous spans.
- for i := range p {
- var g roachpb.SpanGroup
- g.Add(p[i].Spans...)
- p[i].Spans = g.Slice()
+ if err := verifyPartitionsIfExpensiveChecksEnabled(builders, partitions, targetRanges); err != nil {
+ return nil, err
}
- // Finally, re-sort based on the node id.
- sort.Slice(p, func(i, j int) bool {
- return p[i].SQLInstanceID < p[j].SQLInstanceID
- })
- return p, nil
+ return partitions, nil
+}
+
+// verifyPartitionsIfExpensiveChecksEnabled returns an error if the output
+// partitions cover a different set of keys than the input partitions, or if
+// any output partition exceeds the target range count.
+func verifyPartitionsIfExpensiveChecksEnabled(
+ builderWithInputSpans []rebalancingPartition,
+ outputPartitions []sql.SpanPartition,
+ targetRanges int,
+) error {
+	if !expensiveRebalanceChecksEnabled {
+ return nil
+ }
+ var originalSpansG roachpb.SpanGroup
+ var originalSpansArr []roachpb.Span
+ var newSpansG roachpb.SpanGroup
+ var newSpansArr []roachpb.Span
+ for _, b := range builderWithInputSpans {
+ originalSpansG.Add(b.part.Spans...)
+ originalSpansArr = append(originalSpansArr, b.part.Spans...)
+ }
+ for _, p := range outputPartitions {
+ if numRanges, ok := p.NumRanges(); !ok {
+ return changefeedbase.WithTerminalError(
+ errors.Newf("partition missing number of ranges info, partition: %v, partitions: %v", p, outputPartitions))
+ } else if numRanges > targetRanges {
+ return changefeedbase.WithTerminalError(
+ errors.Newf("found partition with too many ranges, target: %d, partition: %v, partitions: %v",
+ targetRanges, p, outputPartitions))
+ }
+
+ newSpansG.Add(p.Spans...)
+ newSpansArr = append(newSpansArr, p.Spans...)
+ }
+ // If the original spans enclose the new spans and the new spans enclose the original spans,
+ // then the two groups must cover exactly the same keys.
+ if !originalSpansG.Encloses(newSpansArr...) || !newSpansG.Encloses(originalSpansArr...) {
+ return changefeedbase.WithTerminalError(errors.Newf("incorrect rebalance. input spans: %v, output spans: %v",
+ originalSpansArr, newSpansArr))
+ }
+ return nil
}
diff --git a/pkg/ccl/changefeedccl/changefeed_dist_test.go b/pkg/ccl/changefeedccl/changefeed_dist_test.go
index 656802bfa14c..35f47c1904f2 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist_test.go
@@ -13,6 +13,7 @@ import (
"fmt"
"math"
"reflect"
+ "sort"
"strings"
"testing"
@@ -30,18 +31,70 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+ "github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
+// mockRangeIterator iterates over ranges in a span assuming that each range
+// contains one character.
+type mockRangeIterator struct {
+ rangeDesc *roachpb.RangeDescriptor
+}
+
+var _ rangeIterator = (*mockRangeIterator)(nil)
+
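+// nextKey returns the key one character past startKey; each mock range covers
+// exactly one character.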
+func nextKey(startKey []byte) []byte {
+ return []byte{startKey[0] + 1}
+}
+
+// Desc implements the rangeIterator interface.
+func (ri *mockRangeIterator) Desc() *roachpb.RangeDescriptor {
+ return ri.rangeDesc
+}
+
+// NeedAnother implements the rangeIterator interface.
+func (ri *mockRangeIterator) NeedAnother(rs roachpb.RSpan) bool {
+ return ri.rangeDesc.EndKey.Less(rs.EndKey)
+}
+
+// Valid implements the rangeIterator interface.
+func (ri *mockRangeIterator) Valid() bool {
+ return true
+}
+
+// Error implements the rangeIterator interface.
+func (ri *mockRangeIterator) Error() error {
+ panic("unexpected call to Error()")
+}
+
+// Next implements the rangeIterator interface.
+func (ri *mockRangeIterator) Next(ctx context.Context) {
+ ri.rangeDesc.StartKey = nextKey(ri.rangeDesc.StartKey)
+ ri.rangeDesc.EndKey = nextKey(ri.rangeDesc.EndKey)
+}
+
+// Seek implements the rangeIterator interface.
+func (ri *mockRangeIterator) Seek(_ context.Context, key roachpb.RKey, _ kvcoord.ScanDirection) {
+ ri.rangeDesc = &roachpb.RangeDescriptor{
+ StartKey: key,
+ EndKey: nextKey(key),
+ }
+}
+
var partitions = func(p ...sql.SpanPartition) []sql.SpanPartition {
return p
}
var mkPart = func(n base.SQLInstanceID, spans ...roachpb.Span) sql.SpanPartition {
- return sql.SpanPartition{SQLInstanceID: n, Spans: spans}
+ var count int
+ for _, sp := range spans {
+ count += int(rune(sp.EndKey[0]) - rune(sp.Key[0]))
+ }
+ return sql.MakeSpanPartitionWithRangeCount(n, spans, count)
}
// mkRange makes a range containing a single rune.
@@ -67,30 +120,19 @@ var mkSingleLetterRanges = func(start, end rune) (result []roachpb.Span) {
return result
}
-// letterRangeResolver resolves spans such that each letter is a range.
-type letterRangeResolver struct{}
-
-func (r *letterRangeResolver) getRangesForSpans(
- _ context.Context, inSpans []roachpb.Span,
-) (spans []roachpb.Span, _ error) {
- for _, sp := range inSpans {
- spans = append(spans, mkSingleLetterRanges(rune(sp.Key[0]), rune(sp.EndKey[0]))...)
- }
- return spans, nil
-}
-
// TestPartitionSpans unit tests the rebalanceSpanPartitions function.
func TestPartitionSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
- const sensitivity = 0.01
+ defer log.Scope(t).Close(t)
// 26 nodes, 1 range per node.
make26NodesBalanced := func() (p []sql.SpanPartition) {
for i := rune(0); i < 26; i += 1 {
- p = append(p, sql.SpanPartition{
- SQLInstanceID: base.SQLInstanceID(i + 1),
- Spans: []roachpb.Span{mkRange('a' + i)},
- })
+ p = append(p, sql.MakeSpanPartitionWithRangeCount(
+ base.SQLInstanceID(i+1),
+ []roachpb.Span{mkRange('z' - i)},
+ 1,
+ ))
}
return p
}
@@ -98,13 +140,13 @@ func TestPartitionSpans(t *testing.T) {
// 26 nodes. All empty except for the first, which has 26 ranges.
make26NodesImBalanced := func() (p []sql.SpanPartition) {
for i := rune(0); i < 26; i += 1 {
- sp := sql.SpanPartition{
- SQLInstanceID: base.SQLInstanceID(i + 1),
- }
if i == 0 {
- sp.Spans = append(sp.Spans, mkSpan('a', 'z'+1))
+ p = append(p, sql.MakeSpanPartitionWithRangeCount(
+ base.SQLInstanceID(i+1), []roachpb.Span{mkSpan('a', 'z'+1)}, 26))
+ } else {
+ p = append(p, sql.MakeSpanPartitionWithRangeCount(base.SQLInstanceID(i+1), []roachpb.Span{}, 0))
}
- p = append(p, sp)
+
}
return p
}
@@ -122,9 +164,9 @@ func TestPartitionSpans(t *testing.T) {
mkPart(3, mkSpan('q', 'z'+1)), // 10
),
expect: partitions(
- mkPart(1, mkSpan('a', 'j')), // 9
- mkPart(2, mkSpan('j', 'q'), mkRange('z')), // 8
- mkPart(3, mkSpan('q', 'z')), // 9
+ mkPart(1, mkSpan('a', 'j')), // 9
+ mkPart(2, mkSpan('j', 'r')), // 8
+ mkPart(3, mkSpan('r', 'z'+1)), // 9
),
},
{
@@ -135,9 +177,9 @@ func TestPartitionSpans(t *testing.T) {
mkPart(3, mkSpan('c', 'e'), mkSpan('p', 'r')), // 4
),
expect: partitions(
- mkPart(1, mkSpan('a', 'c'), mkSpan('e', 'l')), // 9
- mkPart(2, mkSpan('r', 'z')), // 8
- mkPart(3, mkSpan('c', 'e'), mkSpan('l', 'r')), // 8
+ mkPart(1, mkSpan('o', 'p'), mkSpan('r', 'z')), // 9
+ mkPart(2, mkSpan('a', 'c'), mkSpan('e', 'l')), // 9
+ mkPart(3, mkSpan('c', 'e'), mkSpan('l', 'o'), mkSpan('p', 'r')), // 7
),
},
{
@@ -148,9 +190,9 @@ func TestPartitionSpans(t *testing.T) {
mkPart(3, mkRange('z')), // 1
),
expect: partitions(
- mkPart(1, mkSpan('a', 'k')), // 10
- mkPart(2, mkSpan('k', 'r'), mkSpan('y', 'z')), // 8
- mkPart(3, mkSpan('r', 'y'), mkRange('z')), // 7
+ mkPart(1, mkSpan('p', 'y')), // 9
+ mkPart(2, mkSpan('i', 'p'), mkSpan('y', 'z')), // 8
+ mkPart(3, mkSpan('a', 'i'), mkRange('z')), // 9
),
},
{
@@ -190,7 +232,7 @@ func TestPartitionSpans(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
sp, err := rebalanceSpanPartitions(context.Background(),
- &letterRangeResolver{}, sensitivity, tc.input)
+ &mockRangeIterator{}, 0.00, tc.input)
t.Log("expected partitions")
for _, p := range tc.expect {
t.Log(p)
@@ -203,6 +245,80 @@ func TestPartitionSpans(t *testing.T) {
require.Equal(t, tc.expect, sp)
})
}
+
+ dedupe := func(in []int) []int {
+ ret := intsets.Fast{}
+ for _, id := range in {
+ ret.Add(id)
+ }
+ return ret.Ordered()
+ }
+ copySpans := func(partitions []sql.SpanPartition) (g roachpb.SpanGroup) {
+ for _, p := range partitions {
+ for _, sp := range p.Spans {
+ g.Add(sp)
+ }
+ }
+ return
+ }
+ // Create a random input and assert that the output has the same
+ // spans as the input.
+ t.Run("random", func(t *testing.T) {
+ rng, _ := randutil.NewTestRand()
+ numPartitions := rng.Intn(8) + 1
+ numSpans := rng.Intn(25) + 1
+
+ // Randomly create spans and assign them to nodes. For example,
+ // {1 {h-i}, {m-n}, {t-u}}
+ // {2 {a-c}, {d-f}, {l-m}, {s-t}, {x-z}}
+ // {3 {c-d}, {i-j}, {u-w}}
+ // {4 {w-x}}
+ // {5 {f-h}, {p-s}}
+ // {6 {j-k}, {k-l}, {n-o}, {o-p}}
+
+ // First, select some indexes in ['a' ... 'z'] to partition at.
+ spanIdxs := make([]int, numSpans)
+ for i := range spanIdxs {
+ spanIdxs[i] = rng.Intn((int('z')-int('a'))-1) + int('a') + 1
+ }
+ sort.Slice(spanIdxs, func(i int, j int) bool {
+ return spanIdxs[i] < spanIdxs[j]
+ })
+ // Make sure indexes are unique.
+ spanIdxs = dedupe(spanIdxs)
+
+ // Generate spans and assign them randomly to partitions.
+ input := make([]sql.SpanPartition, numPartitions)
+ for i, key := range spanIdxs {
+ assignTo := rng.Intn(numPartitions)
+ if i == 0 {
+ input[assignTo].Spans = append(input[assignTo].Spans, mkSpan('a', (rune(key))))
+ } else {
+ input[assignTo].Spans = append(input[assignTo].Spans, mkSpan((rune(spanIdxs[i-1])), rune(key)))
+ }
+ }
+ last := rng.Intn(numPartitions)
+ input[last].Spans = append(input[last].Spans, mkSpan(rune(spanIdxs[len(spanIdxs)-1]), 'z'))
+
+ // Populate the remaining fields in the partitions.
+ for i := range input {
+ input[i] = mkPart(base.SQLInstanceID(i+1), input[i].Spans...)
+ }
+
+ t.Log(input)
+
+ // Ensure the set of input spans matches the set of output spans.
+ g1 := copySpans(input)
+ output, err := rebalanceSpanPartitions(context.Background(),
+ &mockRangeIterator{}, 0.00, input)
+ require.NoError(t, err)
+
+ t.Log(output)
+
+ g2 := copySpans(output)
+ require.True(t, g1.Encloses(g2.Slice()...))
+ require.True(t, g2.Encloses(g1.Slice()...))
+ })
}
type rangeDistributionTester struct {
@@ -296,7 +412,11 @@ func newRangeDistributionTester(
// Distribute the leases exponentially across the first 5 nodes.
for i := 0; i < 64; i += 1 {
- nodeID := int(math.Floor(math.Log2(float64(i)))) + 1
+ nodeID := 1
+ // Avoid log(0).
+ if i != 0 {
+ nodeID = int(math.Floor(math.Log2(float64(i)))) + 1
+ }
cmd := fmt.Sprintf(`ALTER TABLE x EXPERIMENTAL_RELOCATE VALUES (ARRAY[%d], %d)`,
nodeID, i,
)
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index 9e5bee5c1dcd..15a8371474a9 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -87,6 +87,9 @@ var envVars = []string{
"COCKROACH_CHANGEFEED_TESTING_FAST_RETRY=true",
"COCKROACH_CHANGEFEED_TESTING_INCLUDE_PARQUET_TEST_METADATA=true",
"COCKROACH_CHANGEFEED_TESTING_INCLUDE_PARQUET_READER_METADATA=true",
+	// Enable strict rebalancing checks to ensure that rebalancing doesn't create
+	// an incorrect set of spans for the changefeed.
+ "COCKROACH_CHANGEFEED_TESTING_REBALANCING_CHECKS=true",
}
type cdcTester struct {
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index eab82f951c07..a08607cd1679 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -186,6 +186,7 @@ go_library(
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigstore",
"//pkg/storage",
+ "//pkg/storage/disk",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
"//pkg/util",
@@ -462,6 +463,7 @@ go_test(
"//pkg/sql/sem/tree",
"//pkg/sql/sqlstats",
"//pkg/storage",
+ "//pkg/storage/disk",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
"//pkg/testutils",
@@ -515,6 +517,7 @@ go_test(
"@com_github_cockroachdb_errors//oserror",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
+ "@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//proto",
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index 4ef2b1418a7e..ddc89fa89233 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
+ "github.com/cockroachdb/cockroach/pkg/storage/disk"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -2355,6 +2356,60 @@ Note that the measurement does not include the duration for replicating the eval
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}
+ metaDiskReadCount = metric.Metadata{
+ Name: "storage.disk.read.count",
+ Unit: metric.Unit_COUNT,
+ Measurement: "Operations",
+ Help: "Disk read operations on the store's disk since this process started (as reported by the OS)",
+ }
+ metaDiskReadBytes = metric.Metadata{
+ Name: "storage.disk.read.bytes",
+ Unit: metric.Unit_BYTES,
+ Measurement: "Bytes",
+ Help: "Bytes read from the store's disk since this process started (as reported by the OS)",
+ }
+ metaDiskReadTime = metric.Metadata{
+ Name: "storage.disk.read.time",
+ Unit: metric.Unit_NANOSECONDS,
+ Measurement: "Time",
+ Help: "Time spent reading from the store's disk since this process started (as reported by the OS)",
+ }
+ metaDiskWriteCount = metric.Metadata{
+ Name: "storage.disk.write.count",
+ Unit: metric.Unit_COUNT,
+ Measurement: "Operations",
+ Help: "Disk write operations on the store's disk since this process started (as reported by the OS)",
+ }
+ metaDiskWriteBytes = metric.Metadata{
+ Name: "storage.disk.write.bytes",
+ Unit: metric.Unit_BYTES,
+ Measurement: "Bytes",
+ Help: "Bytes written to the store's disk since this process started (as reported by the OS)",
+ }
+ metaDiskWriteTime = metric.Metadata{
+ Name: "storage.disk.write.time",
+ Unit: metric.Unit_NANOSECONDS,
+ Measurement: "Time",
+		Help:        "Time spent writing to the store's disk since this process started (as reported by the OS)",
+ }
+ metaDiskIOTime = metric.Metadata{
+ Name: "storage.disk.io.time",
+ Unit: metric.Unit_NANOSECONDS,
+ Measurement: "Time",
+ Help: "Time spent reading from or writing to the store's disk since this process started (as reported by the OS)",
+ }
+ metaDiskWeightedIOTime = metric.Metadata{
+ Name: "storage.disk.weightedio.time",
+ Unit: metric.Unit_NANOSECONDS,
+ Measurement: "Time",
+ Help: "Weighted time spent reading from or writing to the store's disk since this process started (as reported by the OS)",
+ }
+ metaIopsInProgress = metric.Metadata{
+ Name: "storage.disk.iopsinprogress",
+ Unit: metric.Unit_COUNT,
+ Measurement: "Operations",
+ Help: "IO operations currently in progress on the store's disk (as reported by the OS)",
+ }
)
// StoreMetrics is the set of metrics for a given store.
@@ -2750,6 +2805,17 @@ type StoreMetrics struct {
FlushUtilization *metric.GaugeFloat64
FsyncLatency *metric.ManualWindowHistogram
+
+ // Disk metrics
+ DiskReadBytes *metric.Gauge
+ DiskReadCount *metric.Gauge
+ DiskReadTime *metric.Gauge
+ DiskWriteBytes *metric.Gauge
+ DiskWriteCount *metric.Gauge
+ DiskWriteTime *metric.Gauge
+ DiskIOTime *metric.Gauge
+ DiskWeightedIOTime *metric.Gauge
+ IopsInProgress *metric.Gauge
}
type tenantMetricsRef struct {
@@ -3491,6 +3557,16 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
ReplicaReadBatchDroppedLatchesBeforeEval: metric.NewCounter(metaReplicaReadBatchDroppedLatchesBeforeEval),
ReplicaReadBatchWithoutInterleavingIter: metric.NewCounter(metaReplicaReadBatchWithoutInterleavingIter),
+
+ DiskReadBytes: metric.NewGauge(metaDiskReadBytes),
+ DiskReadCount: metric.NewGauge(metaDiskReadCount),
+ DiskReadTime: metric.NewGauge(metaDiskReadTime),
+ DiskWriteBytes: metric.NewGauge(metaDiskWriteBytes),
+ DiskWriteCount: metric.NewGauge(metaDiskWriteCount),
+ DiskWriteTime: metric.NewGauge(metaDiskWriteTime),
+ DiskIOTime: metric.NewGauge(metaDiskIOTime),
+ DiskWeightedIOTime: metric.NewGauge(metaDiskWeightedIOTime),
+ IopsInProgress: metric.NewGauge(metaIopsInProgress),
}
storeRegistry.AddMetricStruct(sm)
@@ -3700,6 +3776,18 @@ func (sm *StoreMetrics) updateEnvStats(stats fs.EnvStats) {
sm.EncryptionAlgorithm.Update(int64(stats.EncryptionType))
}
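+// updateDiskStats refreshes the store's disk gauges from cumulative stats
+// reported by the store's disk monitor.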
+func (sm *StoreMetrics) updateDiskStats(stats disk.Stats) {
+ sm.DiskReadCount.Update(int64(stats.ReadsCount))
+ sm.DiskReadBytes.Update(int64(stats.BytesRead()))
+ sm.DiskReadTime.Update(int64(stats.ReadsDuration))
+ sm.DiskWriteCount.Update(int64(stats.WritesCount))
+ sm.DiskWriteBytes.Update(int64(stats.BytesWritten()))
+ sm.DiskWriteTime.Update(int64(stats.WritesDuration))
+ sm.DiskIOTime.Update(int64(stats.CumulativeDuration))
+ sm.DiskWeightedIOTime.Update(int64(stats.WeightedIODuration))
+ sm.IopsInProgress.Update(int64(stats.InProgressCount))
+}
+
func (sm *StoreMetrics) handleMetricsResult(ctx context.Context, metric result.Metrics) {
sm.LeaseRequestSuccessCount.Inc(int64(metric.LeaseRequestSuccess))
metric.LeaseRequestSuccess = 0
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 19252a041d9d..83635982c55f 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -68,6 +68,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
"github.com/cockroachdb/cockroach/pkg/storage"
+ "github.com/cockroachdb/cockroach/pkg/storage/disk"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
@@ -1094,6 +1095,9 @@ type Store struct {
spanConfigUpdateQueueRateLimiter *quotapool.RateLimiter
rangeFeedSlowClosedTimestampNudge *singleflight.Group
+
+ // diskMonitor provides metrics for the disk associated with this store.
+ diskMonitor *disk.Monitor
}
var _ kv.Sender = &Store{}
@@ -3363,6 +3367,15 @@ func (s *Store) computeMetrics(ctx context.Context) (m storage.Metrics, err erro
s.metrics.RdbCheckpoints.Update(int64(len(dirs)))
}
+ // Get disk stats for the disk associated with this store.
+ if s.diskMonitor != nil {
+ diskStats, err := s.diskMonitor.CumulativeStats()
+ if err != nil {
+ return m, err
+ }
+ s.metrics.updateDiskStats(diskStats)
+ }
+
return m, nil
}
diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go
index 5109ea28bab4..d57ba135016d 100644
--- a/pkg/kv/kvserver/stores.go
+++ b/pkg/kv/kvserver/stores.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
+ "github.com/cockroachdb/cockroach/pkg/storage/disk"
"github.com/cockroachdb/cockroach/pkg/util/future"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -301,3 +302,32 @@ func (ls *Stores) updateBootstrapInfoLocked(bi *gossip.BootstrapInfo) error {
})
return err
}
+
+// RegisterDiskMonitors injects a monitor into each store to track an individual disk's stats.
+func (ls *Stores) RegisterDiskMonitors(
+ diskManager *disk.MonitorManager, diskPathToStore map[string]roachpb.StoreID,
+) error {
+ monitors := make(map[roachpb.StoreID]disk.Monitor)
+ for path, id := range diskPathToStore {
+ monitor, err := diskManager.Monitor(path)
+ if err != nil {
+ return err
+ }
+ monitors[id] = *monitor
+ }
+ return ls.VisitStores(func(s *Store) error {
+ if monitor, ok := monitors[s.StoreID()]; ok {
+ s.diskMonitor = &monitor
+ }
+ return nil
+ })
+}
+
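+// CloseDiskMonitors closes the disk monitor, if any, attached to each store.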
+func (ls *Stores) CloseDiskMonitors() {
+ _ = ls.VisitStores(func(s *Store) error {
+ if s.diskMonitor != nil {
+ s.diskMonitor.Close()
+ }
+ return nil
+ })
+}
diff --git a/pkg/kv/kvserver/stores_test.go b/pkg/kv/kvserver/stores_test.go
index e49c65936ab9..6679bcf8bd45 100644
--- a/pkg/kv/kvserver/stores_test.go
+++ b/pkg/kv/kvserver/stores_test.go
@@ -12,7 +12,9 @@ package kvserver
import (
"context"
+ "path"
"reflect"
+ "strconv"
"testing"
"github.com/cockroachdb/cockroach/pkg/gossip"
@@ -20,6 +22,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
+ "github.com/cockroachdb/cockroach/pkg/storage/disk"
+ "github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -27,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
+ "github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
@@ -339,3 +344,36 @@ func TestStoresGossipStorageReadLatest(t *testing.T) {
t.Errorf("bootstrap info %+v not equal to expected %+v", verifyBI, bi)
}
}
+
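+// TestRegisterDiskMonitors verifies that RegisterDiskMonitors attaches a disk
+// monitor to every store whose path is registered.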
+func TestRegisterDiskMonitors(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ dir, dirCleanupFn := testutils.TempDir(t)
+ defer dirCleanupFn()
+
+ _, stores, ls, stopper := createStores(2)
+ defer stopper.Stop(context.Background())
+ defer ls.CloseDiskMonitors()
+
+ ls.AddStore(stores[0])
+ ls.AddStore(stores[1])
+
+ fs := vfs.Default
+ pathToStore := make(map[string]roachpb.StoreID, len(stores))
+ for i, store := range stores {
+ storePath := path.Join(dir, strconv.Itoa(i))
+ pathToStore[storePath] = store.StoreID()
+
+ _, err := fs.Create(storePath)
+ require.NoError(t, err)
+ require.Nil(t, store.diskMonitor)
+ }
+
+ diskManager := disk.NewMonitorManager(fs)
+ err := ls.RegisterDiskMonitors(diskManager, pathToStore)
+ require.NoError(t, err)
+ for _, store := range stores {
+ require.NotNil(t, store.diskMonitor)
+ }
+}
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index fd5bbbccc732..d383747e9c80 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -277,6 +277,7 @@ go_library(
"//pkg/sql/ttl/ttljob",
"//pkg/sql/ttl/ttlschedule",
"//pkg/storage",
+ "//pkg/storage/disk",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
"//pkg/testutils",
@@ -518,6 +519,7 @@ go_test(
"//pkg/sql/sessiondata",
"//pkg/sql/sqlstats",
"//pkg/storage",
+ "//pkg/storage/disk",
"//pkg/storage/fs",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
@@ -556,6 +558,7 @@ go_test(
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
+ "@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//jsonpb",
diff --git a/pkg/server/config.go b/pkg/server/config.go
index e16ccf4f1149..87c2690716fb 100644
--- a/pkg/server/config.go
+++ b/pkg/server/config.go
@@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/storage"
+ "github.com/cockroachdb/cockroach/pkg/storage/disk"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/cockroach/pkg/ts"
@@ -51,6 +52,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/bloom"
+ "github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/redact"
)
@@ -275,6 +277,9 @@ type BaseConfig struct {
// listeners. This is set by in-memory tenants if the user has
// specified port range preferences.
RPCListenerFactory RPCListenerFactory
+
+ // DiskMonitorManager provides metrics for individual disks.
+ DiskMonitorManager *disk.MonitorManager
}
// MakeBaseConfig returns a BaseConfig with default values.
@@ -324,6 +329,7 @@ func (cfg *BaseConfig) SetDefaults(
cfg.Config.InitDefaults()
cfg.InitTestingKnobs()
cfg.EarlyBootExternalStorageAccessor = cloud.NewEarlyBootExternalStorageAccessor(st, cfg.ExternalIODirConfig)
+ cfg.DiskMonitorManager = disk.NewMonitorManager(vfs.Default)
}
// InitTestingKnobs sets up any testing knobs based on e.g. envvars.
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 36db6fc38e07..8c008d4b63c4 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/storage"
+ "github.com/cockroachdb/cockroach/pkg/storage/disk"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
@@ -994,6 +995,7 @@ func (n *Node) gossipStores(ctx context.Context) {
// maintained.
func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time.Duration) {
ctx := n.AnnotateCtx(context.Background())
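+	// Close the per-store disk monitors when the node shuts down.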
+ n.stopper.AddCloser(stop.CloserFn(func() { n.stores.CloseDiskMonitors() }))
_ = stopper.RunAsyncTask(ctx, "compute-metrics", func(ctx context.Context) {
// Compute periodic stats at the same frequency as metrics are sampled.
ticker := time.NewTicker(interval)
@@ -1064,18 +1066,18 @@ func (n *Node) UpdateIOThreshold(id roachpb.StoreID, threshold *admissionpb.IOTh
// admission.StoreMetrics.
type diskStatsMap struct {
provisionedRate map[roachpb.StoreID]base.ProvisionedRateSpec
- diskNameToStoreID map[string]roachpb.StoreID
+ diskPathToStoreID map[string]roachpb.StoreID
+ diskMonitors map[string]disk.Monitor
}
func (dsm *diskStatsMap) tryPopulateAdmissionDiskStats(
- ctx context.Context,
clusterProvisionedBandwidth int64,
- diskStatsFunc func(context.Context) ([]status.DiskStats, error),
+ diskStatsFunc func(map[string]disk.Monitor) (map[string]status.DiskStats, error),
) (stats map[roachpb.StoreID]admission.DiskStats, err error) {
if dsm.empty() {
return stats, nil
}
- diskStats, err := diskStatsFunc(ctx)
+ diskStats, err := diskStatsFunc(dsm.diskMonitors)
if err != nil {
return stats, err
}
@@ -1087,11 +1089,11 @@ func (dsm *diskStatsMap) tryPopulateAdmissionDiskStats(
}
stats[id] = s
}
- for i := range diskStats {
- if id, ok := dsm.diskNameToStoreID[diskStats[i].Name]; ok {
+ for path, diskStat := range diskStats {
+ if id, ok := dsm.diskPathToStoreID[path]; ok {
s := stats[id]
- s.BytesRead = uint64(diskStats[i].ReadBytes)
- s.BytesWritten = uint64(diskStats[i].WriteBytes)
+ s.BytesRead = uint64(diskStat.ReadBytes)
+ s.BytesWritten = uint64(diskStat.WriteBytes)
stats[id] = s
}
}
@@ -1102,28 +1104,49 @@ func (dsm *diskStatsMap) empty() bool {
return len(dsm.provisionedRate) == 0
}
-func (dsm *diskStatsMap) initDiskStatsMap(specs []base.StoreSpec, engines []storage.Engine) error {
+func (dsm *diskStatsMap) initDiskStatsMap(
+ specs []base.StoreSpec, engines []storage.Engine, diskManager *disk.MonitorManager,
+) error {
*dsm = diskStatsMap{
provisionedRate: make(map[roachpb.StoreID]base.ProvisionedRateSpec),
- diskNameToStoreID: make(map[string]roachpb.StoreID),
+ diskPathToStoreID: make(map[string]roachpb.StoreID),
+ diskMonitors: make(map[string]disk.Monitor),
}
for i := range engines {
+ if specs[i].Path == "" || specs[i].InMemory {
+ continue
+ }
id, err := kvstorage.ReadStoreIdent(context.Background(), engines[i])
if err != nil {
return err
}
- if len(specs[i].ProvisionedRateSpec.DiskName) > 0 {
- dsm.provisionedRate[id.StoreID] = specs[i].ProvisionedRateSpec
- dsm.diskNameToStoreID[specs[i].ProvisionedRateSpec.DiskName] = id.StoreID
+ monitor, err := diskManager.Monitor(specs[i].Path)
+ if err != nil {
+ return err
}
+ dsm.provisionedRate[id.StoreID] = specs[i].ProvisionedRateSpec
+ dsm.diskPathToStoreID[specs[i].Path] = id.StoreID
+ dsm.diskMonitors[specs[i].Path] = *monitor
}
return nil
}
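+// closeDiskMonitors closes every disk monitor owned by the diskStatsMap.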
+func (dsm *diskStatsMap) closeDiskMonitors() {
+ for _, monitor := range dsm.diskMonitors {
+ monitor.Close()
+ }
+}
+
func (n *Node) registerEnginesForDiskStatsMap(
- specs []base.StoreSpec, engines []storage.Engine,
+ specs []base.StoreSpec, engines []storage.Engine, diskManager *disk.MonitorManager,
) error {
- return n.diskStatsMap.initDiskStatsMap(specs, engines)
+ if err := n.diskStatsMap.initDiskStatsMap(specs, engines, diskManager); err != nil {
+ return err
+ }
+ if err := n.stores.RegisterDiskMonitors(diskManager, n.diskStatsMap.diskPathToStoreID); err != nil {
+ return err
+ }
+ return nil
}
// GetPebbleMetrics implements admission.PebbleMetricsProvider.
@@ -1131,7 +1154,7 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
clusterProvisionedBandwidth := kvadmission.ProvisionedBandwidth.Get(
&n.storeCfg.Settings.SV)
storeIDToDiskStats, err := n.diskStatsMap.tryPopulateAdmissionDiskStats(
- context.Background(), clusterProvisionedBandwidth, status.GetDiskCounters)
+ clusterProvisionedBandwidth, status.GetMonitorCounters)
if err != nil {
log.Warningf(context.Background(), "%v",
errors.Wrapf(err, "unable to populate disk stats"))
diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go
index 6a9f2f0551a4..38787bf29d27 100644
--- a/pkg/server/node_test.go
+++ b/pkg/server/node_test.go
@@ -14,6 +14,7 @@ import (
"bytes"
"context"
"fmt"
+ "path"
"reflect"
"runtime/pprof"
"sort"
@@ -33,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/status/statuspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
+ "github.com/cockroachdb/cockroach/pkg/storage/disk"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/admission"
@@ -42,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
+ "github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
@@ -966,6 +969,8 @@ func TestDiskStatsMap(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()
+ dir, dirCleanupFn := testutils.TempDir(t)
+ defer dirCleanupFn()
// Specs for two stores, one of which overrides the cluster-level
// provisioned bandwidth.
specs := []base.StoreSpec{
@@ -975,12 +980,14 @@ func TestDiskStatsMap(t *testing.T) {
// ProvisionedBandwidth is 0 so the cluster setting will be used.
ProvisionedBandwidth: 0,
},
+ Path: path.Join(dir, "foo"),
},
{
ProvisionedRateSpec: base.ProvisionedRateSpec{
DiskName: "bar",
ProvisionedBandwidth: 200,
},
+ Path: path.Join(dir, "bar"),
},
}
// Engines.
@@ -1000,38 +1007,42 @@ func TestDiskStatsMap(t *testing.T) {
require.NoError(t, storage.MVCCBlindPutProto(ctx, engines[i], keys.StoreIdentKey(),
hlc.Timestamp{}, &ident, storage.MVCCWriteOptions{}))
}
+ fs := vfs.Default
+ for _, storeSpec := range specs {
+ _, err := fs.Create(storeSpec.Path)
+ require.NoError(t, err)
+ }
var dsm diskStatsMap
+ defer dsm.closeDiskMonitors()
clusterProvisionedBW := int64(150)
// diskStatsMap contains nothing, so does not populate anything.
- stats, err := dsm.tryPopulateAdmissionDiskStats(ctx, clusterProvisionedBW, nil)
+ stats, err := dsm.tryPopulateAdmissionDiskStats(clusterProvisionedBW, nil)
require.NoError(t, err)
require.Equal(t, 0, len(stats))
+ diskManager := disk.NewMonitorManager(fs)
// diskStatsMap initialized with these two stores.
- require.NoError(t, dsm.initDiskStatsMap(specs, engines))
+ require.NoError(t, dsm.initDiskStatsMap(specs, engines, diskManager))
// diskStatsFunc returns stats for these two stores, and an unknown store.
- diskStatsFunc := func(context.Context) ([]status.DiskStats, error) {
- return []status.DiskStats{
- {
- Name: "baz",
+ diskStatsFunc := func(map[string]disk.Monitor) (map[string]status.DiskStats, error) {
+ return map[string]status.DiskStats{
+ path.Join(dir, "baz"): {
ReadBytes: 100,
WriteBytes: 200,
},
- {
- Name: "foo",
+ path.Join(dir, "foo"): {
ReadBytes: 500,
WriteBytes: 1000,
},
- {
- Name: "bar",
+ path.Join(dir, "bar"): {
ReadBytes: 2000,
WriteBytes: 2500,
},
}, nil
}
- stats, err = dsm.tryPopulateAdmissionDiskStats(ctx, clusterProvisionedBW, diskStatsFunc)
+ stats, err = dsm.tryPopulateAdmissionDiskStats(clusterProvisionedBW, diskStatsFunc)
require.NoError(t, err)
// The stats for the two stores are as expected.
require.Equal(t, 2, len(stats))
@@ -1053,16 +1064,15 @@ func TestDiskStatsMap(t *testing.T) {
}
// disk stats are only retrieved for "foo".
- diskStatsFunc = func(context.Context) ([]status.DiskStats, error) {
- return []status.DiskStats{
- {
- Name: "foo",
+ diskStatsFunc = func(map[string]disk.Monitor) (map[string]status.DiskStats, error) {
+ return map[string]status.DiskStats{
+ path.Join(dir, "foo"): {
ReadBytes: 3500,
WriteBytes: 4500,
},
}, nil
}
- stats, err = dsm.tryPopulateAdmissionDiskStats(ctx, clusterProvisionedBW, diskStatsFunc)
+ stats, err = dsm.tryPopulateAdmissionDiskStats(clusterProvisionedBW, diskStatsFunc)
require.NoError(t, err)
require.Equal(t, 2, len(stats))
for i := range engineIDs {
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 97da8c711449..b38439a20419 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -1959,9 +1959,10 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
// wholly initialized stores (it reads the StoreIdentKeys). It also needs
// to come before the call into SetPebbleMetricsProvider, which internally
// uses the disk stats map we're initializing.
- if err := s.node.registerEnginesForDiskStatsMap(s.cfg.Stores.Specs, s.engines); err != nil {
+ if err := s.node.registerEnginesForDiskStatsMap(s.cfg.Stores.Specs, s.engines, s.cfg.DiskMonitorManager); err != nil {
return errors.Wrapf(err, "failed to register engines for the disk stats map")
}
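+	// Release the disk monitors owned by the disk stats map on shutdown.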
+ s.stopper.AddCloser(stop.CloserFn(func() { s.node.diskStatsMap.closeDiskMonitors() }))
// Stores have been initialized, so Node can now provide Pebble metrics.
//
diff --git a/pkg/server/status/BUILD.bazel b/pkg/server/status/BUILD.bazel
index 2d84d7269727..33f4a2ca9a1a 100644
--- a/pkg/server/status/BUILD.bazel
+++ b/pkg/server/status/BUILD.bazel
@@ -6,6 +6,8 @@ go_library(
"disk_counters.go",
"disk_counters_darwin.go",
"health_check.go",
+ "monitor_counters.go",
+ "monitor_counters_linux.go",
"recorder.go",
"runtime.go",
"runtime_generic.go",
@@ -79,45 +81,59 @@ go_library(
"@com_github_shirou_gopsutil_v3//net",
] + select({
"@io_bazel_rules_go//go/platform:aix": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:android": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:darwin": [
+ "//pkg/storage/disk",
"@com_github_lufia_iostat//:iostat",
],
"@io_bazel_rules_go//go/platform:dragonfly": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:freebsd": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:illumos": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:ios": [
+ "//pkg/storage/disk",
"@com_github_lufia_iostat//:iostat",
],
"@io_bazel_rules_go//go/platform:js": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:linux": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:netbsd": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:openbsd": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:plan9": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:solaris": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"@io_bazel_rules_go//go/platform:windows": [
+ "//pkg/storage/disk",
"@com_github_shirou_gopsutil_v3//disk",
],
"//conditions:default": [],
diff --git a/pkg/server/status/monitor_counters.go b/pkg/server/status/monitor_counters.go
new file mode 100644
index 000000000000..15e8799a2029
--- /dev/null
+++ b/pkg/server/status/monitor_counters.go
@@ -0,0 +1,34 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+//go:build !linux
+// +build !linux
+
+package status
+
+import (
+ "context"
+
+ "github.com/cockroachdb/cockroach/pkg/storage/disk"
+)
+
+// GetMonitorCounters returns DiskStats for all monitored disks.
+// TODO(cheranm): Filter disk counters by the monitored disk path for Darwin builds.
+func GetMonitorCounters(monitors map[string]disk.Monitor) (map[string]DiskStats, error) {
+ diskCounters, err := GetDiskCounters(context.Background())
+ if err != nil {
+ return map[string]DiskStats{}, err
+ }
+ output := make(map[string]DiskStats, len(diskCounters))
+ for _, stats := range diskCounters {
+ output[stats.Name] = stats
+ }
+ return output, nil
+}
diff --git a/pkg/server/status/monitor_counters_linux.go b/pkg/server/status/monitor_counters_linux.go
new file mode 100644
index 000000000000..69a04ea686ef
--- /dev/null
+++ b/pkg/server/status/monitor_counters_linux.go
@@ -0,0 +1,44 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+//go:build linux
+// +build linux
+
+package status
+
+import (
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/storage/disk"
+)
+
+// GetMonitorCounters returns DiskStats for all monitored disks.
+func GetMonitorCounters(monitors map[string]disk.Monitor) (map[string]DiskStats, error) {
+ output := make(map[string]DiskStats, len(monitors))
+ for path, monitor := range monitors {
+ stats, err := monitor.CumulativeStats()
+ if err != nil {
+ return map[string]DiskStats{}, err
+ }
+ output[path] = DiskStats{
+ Name: stats.DeviceName,
+ ReadBytes: int64(stats.BytesRead()),
+ readCount: int64(stats.ReadsCount),
+ readTime: stats.ReadsDuration * time.Millisecond,
+ WriteBytes: int64(stats.BytesWritten()),
+ writeCount: int64(stats.WritesCount),
+ writeTime: stats.WritesDuration * time.Millisecond,
+ ioTime: stats.CumulativeDuration * time.Millisecond,
+ weightedIOTime: stats.WeightedIODuration * time.Millisecond,
+ iopsInProgress: int64(stats.InProgressCount),
+ }
+ }
+ return output, nil
+}
diff --git a/pkg/server/status/runtime.go b/pkg/server/status/runtime.go
index 954abe52a274..9afe28f13c4e 100644
--- a/pkg/server/status/runtime.go
+++ b/pkg/server/status/runtime.go
@@ -256,7 +256,7 @@ var (
Name: "sys.host.disk.weightedio.time",
Unit: metric.Unit_NANOSECONDS,
Measurement: "Time",
- Help: "Weighted time spent reading from or writing to to all disks since this process started (as reported by the OS)",
+ Help: "Weighted time spent reading from or writing to all disks since this process started (as reported by the OS)",
}
metaHostIopsInProgress = metric.Metadata{
Name: "sys.host.disk.iopsinprogress",
@@ -734,13 +734,10 @@ func (rsr *RuntimeStatSampler) SampleEnvironment(ctx context.Context, cs *CGoMem
}
}
- var deltaDisk DiskStats
diskCounters, err := getSummedDiskCounters(ctx)
if err != nil {
log.Ops.Warningf(ctx, "problem fetching disk stats: %s; disk stats will be empty.", err)
} else {
- deltaDisk = diskCounters
- subtractDiskCounters(&deltaDisk, rsr.last.disk)
rsr.last.disk = diskCounters
subtractDiskCounters(&diskCounters, rsr.initialDiskCounters)
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 0b10d0493f12..0ba6c9cb6808 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -1202,6 +1202,34 @@ func (r SpanPartitionReason) String() string {
type SpanPartition struct {
SQLInstanceID base.SQLInstanceID
Spans roachpb.Spans
+
+ haveRangeInfo bool
+ numRanges int
+}
+
+// NumRanges returns the number of ranges in a partition, if that information
+// is present. The returned bool indicates whether the count is valid.
+// Note that the count is not of distinct ranges: if two spans in the partition
+// belong to the same range (e.g. two disjoint spans), they count as two ranges.
+func (sp *SpanPartition) NumRanges() (int, bool) {
+ if !sp.haveRangeInfo {
+ return 0, false
+ }
+ return sp.numRanges, true
+}
+
+// MakeSpanPartitionWithRangeCount constructs a SpanPartition with the
+// specified count of ranges.
+func MakeSpanPartitionWithRangeCount(
+ instanceID base.SQLInstanceID, spans roachpb.Spans, numRanges int,
+) SpanPartition {
+ return SpanPartition{
+ SQLInstanceID: instanceID,
+ Spans: spans,
+ haveRangeInfo: true,
+ numRanges: numRanges,
+ }
}
type distSQLNodeHealth struct {
@@ -1307,7 +1335,8 @@ func (dsp *DistSQLPlanner) partitionSpansEx(
// always ignore misplanned ranges for local-only plans, and we choose
// to return `true` to explicitly highlight this fact, yet the boolean
// doesn't really matter.
- return []SpanPartition{{dsp.gatewaySQLInstanceID, spans}}, true /* ignoreMisplannedRanges */, nil
+ return []SpanPartition{{SQLInstanceID: dsp.gatewaySQLInstanceID, Spans: spans}},
+ true /* ignoreMisplannedRanges */, nil
}
if dsp.useGossipPlanning(ctx, planCtx) {
return dsp.deprecatedPartitionSpansSystem(ctx, planCtx, spans)
@@ -1382,7 +1411,7 @@ func (dsp *DistSQLPlanner) partitionSpan(
partitionIdx, inNodeMap := nodeMap[sqlInstanceID]
if !inNodeMap {
partitionIdx = len(partitions)
- partitions = append(partitions, SpanPartition{SQLInstanceID: sqlInstanceID})
+ partitions = append(partitions, SpanPartition{SQLInstanceID: sqlInstanceID, numRanges: 0, haveRangeInfo: true})
nodeMap[sqlInstanceID] = partitionIdx
}
partition := &partitions[partitionIdx]
@@ -1394,6 +1423,7 @@ func (dsp *DistSQLPlanner) partitionSpan(
// Thus, we include the span into partition.Spans without trying to
// merge it with the last span.
partition.Spans = append(partition.Spans, span)
+ partition.numRanges += 1
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEventf(ctx, 2, "partition span: %s, instance ID: %d, reason: %s",
span, sqlInstanceID, reason)
@@ -1419,6 +1449,7 @@ func (dsp *DistSQLPlanner) partitionSpan(
} else {
partition.Spans = append(partition.Spans, partitionedSpan)
}
+ partition.numRanges += 1
if !endKey.Less(rSpan.EndKey) {
// Done.
@@ -1988,7 +2019,7 @@ func (dsp *DistSQLPlanner) maybeParallelizeLocalScans(
if err != nil {
// For some reason we couldn't partition the spans - fallback to
// having a single TableReader.
- spanPartitions = []SpanPartition{{dsp.gatewaySQLInstanceID, info.spans}}
+ spanPartitions = []SpanPartition{{SQLInstanceID: dsp.gatewaySQLInstanceID, Spans: info.spans}}
parallelizeLocal = false
return spanPartitions, parallelizeLocal
}
@@ -2041,7 +2072,7 @@ func (dsp *DistSQLPlanner) maybeParallelizeLocalScans(
// We weren't able to acquire the quota for any additional
// goroutines, so we will fallback to having a single
// TableReader.
- spanPartitions = []SpanPartition{{dsp.gatewaySQLInstanceID, info.spans}}
+ spanPartitions = []SpanPartition{{SQLInstanceID: dsp.gatewaySQLInstanceID, Spans: info.spans}}
}
}
if len(spanPartitions) == 1 {
@@ -2051,7 +2082,7 @@ func (dsp *DistSQLPlanner) maybeParallelizeLocalScans(
parallelizeLocal = false
}
} else {
- spanPartitions = []SpanPartition{{dsp.gatewaySQLInstanceID, info.spans}}
+ spanPartitions = []SpanPartition{{SQLInstanceID: dsp.gatewaySQLInstanceID, Spans: info.spans}}
}
return spanPartitions, parallelizeLocal
}
@@ -2084,7 +2115,7 @@ func (dsp *DistSQLPlanner) planTableReaders(
if err != nil {
return err
}
- spanPartitions = []SpanPartition{{sqlInstanceID, info.spans}}
+ spanPartitions = []SpanPartition{{SQLInstanceID: sqlInstanceID, Spans: info.spans}}
// The spans to scan might actually live on different nodes, so we don't
// want to create "misplanned ranges" metadata since it can result in
// false positives.
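
For context on how the new range-count plumbing is meant to be consumed, here is a minimal sketch of a hypothetical helper (not part of this patch) that aggregates the per-partition counts produced by partitionSpan and falls back when any partition was built without range information:

```go
// totalRangeCount sums the per-partition range counts. It reports false if
// any partition lacks range information, since a partial sum would be
// misleading. Hypothetical helper, for illustration only.
func totalRangeCount(parts []SpanPartition) (int, bool) {
	total := 0
	for _, p := range parts {
		n, ok := p.NumRanges()
		if !ok {
			return 0, false
		}
		total += n
	}
	return total, true
}
```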
diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go
index c38be9c47cbb..9b0ae998c8df 100644
--- a/pkg/sql/distsql_physical_planner_test.go
+++ b/pkg/sql/distsql_physical_planner_test.go
@@ -1210,6 +1210,32 @@ func TestPartitionSpans(t *testing.T) {
if err != nil {
t.Fatal(err)
}
+ countRanges := func(parts []SpanPartition) (count int) {
+ for _, sp := range parts {
+ ri := tsp.NewSpanResolverIterator(nil, nil)
+ for _, s := range sp.Spans {
+ for ri.Seek(ctx, s, kvcoord.Ascending); ; ri.Next(ctx) {
+ if !ri.Valid() {
+ require.NoError(t, ri.Error())
+ break
+ }
+ count += 1
+ if !ri.NeedAnother() {
+ break
+ }
+ }
+ }
+ }
+ return
+ }
+
+ var rangeCount int
+ for _, p := range partitions {
+ n, ok := p.NumRanges()
+ require.True(t, ok)
+ rangeCount += n
+ }
+ require.Equal(t, countRanges(partitions), rangeCount)
// Assert that the PartitionState is what we expect it to be.
tc.partitionState.testingOverrideRandomSelection = nil
@@ -1222,7 +1248,7 @@ func TestPartitionSpans(t *testing.T) {
resMap := make(map[int][][2]string)
for _, p := range partitions {
if _, ok := resMap[int(p.SQLInstanceID)]; ok {
- t.Fatalf("node %d shows up in multiple partitions", p)
+ t.Fatalf("node %d shows up in multiple partitions", p.SQLInstanceID)
}
var spans [][2]string
for _, s := range p.Spans {
@@ -1526,7 +1552,7 @@ func TestPartitionSpansSkipsNodesNotInGossip(t *testing.T) {
resMap := make(map[base.SQLInstanceID][][2]string)
for _, p := range partitions {
if _, ok := resMap[p.SQLInstanceID]; ok {
- t.Fatalf("node %d shows up in multiple partitions", p)
+ t.Fatalf("node %d shows up in multiple partitions", p.SQLInstanceID)
}
var spans [][2]string
for _, s := range p.Spans {
diff --git a/pkg/storage/disk/BUILD.bazel b/pkg/storage/disk/BUILD.bazel
new file mode 100644
index 000000000000..976b33c8b4cf
--- /dev/null
+++ b/pkg/storage/disk/BUILD.bazel
@@ -0,0 +1,56 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "disk",
+ srcs = [
+ "linux_parse.go",
+ "monitor.go",
+ "platform_default.go",
+ "platform_linux.go",
+ "stats.go",
+ ],
+ importpath = "github.com/cockroachdb/cockroach/pkg/storage/disk",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//pkg/util/envutil",
+ "//pkg/util/syncutil",
+ "//pkg/util/timeutil",
+ "@com_github_cockroachdb_errors//:errors",
+ "@com_github_cockroachdb_pebble//vfs",
+ ] + select({
+ "@io_bazel_rules_go//go/platform:android": [
+ "//pkg/util/sysutil",
+ "@org_golang_x_sys//unix",
+ ],
+ "@io_bazel_rules_go//go/platform:linux": [
+ "//pkg/util/sysutil",
+ "@org_golang_x_sys//unix",
+ ],
+ "//conditions:default": [],
+ }),
+)
+
+go_test(
+ name = "disk_test",
+ srcs = [
+ "linux_parse_test.go",
+ "monitor_test.go",
+ ],
+ data = glob(["testdata/**"]),
+ embed = [":disk"],
+ deps = [
+ "//pkg/util/leaktest",
+ "//pkg/util/log",
+ "//pkg/util/syncutil",
+ "@com_github_cockroachdb_pebble//vfs",
+ "@com_github_stretchr_testify//require",
+ ] + select({
+ "@io_bazel_rules_go//go/platform:android": [
+ "@com_github_cockroachdb_datadriven//:datadriven",
+ ],
+ "@io_bazel_rules_go//go/platform:linux": [
+ "@com_github_cockroachdb_datadriven//:datadriven",
+ ],
+ "//conditions:default": [],
+ }),
+)
diff --git a/pkg/storage/disk/linux_parse.go b/pkg/storage/disk/linux_parse.go
new file mode 100644
index 000000000000..0516cdfc2058
--- /dev/null
+++ b/pkg/storage/disk/linux_parse.go
@@ -0,0 +1,219 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+//go:build linux
+// +build linux
+
+package disk
+
+import (
+ "bytes"
+ "slices"
+ "strconv"
+ "strings"
+ "time"
+ "unicode"
+ "unsafe"
+
+ "github.com/cockroachdb/errors"
+)
+
+// parseDiskStats parses disk stats from the provided byte slice of data read
+// from /proc/diskstats. It takes a slice of *monitoredDisks sorted by device
+// ID. Any devices not listed are ignored during parsing.
+//
+// Documentation of /proc/diskstats was retrieved from:
+// https://www.kernel.org/doc/Documentation/ABI/testing/procfs-diskstats
+//
+// The /proc/diskstats file displays the I/O statistics
+// of block devices. Each line contains the following 14
+// fields:
+//
+// 1 major number
+// 2 minor number
+// 3 device name
+// 4 reads completed successfully
+// 5 reads merged
+// 6 sectors read
+// 7 time spent reading (ms)
+// 8 writes completed
+// 9 writes merged
+// 10 sectors written
+// 11 time spent writing (ms)
+// 12 I/Os currently in progress
+// 13 time spent doing I/Os (ms)
+// 14 weighted time spent doing I/Os (ms)
+func parseDiskStats(contents []byte, disks []*monitoredDisk) error {
+ for lineNum := 0; len(contents) > 0; lineNum++ {
+ lineBytes, rest := splitLine(contents)
+ line := unsafe.String(&lineBytes[0], len(lineBytes))
+ contents = rest
+
+ line = strings.TrimSpace(line)
+
+ var deviceID DeviceID
+ if devMajor, rest, err := mustParseUint(line, 32, "deviceID.major"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ } else {
+ line = rest
+ deviceID.major = uint32(devMajor)
+ }
+ if devMinor, rest, err := mustParseUint(line, 32, "deviceID.minor"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ } else {
+ line = rest
+ deviceID.minor = uint32(devMinor)
+ }
+ diskIdx := slices.IndexFunc(disks, func(d *monitoredDisk) bool {
+ return d.deviceID == deviceID
+ })
+ if diskIdx == -1 {
+ // This device doesn't exist in the list of devices being monitored,
+ // so skip it.
+ continue
+ }
+
+ var stats Stats
+ var err error
+ stats.DeviceName, line = splitFieldDelim(line)
+ if stats.ReadsCount, line, err = mustParseUint(line, 64, "reads count"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if stats.ReadsMerged, line, err = mustParseUint(line, 64, "reads merged"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if stats.ReadsSectors, line, err = mustParseUint(line, 64, "reads sectors"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if millis, rest, err := mustParseUint(line, 64, "reads duration"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ } else {
+ line = rest
+ stats.ReadsDuration = time.Duration(millis) * time.Millisecond
+ }
+ if stats.WritesCount, line, err = mustParseUint(line, 64, "writes count"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if stats.WritesMerged, line, err = mustParseUint(line, 64, "writes merged"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if stats.WritesSectors, line, err = mustParseUint(line, 64, "writes sectors"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if millis, rest, err := mustParseUint(line, 64, "writes duration"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ } else {
+ line = rest
+ stats.WritesDuration = time.Duration(millis) * time.Millisecond
+ }
+ if stats.InProgressCount, line, err = mustParseUint(line, 64, "inprogress iops"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if millis, rest, err := mustParseUint(line, 64, "time doing IO"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ } else {
+ line = rest
+ stats.CumulativeDuration = time.Duration(millis) * time.Millisecond
+ }
+ if millis, rest, err := mustParseUint(line, 64, "weighted IO duration"); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ } else {
+ line = rest
+ stats.WeightedIODuration = time.Duration(millis) * time.Millisecond
+ }
+
+ // The below fields are optional.
+ if stats.DiscardsCount, _, line, err = tryParseUint(line, 64); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if stats.DiscardsMerged, _, line, err = tryParseUint(line, 64); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if stats.DiscardsSectors, _, line, err = tryParseUint(line, 64); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if millis, ok, rest, err := tryParseUint(line, 64); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ } else if ok {
+ line = rest
+ stats.DiscardsDuration = time.Duration(millis) * time.Millisecond
+ }
+ if stats.FlushesCount, _, line, err = tryParseUint(line, 64); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ }
+ if millis, ok, _, err := tryParseUint(line, 64); err != nil {
+ return errors.Wrapf(err, "/proc/diskstats:%d: %q", lineNum, err)
+ } else if ok {
+ stats.FlushesDuration = time.Duration(millis) * time.Millisecond
+ }
+ disks[diskIdx].recordStats(stats)
+ }
+ return nil
+}
+
+func splitLine(b []byte) (line, rest []byte) {
+ i := bytes.IndexByte(b, '\n')
+ if i >= 0 {
+ return b[:i], b[i+1:]
+ }
+ return b, nil
+}
+
+// splitFieldDelim accepts a string beginning with a non-whitespace character.
+// It returns the prefix of non-whitespace characters (`field`) and the
+// remainder of the string (`next`) starting at the beginning of the following
+// run of non-whitespace characters.
+func splitFieldDelim(s string) (field, next string) {
+ var ok bool
+ for i, r := range s {
+ // Whitespace delimits fields.
+ if unicode.IsSpace(r) {
+ // If !ok, this is the first whitespace we've seen. Set field. We
+ // don't stop iterating because we still need to find the start of
+ // `next`.
+ if !ok {
+ ok = true
+ field = s[:i]
+ }
+ } else if ok {
+ // This is the first non-whitespace character after the delimiter.
+ // We know this is where the following field begins and can return
+ // now.
+ next = s[i:]
+ return field, next
+ }
+ }
+ // We never found a delimiter, or the delimiter never ended.
+ return field, next
+}
+
+func mustParseUint(s string, bitSize int, fieldName string) (v int, next string, err error) {
+ var exists bool
+ v, exists, next, err = tryParseUint(s, bitSize)
+ if err != nil {
+ return v, next, err
+ } else if !exists {
+ return 0, next, errors.Newf("%s field not present", errors.Safe(fieldName))
+ }
+ return v, next, nil
+}
+
+func tryParseUint(s string, bitSize int) (v int, ok bool, next string, err error) {
+ var field string
+ field, next = splitFieldDelim(s)
+ if len(field) == 0 {
+ return v, false, next, nil
+ }
+ if v, err := strconv.ParseUint(field, 10, bitSize); err != nil {
+ return 0, true, next, err
+ } else {
+ return int(v), true, next, nil
+ }
+}
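
The parser above is written to avoid allocations by repeatedly splitting off whitespace-delimited fields from the raw line. As a rough illustration of the 14-field layout it consumes (not the implementation itself), the first few fields of a diskstats line can be pulled out with the standard library; the line below is hypothetical sample data:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// One hypothetical /proc/diskstats line, per the field layout documented
	// above: major, minor, device name, then the read/write counters.
	line := " 8 0 sda 7794 4956 678238 5695 2254 2939 64002 2371 0 8796 12963"
	f := strings.Fields(line)
	major, _ := strconv.ParseUint(f[0], 10, 32)
	minor, _ := strconv.ParseUint(f[1], 10, 32)
	reads, _ := strconv.ParseUint(f[3], 10, 64)  // reads completed
	readMs, _ := strconv.ParseUint(f[6], 10, 64) // time spent reading (ms)
	fmt.Printf("%d:%d %s reads=%d readTime=%dms\n", major, minor, f[2], reads, readMs)
}
```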
diff --git a/pkg/storage/disk/linux_parse_test.go b/pkg/storage/disk/linux_parse_test.go
new file mode 100644
index 000000000000..2f487ccea51a
--- /dev/null
+++ b/pkg/storage/disk/linux_parse_test.go
@@ -0,0 +1,82 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+//go:build linux
+// +build linux
+
+package disk
+
+import (
+ "bytes"
+ "cmp"
+ "fmt"
+ "slices"
+ "strconv"
+ "testing"
+
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/datadriven"
+ "github.com/cockroachdb/pebble/vfs"
+ "github.com/stretchr/testify/require"
+)
+
+func compareDeviceIDs(a, b DeviceID) int {
+ if v := cmp.Compare(a.major, b.major); v != 0 {
+ return v
+ }
+ return cmp.Compare(a.minor, b.minor)
+}
+
+func TestLinux_CollectDiskStats(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ var buf bytes.Buffer
+ datadriven.RunTest(t, "testdata/linux_diskstats", func(t *testing.T, td *datadriven.TestData) string {
+ switch td.Cmd {
+ case "parse":
+ var disks []*monitoredDisk
+ for _, cmdArg := range td.CmdArgs {
+ var deviceID DeviceID
+ v, err := strconv.ParseUint(cmdArg.Vals[0], 10, 32)
+ require.NoError(t, err)
+ deviceID.major = uint32(v)
+ v, err = strconv.ParseUint(cmdArg.Vals[1], 10, 32)
+ require.NoError(t, err)
+ deviceID.minor = uint32(v)
+ disks = append(disks, &monitoredDisk{deviceID: deviceID})
+ }
+ slices.SortFunc(disks, func(a, b *monitoredDisk) int { return compareDeviceIDs(a.deviceID, b.deviceID) })
+
+ buf.Reset()
+ s := &linuxStatsCollector{
+ File: vfs.NewMemFile([]byte(td.Input)),
+ // Use a small initial buffer size to exercise the buffer
+ // resizing logic.
+ buf: make([]byte, 16),
+ }
+ err := s.collect(disks)
+ if err != nil {
+ return err.Error()
+ }
+ for i := range disks {
+ if i > 0 {
+ fmt.Fprintln(&buf)
+ }
+ fmt.Fprintf(&buf, "%s: ", disks[i].deviceID)
+ fmt.Fprint(&buf, disks[i].stats.lastMeasurement.String())
+ }
+ return buf.String()
+ default:
+ panic(fmt.Sprintf("unrecognized command %q", td.Cmd))
+ }
+ })
+}
diff --git a/pkg/storage/disk/monitor.go b/pkg/storage/disk/monitor.go
new file mode 100644
index 000000000000..5f3d7e04da1a
--- /dev/null
+++ b/pkg/storage/disk/monitor.go
@@ -0,0 +1,230 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package disk
+
+import (
+ "fmt"
+ "slices"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/util/envutil"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/errors"
+ "github.com/cockroachdb/pebble/vfs"
+)
+
+var defaultDiskStatsPollingInterval = envutil.EnvOrDefaultDuration("COCKROACH_DISK_STATS_POLLING_INTERVAL", 100*time.Millisecond)
+
+// DeviceID uniquely identifies block devices.
+type DeviceID struct {
+ major uint32
+ minor uint32
+}
+
+// String returns the string representation of the device ID.
+func (d DeviceID) String() string {
+ return fmt.Sprintf("%d:%d", d.major, d.minor)
+}
+
+// MonitorManager provides observability into a pool of disks by sampling disk stats
+// at a high frequency. To do this efficiently, MonitorManager implements a pub/sub
+// mechanism to avoid redundantly reading disk stats or reading stats for unmonitored
+// disks. The subscription abstraction is implemented via a Monitor that provides
+// callers the flexibility to consume the latest disk stats at different sampling
+// frequencies while enforcing that the monitoredDisk is a singleton.
+type MonitorManager struct {
+ fs vfs.FS
+
+ mu struct {
+ syncutil.Mutex
+ stop chan struct{}
+ disks []*monitoredDisk
+ }
+}
+
+func NewMonitorManager(fs vfs.FS) *MonitorManager {
+ return &MonitorManager{fs: fs}
+}
+
+// Monitor identifies the device underlying the file/directory at the
+// provided path. If the device is not already being monitored it spawns a
+// goroutine to track its disk stats, otherwise it returns a Monitor handle
+// to access the stats.
+func (m *MonitorManager) Monitor(path string) (*Monitor, error) {
+ finfo, err := m.fs.Stat(path)
+ if err != nil {
+ return nil, errors.Wrapf(err, "fstat(%s)", path)
+ }
+ dev := deviceIDFromFileInfo(finfo)
+
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ // Check if the disk is already being monitored.
+ var disk *monitoredDisk
+ for i := 0; i < len(m.mu.disks); i++ {
+ if m.mu.disks[i].deviceID == dev {
+ disk = m.mu.disks[i]
+ break
+ }
+ }
+
+ if disk == nil {
+ disk = &monitoredDisk{manager: m, deviceID: dev}
+ m.mu.disks = append(m.mu.disks, disk)
+
+ // The design maintains the invariant that the disk stat polling loop
+ // is always running unless there are no disks being monitored.
+ if m.mu.stop == nil {
+ collector, err := newStatsCollector(m.fs)
+ if err != nil {
+ return nil, err
+ }
+ m.mu.stop = make(chan struct{})
+ go m.monitorDisks(collector, m.mu.stop)
+ }
+ }
+ disk.refCount++
+
+ return &Monitor{monitoredDisk: disk}, nil
+}
+
+func (m *MonitorManager) unrefDisk(disk *monitoredDisk) {
+ var stop chan struct{}
+ func() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ disk.refCount--
+ if disk.refCount == 0 {
+ // Since no one is monitoring this disk we remove it from the slice of monitored disks.
+ i := slices.Index(m.mu.disks, disk)
+ if i == -1 {
+ panic(errors.AssertionFailedf("disk %s had one ref, but is not monitored", disk.deviceID))
+ }
+ // Instead of mutating in place we create a new slice in case the disk stat polling loop
+ // is reading the old slice.
+ m.mu.disks = append(slices.Clone(m.mu.disks[:i]), m.mu.disks[i+1:]...)
+
+ // If the MonitorManager has no disks left to monitor, the disk stat polling loop can
+ // be stopped.
+ if len(m.mu.disks) == 0 {
+ stop = m.mu.stop
+ m.mu.stop = nil
+ }
+ }
+ }()
+
+ if stop != nil {
+ stop <- struct{}{}
+ }
+}
+
+type statsCollector interface {
+ collect(disks []*monitoredDisk) error
+}
+
+// monitorDisks runs a loop collecting disk stats for all monitored disks.
+//
+// NB: A stop channel must be passed down to ensure that the function terminates during the
+// race where the MonitorManager creates a new stop channel after unrefDisk sends a message
+// across the old stop channel.
+func (m *MonitorManager) monitorDisks(collector statsCollector, stop chan struct{}) {
+ ticker := time.NewTicker(defaultDiskStatsPollingInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-stop:
+ close(stop)
+ return
+ case <-ticker.C:
+ m.mu.Lock()
+ disks := m.mu.disks
+ m.mu.Unlock()
+
+ if err := collector.collect(disks); err != nil {
+ for i := range disks {
+ disks[i].stats.Lock()
+ disks[i].stats.err = err
+ disks[i].stats.Unlock()
+ }
+ }
+ }
+ }
+}
+
+type monitoredDisk struct {
+ manager *MonitorManager
+ deviceID DeviceID
+ // Tracks the number of Monitors observing stats on this disk. Once
+ // the count is zero, the MonitorManager no longer needs to collect stats
+ // for this device.
+ // refCount is protected by manager.mu since the MonitorManager is responsible
+ // for ensuring that the monitoredDisk is a singleton which relies on refCount
+ // being modified atomically.
+ refCount int
+
+ stats struct {
+ syncutil.Mutex
+ err error
+ lastMeasurement Stats
+ }
+}
+
+func (m *monitoredDisk) recordStats(stats Stats) {
+ m.stats.Lock()
+ defer m.stats.Unlock()
+ m.stats.lastMeasurement = stats
+ m.stats.err = nil
+}
+
+// Monitor provides statistics for an individual disk.
+type Monitor struct {
+ *monitoredDisk
+
+ // prevIncrement and prevIncrementAt are used to compute incremental stats.
+ prevIncrement Stats
+ prevIncrementAt time.Time
+}
+
+func (m *Monitor) Close() {
+ if m.monitoredDisk != nil {
+ m.manager.unrefDisk(m.monitoredDisk)
+ m.monitoredDisk = nil
+ }
+}
+
+// CumulativeStats returns the most-recent stats observed.
+func (m *Monitor) CumulativeStats() (Stats, error) {
+ m.stats.Lock()
+ defer m.stats.Unlock()
+ if m.stats.err != nil {
+ return Stats{}, m.stats.err
+ }
+ return m.stats.lastMeasurement, nil
+}
+
+// IncrementalStats computes the change in stats since the last time IncrementalStats
+// was invoked for this monitor. The first time IncrementalStats is invoked, it returns
+// an empty struct.
+func (m *Monitor) IncrementalStats() (Stats, error) {
+ stats, err := m.CumulativeStats()
+ if err != nil {
+ return Stats{}, err
+ }
+ if m.prevIncrementAt.IsZero() {
+ m.prevIncrementAt = timeutil.Now()
+ m.prevIncrement = stats
+ return Stats{}, nil
+ }
+ return stats.delta(&m.prevIncrement), nil
+}
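
A minimal usage sketch of the new API follows (not part of this patch); the store path is hypothetical, error handling is crude for brevity, and the import path comes from the BUILD file above. Stats are refreshed by the polling loop every COCKROACH_DISK_STATS_POLLING_INTERVAL (100ms by default), so a one-second sampling loop always sees fresh values:

```go
package main

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/storage/disk"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	manager := disk.NewMonitorManager(vfs.Default)
	// Monitor the device backing a (hypothetical) store directory.
	mon, err := manager.Monitor("/mnt/data1")
	if err != nil {
		panic(err)
	}
	defer mon.Close()

	for i := 0; i < 3; i++ {
		time.Sleep(time.Second)
		// IncrementalStats returns the change since the previous call.
		delta, err := mon.IncrementalStats()
		if err != nil {
			continue
		}
		fmt.Printf("read %d B, wrote %d B in the last second\n",
			delta.BytesRead(), delta.BytesWritten())
	}
}
```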
diff --git a/pkg/storage/disk/monitor_test.go b/pkg/storage/disk/monitor_test.go
new file mode 100644
index 000000000000..7313251f49a4
--- /dev/null
+++ b/pkg/storage/disk/monitor_test.go
@@ -0,0 +1,125 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package disk
+
+import (
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/log"
+ "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+ "github.com/cockroachdb/pebble/vfs"
+ "github.com/stretchr/testify/require"
+)
+
+type spyCollector struct {
+ collectCount int
+}
+
+func (s *spyCollector) collect(disks []*monitoredDisk) error {
+ s.collectCount++
+ return nil
+}
+
+func TestMonitorManager_monitorDisks(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ manager := NewMonitorManager(vfs.NewMem())
+ testDisk := &monitoredDisk{
+ manager: manager,
+ deviceID: DeviceID{
+ major: 0,
+ minor: 0,
+ },
+ }
+ manager.mu.disks = []*monitoredDisk{testDisk}
+
+ testCollector := &spyCollector{}
+ stop := make(chan struct{})
+ go manager.monitorDisks(testCollector, stop)
+
+ time.Sleep(2 * defaultDiskStatsPollingInterval)
+ stop <- struct{}{}
+ require.Greater(t, testCollector.collectCount, 0)
+}
+
+func TestMonitor_Close(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ manager := NewMonitorManager(vfs.NewMem())
+ testDisk := &monitoredDisk{
+ manager: manager,
+ deviceID: DeviceID{
+ major: 0,
+ minor: 0,
+ },
+ refCount: 2,
+ }
+ stop := make(chan struct{})
+ manager.mu.stop = stop
+ manager.mu.disks = []*monitoredDisk{testDisk}
+ monitor1 := Monitor{monitoredDisk: testDisk}
+ monitor2 := Monitor{monitoredDisk: testDisk}
+
+ monitor1.Close()
+ require.Equal(t, testDisk.refCount, 1)
+
+ monitor1.Close()
+ // Subsequent calls to a closed monitor should not reduce refCount.
+ require.Equal(t, testDisk.refCount, 1)
+
+ go monitor2.Close()
+ // If there are no monitors, stop the stat polling loop.
+ select {
+ case <-stop:
+ case <-time.After(time.Second):
+ t.Fatal("Failed to receive stop signal")
+ }
+ require.Equal(t, testDisk.refCount, 0)
+}
+
+func TestMonitor_IncrementalStats(t *testing.T) {
+ testDisk := &monitoredDisk{
+ stats: struct {
+ syncutil.Mutex
+ err error
+ lastMeasurement Stats
+ }{
+ lastMeasurement: Stats{
+ ReadsCount: 1,
+ InProgressCount: 3,
+ },
+ },
+ }
+ monitor := Monitor{monitoredDisk: testDisk}
+
+ // First attempt at getting incremental stats should return empty stats.
+ stats, err := monitor.IncrementalStats()
+ require.NoError(t, err)
+ require.Equal(t, stats, Stats{})
+
+ testDisk.stats.lastMeasurement = Stats{
+ ReadsCount: 2,
+ InProgressCount: 2,
+ }
+ wantIncremental := Stats{
+ ReadsCount: 1,
+ // InProgressCount is a gauge so the increment should not be computed.
+ InProgressCount: 2,
+ }
+
+ stats, err = monitor.IncrementalStats()
+ require.NoError(t, err)
+ require.Equal(t, stats, wantIncremental)
+}
diff --git a/pkg/storage/disk/platform_default.go b/pkg/storage/disk/platform_default.go
new file mode 100644
index 000000000000..e0750ce2d27c
--- /dev/null
+++ b/pkg/storage/disk/platform_default.go
@@ -0,0 +1,34 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+//go:build !linux
+// +build !linux
+
+package disk
+
+import (
+ "io/fs"
+
+ "github.com/cockroachdb/pebble/vfs"
+)
+
+type defaultCollector struct{}
+
+func (defaultCollector) collect([]*monitoredDisk) error {
+ return nil
+}
+
+func newStatsCollector(fs vfs.FS) (*defaultCollector, error) {
+ return &defaultCollector{}, nil
+}
+
+func deviceIDFromFileInfo(fs.FileInfo) DeviceID {
+ return DeviceID{}
+}
diff --git a/pkg/storage/disk/platform_linux.go b/pkg/storage/disk/platform_linux.go
new file mode 100644
index 000000000000..f63d13f773c6
--- /dev/null
+++ b/pkg/storage/disk/platform_linux.go
@@ -0,0 +1,77 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+//go:build linux
+// +build linux
+
+package disk
+
+import (
+ "io"
+ "io/fs"
+
+ "github.com/cockroachdb/cockroach/pkg/util/sysutil"
+ "github.com/cockroachdb/errors"
+ "github.com/cockroachdb/pebble/vfs"
+ "golang.org/x/sys/unix"
+)
+
+// A linuxStatsCollector collects disk stats from /proc/diskstats. It keeps
+// /proc/diskstats open, issuing `ReadAt` calls to re-read stats.
+type linuxStatsCollector struct {
+ vfs.File
+ buf []byte
+}
+
+// collect collects disk stats for the identified devices.
+func (s *linuxStatsCollector) collect(disks []*monitoredDisk) error {
+ var n int
+ var err error
+ for {
+ n, err = s.File.ReadAt(s.buf, 0)
+ if errors.Is(err, io.EOF) {
+ break
+ } else if err != nil {
+ return err
+ }
+ // err == nil
+ //
+ // NB: ReadAt is required to return a non-nil error when it returns n <
+ // len(s.buf). A nil error indicates a full len(s.buf) bytes were read,
+ // and the diskstats file does not fit in our current buffer.
+ //
+ // We want to grow the buffer to be large enough to fit the entirety of
+ // the file. This is required for consistency. We're only guaranteed a
+ // consistent read if we read the entirety of the diskstats file in a
+ // single read. Reallocate (doubling) the buffer and continue.
+ s.buf = make([]byte, len(s.buf)*2)
+ }
+ return parseDiskStats(s.buf[:n], disks)
+}
+
+func newStatsCollector(fs vfs.FS) (*linuxStatsCollector, error) {
+ file, err := fs.Open("/proc/diskstats")
+ if err != nil {
+ return nil, errors.Wrap(err, "opening /proc/diskstats")
+ }
+ return &linuxStatsCollector{
+ File: file,
+ buf: make([]byte, 64),
+ }, nil
+}
+
+func deviceIDFromFileInfo(finfo fs.FileInfo) DeviceID {
+ statInfo := finfo.Sys().(*sysutil.StatT)
+ id := DeviceID{
+ major: unix.Major(statInfo.Dev),
+ minor: unix.Minor(statInfo.Dev),
+ }
+ return id
+}
diff --git a/pkg/storage/disk/stats.go b/pkg/storage/disk/stats.go
new file mode 100644
index 000000000000..f3c890b55899
--- /dev/null
+++ b/pkg/storage/disk/stats.go
@@ -0,0 +1,126 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package disk
+
+import (
+ "fmt"
+ "time"
+)
+
+// SectorSizeBytes is the number of bytes stored on a disk sector.
+const SectorSizeBytes = 512
+
+// Stats describes statistics for an individual disk or volume.
+type Stats struct {
+ DeviceName string
+ // ReadsCount is the count of read operations completed successfully.
+ ReadsCount int
+ // ReadsMerged is the count of adjacent read operations merged by the
+	// operating system before ultimately handing the I/O request to the disk.
+ ReadsMerged int
+ // ReadsSectors is the count of sectors read successfully.
+ ReadsSectors int
+ // ReadsDuration is the total time spent by all reads.
+ //
+ // On Linux this is measured as the time spent between
+ // blk_mq_alloc_request() and __blk_mq_end_request().
+ ReadsDuration time.Duration
+ // WritesCount is the count of write operations completed successfully.
+ WritesCount int
+ // WritesMerged is the count of adjacent write operations merged by the
+ // operating system before ultimately handing the I/O request to the disk.
+ WritesMerged int
+ // WritesSectors is the count of sectors written successfully.
+ WritesSectors int
+ // WritesDuration is the total time spent by all writes.
+ //
+ // On Linux this is measured as the time spent between
+ // blk_mq_alloc_request() and __blk_mq_end_request().
+ WritesDuration time.Duration
+ // InProgressCount is the count of I/O operations currently in-progress.
+ InProgressCount int
+ // CumulativeDuration is the total time spent doing I/O.
+ CumulativeDuration time.Duration
+ // WeightedIODuration is a weighted measure of cumulative time spent in IO.
+ //
+ // On Linux, this field is incremented at each I/O start, I/O completion,
+ // I/O merge, or read of these stats from /proc, by the number of I/Os in
+ // progress (InProgressCount) times the duration spent doing I/O since the
+ // last update of this field. This can provide an easy measure of both I/O
+ // completion time and the backlog that may be accumulating.
+ WeightedIODuration time.Duration
+ // DiscardsCount is the count of discard operations completed successfully.
+ DiscardsCount int
+ // DiscardsMerged is the count of adjacent discard operations merged by the
+ // operating system before ultimately handing the I/O request to disk.
+ DiscardsMerged int
+ // DiscardsSectors is the count of sectors discarded successfully.
+ DiscardsSectors int
+ // DiscardsDuration is the total time spent by all discards.
+ //
+ // On Linux this is measured as the time spent between
+ // blk_mq_alloc_request() and __blk_mq_end_request().
+ DiscardsDuration time.Duration
+ // FlushesCount is the total number of flush requests completed
+ // successfully.
+ //
+ // On Linux, the block layer combines flush requests and executes at most
+ // one at a time. This counts flush requests executed by disk. It is not
+ // available for partitions.
+ FlushesCount int
+ // FlushesDuration is the total time spent by all flush requests.
+ FlushesDuration time.Duration
+}
+
+// String implements fmt.Stringer.
+func (s *Stats) String() string {
+ return fmt.Sprintf("name: %s, r: (%d, %d, %d, %s), w: (%d, %d, %d, %s), (now: %d, c: %s, w: %s), d: (%d, %d, %d, %s), f: (%d, %s)",
+ s.DeviceName, s.ReadsCount, s.ReadsMerged, s.ReadsSectors, s.ReadsDuration,
+ s.WritesCount, s.WritesMerged, s.WritesSectors, s.WritesDuration,
+ s.InProgressCount, s.CumulativeDuration, s.WeightedIODuration,
+ s.DiscardsCount, s.DiscardsMerged, s.DiscardsSectors, s.DiscardsDuration,
+ s.FlushesCount, s.FlushesDuration)
+}
+
+// BytesRead computes the total number of bytes read from disk.
+func (s *Stats) BytesRead() int {
+ return s.ReadsSectors * SectorSizeBytes
+}
+
+// BytesWritten computes the total number of bytes written to disk.
+func (s *Stats) BytesWritten() int {
+ return s.WritesSectors * SectorSizeBytes
+}
+
+// delta computes the delta between the receiver and the provided stats.
+func (s *Stats) delta(o *Stats) Stats {
+ return Stats{
+ DeviceName: s.DeviceName,
+ ReadsCount: s.ReadsCount - o.ReadsCount,
+ ReadsMerged: s.ReadsMerged - o.ReadsMerged,
+ ReadsSectors: s.ReadsSectors - o.ReadsSectors,
+ ReadsDuration: s.ReadsDuration - o.ReadsDuration,
+ WritesCount: s.WritesCount - o.WritesCount,
+ WritesMerged: s.WritesMerged - o.WritesMerged,
+ WritesSectors: s.WritesSectors - o.WritesSectors,
+ WritesDuration: s.WritesDuration - o.WritesDuration,
+ // InProgressCount is not a cumulative stat so we don't compute the difference.
+ InProgressCount: s.InProgressCount,
+ CumulativeDuration: s.CumulativeDuration - o.CumulativeDuration,
+ WeightedIODuration: s.WeightedIODuration - o.WeightedIODuration,
+ DiscardsCount: s.DiscardsCount - o.DiscardsCount,
+ DiscardsMerged: s.DiscardsMerged - o.DiscardsMerged,
+ DiscardsSectors: s.DiscardsSectors - o.DiscardsSectors,
+ DiscardsDuration: s.DiscardsDuration - o.DiscardsDuration,
+ FlushesCount: s.FlushesCount - o.FlushesCount,
+ FlushesDuration: s.FlushesDuration - o.FlushesDuration,
+ }
+}
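
Since delta is unexported, external callers derive rates from the exported fields and helpers. Below is a sketch of a hypothetical throughput helper (package and function names are illustrative, not part of this patch) built on two cumulative samples, for example from Monitor.CumulativeStats:

```go
package diskstatsutil // hypothetical package, for illustration only

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/storage/disk"
)

// readThroughputBps converts the sector delta between two cumulative samples
// taken dt apart into bytes per second, relying on the 512-byte sector
// convention encoded in SectorSizeBytes via Stats.BytesRead.
func readThroughputBps(prev, cur disk.Stats, dt time.Duration) float64 {
	return float64(cur.BytesRead()-prev.BytesRead()) / dt.Seconds()
}
```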
diff --git a/pkg/storage/disk/testdata/linux_diskstats b/pkg/storage/disk/testdata/linux_diskstats
new file mode 100644
index 000000000000..328cc958fdba
--- /dev/null
+++ b/pkg/storage/disk/testdata/linux_diskstats
@@ -0,0 +1,55 @@
+parse deviceID=(8,0)
+ 7 0 loop0 223 0 3898 79 0 0 0 0 0 256 79 0 0 0 0 0 0
+ 7 1 loop1 16 0 48 6 0 0 0 0 0 16 6 0 0 0 0 0 0
+ 7 2 loop2 7 0 18 1 0 0 0 0 0 4 1 0 0 0 0 0 0
+ 7 3 loop3 14 0 46 8 0 0 0 0 0 16 8 0 0 0 0 0 0
+ 7 4 loop4 7 0 18 8 0 0 0 0 0 12 8 0 0 0 0 0 0
+ 7 5 loop5 16 0 58 6 0 0 0 0 0 12 6 0 0 0 0 0 0
+ 7 6 loop6 7 0 16 5 0 0 0 0 0 8 5 0 0 0 0 0 0
+ 7 7 loop7 1140 0 92076 223 0 0 0 0 0 3552 223 0 0 0 0 0 0
+ 8 0 sda 7794 4956 678238 5695 2254 2939 64002 2371 0 8796 12963 13415 2 164354873 4887 241 9
+ 8 1 sda1 7451 3314 658717 5593 2252 2939 64000 2366 0 8724 12845 13413 0 164153528 4885 0 0
+ 8 14 sda14 73 0 832 21 0 0 0 0 0 40 21 0 0 0 0 0 0
+ 8 15 sda15 188 1642 16265 64 2 0 2 5 0 100 71 2 2 201345 1 0 0
+ 7 8 loop8 67 0 2204 44 0 0 0 0 0 56 44 0 0 0 0 0 0
+ 7 9 loop9 11 0 28 0 0 0 0 0 0 4 0 0 0 0 0 0 0
+----
+8:0: name: sda, r: (7794, 4956, 678238, 5.695s), w: (2254, 2939, 64002, 2.371s), (now: 0, c: 8.796s, w: 12.963s), d: (13415, 2, 164354873, 4.887s), f: (241, 0s)
+
+parse deviceID=(8,0) deviceID=(8,1) deviceID=(8,15)
+ 7 0 loop0 223 0 3898 79 0 0 0 0 0 256 79 0 0 0 0 0 0
+ 7 1 loop1 16 0 48 6 0 0 0 0 0 16 6 0 0 0 0 0 0
+ 7 2 loop2 7 0 18 1 0 0 0 0 0 4 1 0 0 0 0 0 0
+ 7 3 loop3 14 0 46 8 0 0 0 0 0 16 8 0 0 0 0 0 0
+ 7 4 loop4 7 0 18 8 0 0 0 0 0 12 8 0 0 0 0 0 0
+ 7 5 loop5 16 0 58 6 0 0 0 0 0 12 6 0 0 0 0 0 0
+ 7 6 loop6 7 0 16 5 0 0 0 0 0 8 5 0 0 0 0 0 0
+ 7 7 loop7 1140 0 92076 223 0 0 0 0 0 3552 223 0 0 0 0 0 0
+ 8 0 sda 7794 4956 678238 5695 2254 2939 64002 2371 0 8796 12963 13415 2 164354873 4887 241 9
+ 8 1 sda1 7451 3314 658717 5593 2252 2939 64000 2366 0 8724 12845 13413 0 164153528 4885 0 0
+ 8 14 sda14 73 0 832 21 0 0 0 0 0 40 21 0 0 0 0 0 0
+ 8 15 sda15 188 1642 16265 64 2 0 2 5 0 100 71 2 2 201345 1 0 0
+ 7 8 loop8 67 0 2204 44 0 0 0 0 0 56 44 0 0 0 0 0 0
+ 7 9 loop9 11 0 28 0 0 0 0 0 0 4 0 0 0 0 0 0 0
+----
+8:0: name: sda, r: (7794, 4956, 678238, 5.695s), w: (2254, 2939, 64002, 2.371s), (now: 0, c: 8.796s, w: 12.963s), d: (13415, 2, 164354873, 4.887s), f: (241, 0s)
+8:1: name: sda1, r: (7451, 3314, 658717, 5.593s), w: (2252, 2939, 64000, 2.366s), (now: 0, c: 8.724s, w: 12.845s), d: (13413, 0, 164153528, 4.885s), f: (0, 0s)
+8:15: name: sda15, r: (188, 1642, 16265, 64ms), w: (2, 0, 2, 5ms), (now: 0, c: 100ms, w: 71ms), d: (2, 2, 201345, 1ms), f: (0, 0s)
+
+parse deviceID=(259,0) deviceID=(259,1)
+ 7 0 loop0 279 0 19098 105 0 0 0 0 0 816 105 0 0 0 0 0 0
+ 7 1 loop1 15 0 304 12 0 0 0 0 0 24 12 0 0 0 0 0 0
+ 7 2 loop2 246 0 4580 175 0 0 0 0 0 368 175 0 0 0 0 0 0
+ 7 3 loop3 27 0 354 29 0 0 0 0 0 60 29 0 0 0 0 0 0
+ 7 4 loop4 570 0 44320 325 0 0 0 0 0 1828 325 0 0 0 0 0 0
+ 7 5 loop5 11 0 28 0 0 0 0 0 0 4 0 0 0 0 0 0 0
+ 7 6 loop6 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
+ 7 7 loop7 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
+ 259 0 nvme0n1 9101 2820 629162 10631 73443 121952 10376450 446427 0 55352 457059 0 0 0 0 0 0
+ 259 2 nvme0n1p1 8657 2210 610588 10381 73436 121944 10376344 446422 0 55216 456804 0 0 0 0 0 0
+ 259 3 nvme0n1p14 110 0 1376 52 0 0 0 0 0 80 52 0 0 0 0 0 0
+ 259 4 nvme0n1p15 198 610 13486 124 2 0 2 1 0 104 126 0 0 0 0 0 0
+ 259 1 nvme1n1 6586 0 58314 64220 7833172 441191 342667872 6135899 2 6684736 6200120 0 0 0 0 0 0
+----
+259:0: name: nvme0n1, r: (9101, 2820, 629162, 10.631s), w: (73443, 121952, 10376450, 7m26.427s), (now: 0, c: 55.352s, w: 7m37.059s), d: (0, 0, 0, 0s), f: (0, 0s)
+259:1: name: nvme1n1, r: (6586, 0, 58314, 1m4.22s), w: (7833172, 441191, 342667872, 1h42m15.899s), (now: 2, c: 1h51m24.736s, w: 1h43m20.12s), d: (0, 0, 0, 0s), f: (0, 0s)
diff --git a/pkg/testutils/lint/passes/deferunlockcheck/deferunlockcheck.go b/pkg/testutils/lint/passes/deferunlockcheck/deferunlockcheck.go
index bfdedcab8028..1f3a3ee721e7 100644
--- a/pkg/testutils/lint/passes/deferunlockcheck/deferunlockcheck.go
+++ b/pkg/testutils/lint/passes/deferunlockcheck/deferunlockcheck.go
@@ -240,7 +240,7 @@ func (lt *LockTracker) addLock(call *ast.CallExpr, isRead bool) {
// maybeReportUnlock tries to find a matching lock for a given unlock by
// iterating backwards in the locks slice. If one is found, it checks if the
-// distance between is greater than maxLineDistance and also has no nlint
+// distance between is greater than maxLineDistance and also has no nolint
// comment and reports on that if both are true.
func (lt *LockTracker) maybeReportUnlock(call *ast.CallExpr, isRead bool) {
sel, ok := call.Fun.(*ast.SelectorExpr)
diff --git a/pkg/util/sysutil/sysutil_unix.go b/pkg/util/sysutil/sysutil_unix.go
index 58c9a0e8a16a..c9aad5293b68 100644
--- a/pkg/util/sysutil/sysutil_unix.go
+++ b/pkg/util/sysutil/sysutil_unix.go
@@ -23,6 +23,9 @@ import (
"golang.org/x/sys/unix"
)
+// StatT is syscall.Stat_t.
+type StatT = syscall.Stat_t
+
// ProcessIdentity returns a string describing the user and group that this
// process is running as.
func ProcessIdentity() string {