bulkpb: move backuppb.IngestionPerformanceStats out of ccl
This message was only relied on by kv code, and nobody used it at its old path;
it is used solely for trace events. This PR moves its code from
`ccl/backupccl/backuppb` to `kv/bulk/bulkpb`.
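
For downstream callers the move is purely an import-path swap; a minimal sketch, assuming a hypothetical caller of the relocated message:

    package example

    import "github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb"

    // newIngestionStats is a hypothetical caller; only the import path
    // changes (it previously pointed at pkg/ccl/backupccl/backuppb).
    func newIngestionStats() *bulkpb.IngestionPerformanceStats {
        // Identity allocates the SendWaitByStore map, so the result is
        // immediately safe to pass to Combine.
        return (&bulkpb.IngestionPerformanceStats{}).Identity().(*bulkpb.IngestionPerformanceStats)
    }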

Part of cockroachdb#91714

Epic: none

Release note: None
ajwerner committed Dec 14, 2022
1 parent dbea195 commit dee89f1
Showing 14 changed files with 385 additions and 301 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -1105,6 +1105,7 @@ GO_TARGETS = [
"//pkg/keys:keys",
"//pkg/keys:keys_test",
"//pkg/keysbase:keysbase",
"//pkg/kv/bulk/bulkpb:bulkpb",
"//pkg/kv/bulk:bulk",
"//pkg/kv/bulk:bulk_test",
"//pkg/kv/kvbase:kvbase",
@@ -2481,6 +2482,7 @@ GET_X_DATA_TARGETS = [
"//pkg/keysbase:get_x_data",
"//pkg/kv:get_x_data",
"//pkg/kv/bulk:get_x_data",
"//pkg/kv/bulk/bulkpb:get_x_data",
"//pkg/kv/kvbase:get_x_data",
"//pkg/kv/kvclient:get_x_data",
"//pkg/kv/kvclient/kvcoord:get_x_data",
4 changes: 0 additions & 4 deletions pkg/ccl/backupccl/backuppb/BUILD.bazel
@@ -44,17 +44,13 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/cloud",
"//pkg/roachpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/parser",
"//pkg/sql/protoreflect",
"//pkg/sql/sem/tree",
"//pkg/util/bulk",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//jsonpb",
"@io_opentelemetry_go_otel//attribute",
],
217 changes: 0 additions & 217 deletions pkg/ccl/backupccl/backuppb/backup.go
@@ -9,25 +9,17 @@
package backuppb

import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
_ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/gogo/protobuf/jsonpb"
"go.opentelemetry.io/otel/attribute"
)
@@ -186,215 +178,6 @@ func (e *ExportStats) Tag() string {
return "ExportStats"
}

// Identity implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Identity() bulk.TracingAggregatorEvent {
stats := IngestionPerformanceStats{}
stats.SendWaitByStore = make(map[roachpb.StoreID]time.Duration)
return &stats
}

// Combine implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Combine(other bulk.TracingAggregatorEvent) {
otherStats, ok := other.(*IngestionPerformanceStats)
if !ok {
panic(fmt.Sprintf("`other` is not of type IngestionPerformanceStats: %T", other))
}

s.DataSize += otherStats.DataSize
s.BufferFlushes += otherStats.BufferFlushes
s.FlushesDueToSize += otherStats.FlushesDueToSize
s.Batches += otherStats.Batches
s.BatchesDueToRange += otherStats.BatchesDueToRange
s.BatchesDueToSize += otherStats.BatchesDueToSize
s.SplitRetries += otherStats.SplitRetries
s.Splits += otherStats.Splits
s.Scatters += otherStats.Scatters
s.ScatterMoved += otherStats.ScatterMoved
s.FillWait += otherStats.FillWait
s.SortWait += otherStats.SortWait
s.FlushWait += otherStats.FlushWait
s.BatchWait += otherStats.BatchWait
s.SendWait += otherStats.SendWait
s.SplitWait += otherStats.SplitWait
s.ScatterWait += otherStats.ScatterWait
s.CommitWait += otherStats.CommitWait
s.Duration += otherStats.Duration

for k, v := range otherStats.SendWaitByStore {
s.SendWaitByStore[k] += v
}
}

// Tag implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Tag() string {
return "IngestionPerformanceStats"
}

// Render implements the TracingAggregatorEvent interface.
func (s *IngestionPerformanceStats) Render() []attribute.KeyValue {
const mb = 1 << 20
tags := make([]attribute.KeyValue, 0)
if s.Batches > 0 {
tags = append(tags,
attribute.KeyValue{
Key: "num_batches",
Value: attribute.Int64Value(s.Batches),
},
attribute.KeyValue{
Key: "num_batches_due_to_size",
Value: attribute.Int64Value(s.BatchesDueToSize),
},
attribute.KeyValue{
Key: "num_batches_due_to_range",
Value: attribute.Int64Value(s.BatchesDueToRange),
},
attribute.KeyValue{
Key: "split_retires",
Value: attribute.Int64Value(s.SplitRetries),
},
)
}

if s.BufferFlushes > 0 {
tags = append(tags,
attribute.KeyValue{
Key: "num_flushes",
Value: attribute.Int64Value(s.BufferFlushes),
},
attribute.KeyValue{
Key: "num_flushes_due_to_size",
Value: attribute.Int64Value(s.FlushesDueToSize),
},
)
}

if s.DataSize > 0 {
dataSizeMB := float64(s.DataSize) / mb
tags = append(tags, attribute.KeyValue{
Key: "data_size",
Value: attribute.StringValue(fmt.Sprintf("%.2f MB", dataSizeMB)),
})

if s.Duration > 0 {
throughput := dataSizeMB / s.Duration.Seconds()
tags = append(tags, attribute.KeyValue{
Key: "throughput",
Value: attribute.StringValue(fmt.Sprintf("%.2f MB/s", throughput)),
})
}
}

tags = append(tags,
timeKeyValue("fill_wait", s.FillWait),
timeKeyValue("sort_wait", s.SortWait),
timeKeyValue("flush_wait", s.FlushWait),
timeKeyValue("batch_wait", s.BatchWait),
timeKeyValue("send_wait", s.SendWait),
timeKeyValue("split_wait", s.SplitWait),
attribute.KeyValue{Key: "splits", Value: attribute.Int64Value(s.Splits)},
timeKeyValue("scatter_wait", s.ScatterWait),
attribute.KeyValue{Key: "scatters", Value: attribute.Int64Value(s.Scatters)},
attribute.KeyValue{Key: "scatter_moved", Value: attribute.Int64Value(s.ScatterMoved)},
timeKeyValue("commit_wait", s.CommitWait),
)

// Sort store send wait by IDs before adding them as tags.
ids := make(roachpb.StoreIDSlice, 0, len(s.SendWaitByStore))
for i := range s.SendWaitByStore {
ids = append(ids, i)
}
sort.Sort(ids)
for _, id := range ids {
tags = append(tags, timeKeyValue(attribute.Key(fmt.Sprintf("store-%d_send_wait", id)), s.SendWaitByStore[id]))
}

return tags
}

func timeKeyValue(key attribute.Key, time time.Duration) attribute.KeyValue {
return attribute.KeyValue{
Key: key,
Value: attribute.StringValue(string(humanizeutil.Duration(time))),
}
}

// LogTimings logs the timing ingestion stats.
func (s *IngestionPerformanceStats) LogTimings(ctx context.Context, name, action string) {
log.Infof(ctx,
"%s adder %s; ingested %s: %s filling; %v sorting; %v / %v flushing; %v sending; %v splitting; %d; %v scattering, %d, %v; %v commit-wait",
name,
redact.Safe(action),
sz(s.DataSize),
timing(s.FillWait),
timing(s.SortWait),
timing(s.FlushWait),
timing(s.BatchWait),
timing(s.SendWait),
timing(s.SplitWait),
s.Splits,
timing(s.ScatterWait),
s.Scatters,
s.ScatterMoved,
timing(s.CommitWait),
)
}

// LogFlushes logs stats about buffering added and SST batcher flushes.
func (s *IngestionPerformanceStats) LogFlushes(
ctx context.Context, name, action string, bufSize int64, span roachpb.Span,
) {
log.Infof(ctx,
"%s adder %s; flushed into %s %d times, %d due to buffer size (%s); flushing chunked into %d files (%d for ranges, %d for sst size) +%d split-retries",
name,
redact.Safe(action),
span,
s.BufferFlushes,
s.FlushesDueToSize,
sz(bufSize),
s.Batches,
s.BatchesDueToRange,
s.BatchesDueToSize,
s.SplitRetries,
)
}

// LogPerStoreTimings logs send waits per store.
func (s *IngestionPerformanceStats) LogPerStoreTimings(ctx context.Context, name string) {
if len(s.SendWaitByStore) == 0 {
return
}
ids := make(roachpb.StoreIDSlice, 0, len(s.SendWaitByStore))
for i := range s.SendWaitByStore {
ids = append(ids, i)
}
sort.Sort(ids)

var sb strings.Builder
for i, id := range ids {
// Hack: if there is a gap in store IDs just before this one, insert a
// zero placeholder entry for the immediately preceding missing ID, so
// the gap is printed as an explicit zero now and on future prints.
if i > 0 && ids[i-1] != id-1 {
s.SendWaitByStore[id-1] = 0
fmt.Fprintf(&sb, "%d: %s;", id-1, timing(0))
}
fmt.Fprintf(&sb, "%d: %s;", id, timing(s.SendWaitByStore[id]))

}
log.Infof(ctx, "%s waited on sending to: %s", name, redact.Safe(sb.String()))
}

type sz int64

func (b sz) String() string { return string(humanizeutil.IBytes(int64(b))) }
func (b sz) SafeValue() {}

type timing time.Duration

func (t timing) String() string { return time.Duration(t).Round(time.Second).String() }
func (t timing) SafeValue() {}

var _ bulk.TracingAggregatorEvent = &IngestionPerformanceStats{}

func init() {
protoreflect.RegisterShorthands((*BackupManifest)(nil), "backup", "backup_manifest")
}
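
Together, Identity, Combine, Tag, and Render implement the bulk.TracingAggregatorEvent interface: Identity yields a fresh zero event and Combine folds another event into it. A minimal sketch of that fold, written against the message's new bulkpb path (the aggregate helper is hypothetical; the real folding happens in the tracing aggregator that consumes these events):

    package example

    import "github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb"

    // aggregate folds a batch of per-operation events into one total.
    func aggregate(events []*bulkpb.IngestionPerformanceStats) *bulkpb.IngestionPerformanceStats {
        total := (&bulkpb.IngestionPerformanceStats{}).Identity().(*bulkpb.IngestionPerformanceStats)
        for _, ev := range events {
            total.Combine(ev) // sums counters, wait durations, and per-store send waits
        }
        return total
    }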
64 changes: 0 additions & 64 deletions pkg/ccl/backupccl/backuppb/backup.proto
@@ -211,67 +211,3 @@ message ExportStats {
// ExportResponse and push the response on a channel.
int64 duration = 3 [(gogoproto.casttype) = "time.Duration"];
}

// IngestionPerformanceStats is a message containing information about the
// creation of SSTables by an SSTBatcher or BufferingAdder.
message IngestionPerformanceStats {
// DataSize is the total byte size of all the SST files ingested.
int64 data_size = 1;

// BufferFlushes is the number of buffer flushes.
int64 buffer_flushes = 2;

// FlushesDueToSize is the number of buffer flushes due to buffer size.
int64 flushes_due_to_size = 3;

// Batches is the number of batches (addsstable calls) sent.
int64 batches = 4;

// BatchesDueToRange is the number of batches due to range bounds.
int64 batches_due_to_range = 5;

// BatchesDueToSize is the number of batches due to batch size.
int64 batches_due_to_size = 6;

// SplitRetries is the number of extra sub-batches created due to unexpected
// splits.
int64 split_retries = 7;

// Splits is the number of splits sent.
int64 splits = 8;

// Scatters is the number of scatters sent.
int64 scatters = 9;

// ScatterMoved is the total size in bytes moved by scatter calls.
int64 scatter_moved = 10;

// FillWait is the time spent between buffer flushes.
int64 fill_wait = 11 [(gogoproto.casttype) = "time.Duration"];

// SortWait is the time spent sorting buffers.
int64 sort_wait = 12 [(gogoproto.casttype) = "time.Duration"];

// FlushWait is the time spent flushing buffers.
int64 flush_wait = 13 [(gogoproto.casttype) = "time.Duration"];

// BatchWait is the time spent flushing batches (inc split/scatter/send).
int64 batch_wait = 14 [(gogoproto.casttype) = "time.Duration"];

// SendWait is the time spent sending batches (addsstable+retries)
int64 send_wait = 15 [(gogoproto.casttype) = "time.Duration"];

// SplitWait is the time spent splitting.
int64 split_wait = 16 [(gogoproto.casttype) = "time.Duration"];

// ScatterWait is the time spent scattering.
int64 scatter_wait = 17 [(gogoproto.casttype) = "time.Duration"];

// CommitWait is the time spent waiting for commit timestamps.
int64 commit_wait = 18 [(gogoproto.casttype) = "time.Duration"];

// Duration is the total ingestion time.
int64 duration = 19 [(gogoproto.casttype) = "time.Duration"];

// SendWaitByStore is the time spent sending batches to each store.
map<int32, int64> send_wait_by_store = 20 [(gogoproto.castkey) = "github.com/cockroachdb/cockroach/pkg/roachpb.StoreID", (gogoproto.castvalue) = "time.Duration"];
}
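
The gogoproto cast options in this message mean the generated Go struct exposes time.Duration values and a map keyed by roachpb.StoreID directly. A hand-written approximation of the generated type (abridged; illustrative, not actual protoc-gen-gogo output):

    package example

    import (
        "time"

        "github.com/cockroachdb/cockroach/pkg/roachpb"
    )

    // IngestionPerformanceStats, roughly as generated: the cast options
    // replace raw int64/int32 wire types with the named Go types.
    type IngestionPerformanceStats struct {
        DataSize        int64
        FillWait        time.Duration                     // (gogoproto.casttype)
        SendWaitByStore map[roachpb.StoreID]time.Duration // (gogoproto.castkey/castvalue)
        // ...remaining counter and wait fields elided...
    }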
1 change: 1 addition & 0 deletions pkg/gen/protobuf.bzl
@@ -24,6 +24,7 @@ PROTOBUF_SRCS = [
"//pkg/geo/geopb:geopb_go_proto",
"//pkg/gossip:gossip_go_proto",
"//pkg/jobs/jobspb:jobspb_go_proto",
"//pkg/kv/bulk/bulkpb:bulkpb_go_proto",
"//pkg/kv/kvnemesis:kvnemesis_go_proto",
"//pkg/kv/kvserver/closedts/ctpb:ctpb_go_proto",
"//pkg/kv/kvserver/concurrency/lock:lock_go_proto",
2 changes: 1 addition & 1 deletion pkg/kv/bulk/BUILD.bazel
@@ -13,9 +13,9 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/bulk",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/backupccl/backuppb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/bulk/bulkpb",
"//pkg/kv/kvclient/rangecache",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
8 changes: 4 additions & 4 deletions pkg/kv/bulk/buffering_adder.go
@@ -16,9 +16,9 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -249,12 +249,12 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
}
b.sink.currentStats.BufferFlushes++

var before *backuppb.IngestionPerformanceStats
var before *bulkpb.IngestionPerformanceStats
var beforeSize int64
// Get the stats before flush by summing totalStats and currentStats
if log.V(3) {
b.sink.mu.Lock()
before = b.sink.mu.totalStats.Identity().(*backuppb.IngestionPerformanceStats)
before = b.sink.mu.totalStats.Identity().(*bulkpb.IngestionPerformanceStats)
before.Combine(&b.sink.mu.totalStats)
before.Combine(&b.sink.currentStats)
beforeSize = b.sink.mu.totalRows.DataSize
@@ -310,7 +310,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
if log.V(3) && before != nil {
b.sink.mu.Lock()
written := b.sink.mu.totalRows.DataSize - beforeSize
afterStats := b.sink.mu.totalStats.Identity().(*backuppb.IngestionPerformanceStats)
afterStats := b.sink.mu.totalStats.Identity().(*bulkpb.IngestionPerformanceStats)
afterStats.Combine(&b.sink.mu.totalStats)
afterStats.Combine(&b.sink.currentStats)
b.sink.mu.Unlock()
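
The before/after bookkeeping in this hunk snapshots the running totals by combining them into a fresh Identity event. A condensed, hypothetical helper showing the same idiom:

    package example

    import "github.com/cockroachdb/cockroach/pkg/kv/bulk/bulkpb"

    // snapshot is a simplified version of the bookkeeping in doFlush:
    // Identity yields a fresh zero event, and two Combine calls fold the
    // running totals plus the in-progress stats into a point-in-time copy
    // that can later be diffed against a second snapshot.
    func snapshot(totals, current *bulkpb.IngestionPerformanceStats) *bulkpb.IngestionPerformanceStats {
        s := totals.Identity().(*bulkpb.IngestionPerformanceStats)
        s.Combine(totals)
        s.Combine(current)
        return s
    }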