119416: pkg/util/eventagg: general aggregation framework for reduction of event cardinality r=dhartunian a=abarganier

**Reviewer note: review commit-wise**

The eventagg package is (currently) a proof of concept ("POC") that aims to provide an easy-to-use library that standardizes the way in which we aggregate Observability event data in CRDB. The goal is to eventually emit that data as "exhaust" from CRDB, which downstream systems can consume to build Observability features that do not rely on CRDB's own availability to aid in debugging & investigations. Additionally, we want to provide facilities for code within CRDB to consume this same data, such that it can also power features internally.

This pull request contains work to create the aggregation mechanism in `pkg/util/eventagg`.

These facilities provide a way of aggregating notable events to reduce cardinality before performing further processing and/or structured logging.

In addition to the framework, a toy SQL Stats example is provided in `pkg/sql/sqlstats/aggregate.go`, which shows the current developer experience when using the APIs.

See `pkg/util/eventagg/doc.go` for more details.

Since this feature is currently experimental, it's gated by the `COCKROACH_ENABLE_STRUCTURED_EVENTS` environment variable, which is disabled by default.
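For a flavor of the pattern, here is a minimal, self-contained Go sketch of map-reduce-style event aggregation. Every identifier in it (`event`, `agg`, `add`, `flush`) is invented for illustration and is not the eventagg API; see `pkg/util/eventagg/doc.go` and the SQL Stats example for the real interfaces.

```go
package main

import (
	"fmt"
	"time"
)

// event is a notable occurrence we want to aggregate before logging.
type event struct {
	stmtFingerprint string
	latency         time.Duration
}

// agg buckets events by a grouping key and merges them, so that N raw
// events reduce to one aggregate row per key per flush window.
type agg struct {
	counts   map[string]int
	totalLat map[string]time.Duration
}

func newAgg() *agg {
	return &agg{counts: map[string]int{}, totalLat: map[string]time.Duration{}}
}

// add folds one event into the running aggregate, keyed by fingerprint.
func (a *agg) add(e event) {
	a.counts[e.stmtFingerprint]++
	a.totalLat[e.stmtFingerprint] += e.latency
}

// flush emits one low-cardinality record per key (e.g. to structured
// logs) and resets the window.
func (a *agg) flush() {
	for k, n := range a.counts {
		fmt.Printf("fingerprint=%q count=%d meanLat=%s\n",
			k, n, a.totalLat[k]/time.Duration(n))
	}
	a.counts, a.totalLat = map[string]int{}, map[string]time.Duration{}
}

func main() {
	a := newAgg()
	a.add(event{"SELECT * FROM t WHERE id = $1", 2 * time.Millisecond})
	a.add(event{"SELECT * FROM t WHERE id = $1", 4 * time.Millisecond})
	a.flush() // one aggregate row instead of two raw events
}
```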

---

Release note: none

Epic: CRDB-35919

123120: ui: Highlight unavailable ranges in red on the summary bar when nonzero r=abarganier a=theloneexplorerquest

Modify the summary bar to change the color of the unavailable-ranges count: when the number of unavailable ranges is greater than zero, it is displayed in red; when it is zero, it is displayed in green.

Fix: #122014

Release note (ui): Changed the color of unavailable ranges on the summary bar to red when nonzero; ranges are green when zero.

124160: roachtest: add test for admission control disk bandwidth  r=sumeerbhola a=aadityasondhi

This test runs a single-node target cluster with two workloads running on
it. The lower-priority workload (qos=background) is very bandwidth
intensive and, without the AC bandwidth limiter, would saturate the
provisioned bandwidth (controlled using cgroups).

This test shows how setting the cluster setting
`kvadmission.store.provisioned_bandwidth` limits the disk bandwidth
usage of lower-priority work and shapes it to the value set in the
setting.

Fixes #121576.

Release note: None


124293: tools: switch md5 cmd name based on existence  r=dt a=dt

Release note: none.
Epic: none.
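The description is terse, but the change presumably accounts for the md5 utility being named `md5sum` on Linux and `md5` on macOS/BSD. Purely as an illustrative sketch (the real change is likely a shell one-liner; none of these names come from the patch itself), probing for whichever command exists could look like:

```go
package main

import (
	"fmt"
	"os/exec"
)

// md5Cmd returns whichever md5 utility is on PATH: "md5sum" on most
// Linux systems, "md5" on macOS and the BSDs.
func md5Cmd() (string, error) {
	for _, name := range []string{"md5sum", "md5"} {
		if _, err := exec.LookPath(name); err == nil {
			return name, nil
		}
	}
	return "", fmt.Errorf("no md5 utility found on PATH")
}

func main() {
	cmd, err := md5Cmd()
	if err != nil {
		panic(err)
	}
	fmt.Println("using:", cmd)
}
```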

124348: backupccl: download pre restore data in cluster restore r=dt a=msbutler

This patch adds the pre-restore data spans to the list of spans to download.
While these pre-restore spans map to data in the temporary system table
database that is then rewritten to the actual system tables, the download job
ought to download all external data linked into the cluster on principle.

Fixes #124330

Release note: none

124403: roachtest: use first transient error when checking for flakes r=srosenberg a=renatolabs

Previously, roachtest would only look at the outermost error in a chain that matched a `TransientError` (or `ErrorWithOwnership`) when checking for flakes. However, that is in most cases *not* what we want: if a transient error wraps another transient error, the actual reason for the failure is the original (wrapped) error.

Informs: #123887

Release note: None
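To make the chain-walking concrete, here is a small self-contained illustration of the idea: keep matching and unwrapping until the deepest matching error is found, so the original (wrapped) transient error wins. The `transient` type is invented for the example; the real change, in `failuresMatchingError` below, operates on roachtest's own error types.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/errors"
)

// transient stands in for roachtest's transient-error types.
type transient struct {
	reason string
	cause  error
}

func (t *transient) Error() string { return t.reason }
func (t *transient) Unwrap() error { return t.cause }

// innermost walks the chain and returns the deepest error that still
// matches the target type: the actual reason for the failure.
func innermost(err error) *transient {
	var last *transient
	for err != nil {
		var ref *transient
		if !errors.As(err, &ref) {
			break
		}
		last = ref
		err = errors.Unwrap(ref)
	}
	return last
}

func main() {
	inner := &transient{reason: "ssh_problem"}
	outer := &transient{reason: "some_problem", cause: inner}
	// Previously only the outermost match was considered; now the
	// wrapped error's reason is reported.
	fmt.Println(innermost(outer).reason) // ssh_problem
}
```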

124486: kvclient: add WithFiltering option to rangefeed client r=nvanbenschoten,msbutler a=stevendanna

This adds a WithFiltering option to the rangefeed client that passes through the option to the underlying rangefeed.

Epic: none
Release note: None
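The new option slots into the client's existing functional-options pattern (visible in the `config.go` diff below). Here is a self-contained mirror of that pattern, with simplified stand-in types for everything except the `WithFiltering` name itself:

```go
package main

import "fmt"

// config and Option are simplified stand-ins for the rangefeed client's
// types; the shape matches the pattern in pkg/kv/kvclient/rangefeed.
type config struct {
	withDiff      bool
	withFiltering bool
}

type Option interface{ set(*config) }

type optionFunc func(*config)

func (f optionFunc) set(c *config) { f(c) }

// WithFiltering asks the underlying rangefeed to omit events emitted by
// sessions that set omit_from_changefeeds.
func WithFiltering(withFiltering bool) Option {
	return optionFunc(func(c *config) { c.withFiltering = withFiltering })
}

// newRangeFeed applies options the way the real constructor would.
func newRangeFeed(opts ...Option) config {
	var c config
	for _, o := range opts {
		o.set(&c)
	}
	return c
}

func main() {
	c := newRangeFeed(WithFiltering(true))
	fmt.Printf("%+v\n", c) // {withDiff:false withFiltering:true}
}
```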

124491: raft: remove RawNode.TickQuiesced r=pav-kv a=nvanbenschoten

This commit removes the `(*RawNode).TickQuiesced` method. The method was deprecated back in etcd-io/raft#62 and has not been in use since 2018.

Epic: None
Release note: None

Co-authored-by: Alex Barganier <abarganier@cockroachlabs.com>
Co-authored-by: theloneexplorerquest <theloneexplorerquest@gmail.com>
Co-authored-by: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Michael Butler <butler@cockroachlabs.com>
Co-authored-by: Renato Costa <renato@cockroachlabs.com>
Co-authored-by: Steven Danna <danna@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
9 people committed May 21, 2024
9 parents 3685bb0 + e87a489 + bc8bc7e + f23b867 + 1304f42 + be876f3 + e24022b + 1f27574 + 63504b7 commit 7807ee2
Showing 26 changed files with 1,060 additions and 24 deletions.
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
@@ -277,6 +277,7 @@ ALL_TESTS = [
"//pkg/multitenant/tenantcapabilities/tenantcapabilitieswatcher:tenantcapabilitieswatcher_test",
"//pkg/multitenant/tenantcapabilities:tenantcapabilities_test",
"//pkg/multitenant/tenantcostmodel:tenantcostmodel_test",
"//pkg/obs/eventagg:eventagg_test",
"//pkg/obsservice/obslib/ingest:ingest_test",
"//pkg/obsservice/obslib/migrations:migrations_test",
"//pkg/obsservice/obslib/process:process_test",
@@ -1513,6 +1514,8 @@ GO_TARGETS = [
"//pkg/multitenant/tenantcostmodel:tenantcostmodel",
"//pkg/multitenant/tenantcostmodel:tenantcostmodel_test",
"//pkg/multitenant:multitenant",
"//pkg/obs/eventagg:eventagg",
"//pkg/obs/eventagg:eventagg_test",
"//pkg/obs:obs",
"//pkg/obsservice/cmd/obsservice:obsservice",
"//pkg/obsservice/cmd/obsservice:obsservice_lib",
14 changes: 13 additions & 1 deletion pkg/ccl/backupccl/restore_online.go
@@ -502,13 +502,25 @@ func (r *restoreResumer) maybeWriteDownloadJob(
if !details.ExperimentalOnline {
return nil
}
rekey := mainRestoreData.getRekeys()
rekey = append(rekey, preRestoreData.getRekeys()...)

kr, err := MakeKeyRewriterFromRekeys(execConfig.Codec, mainRestoreData.getRekeys(), mainRestoreData.getTenantRekeys(),
tenantRekey := mainRestoreData.getTenantRekeys()
tenantRekey = append(tenantRekey, preRestoreData.getTenantRekeys()...)
kr, err := MakeKeyRewriterFromRekeys(execConfig.Codec, rekey, tenantRekey,
false /* restoreTenantFromStream */)
if err != nil {
return errors.Wrap(err, "creating key rewriter from rekeys")
}
downloadSpans := mainRestoreData.getSpans()

// Intentionally download preRestoreData after the main data. During a cluster
// restore, preRestore data is linked into a temporary system database that is
// then copied over to the real system database. The temporary system database
// is then deleted and should never be queried. We still want to download this
// data, however, to protect against external storage deletions of these
// linked-in ssts, but at lower priority than the main data.
downloadSpans = append(downloadSpans, preRestoreData.getSpans()...)
for i := range downloadSpans {
var err error
downloadSpans[i], err = rewriteSpan(kr, downloadSpans[i].Clone(), execinfrapb.ElidePrefix_None)
19 changes: 17 additions & 2 deletions pkg/cmd/roachtest/github_test.go
@@ -348,7 +348,22 @@ func TestCreatePostRequest(t *testing.T) {
expectedMessagePrefix: testName + " failed",
expectedLabels: []string{"T-testeng", "X-infra-flake"},
},
// 13. Verify hostError failure are routed to test-eng and marked as infra-flake, when the
// 13. When a transient error happens as a result of *another*
// transient error, the corresponding issue uses the first
// transient error in the chain.
{
failures: []failure{
createFailure(rperrors.TransientFailure(
rperrors.NewSSHError(errors.New("oops")), "some_problem",
)),
},
expectedPost: true,
expectedTeam: "@cockroachdb/test-eng",
expectedName: "ssh_problem",
expectedMessagePrefix: testName + " failed",
expectedLabels: []string{"T-testeng", "X-infra-flake"},
},
// 14. Verify hostError failure are routed to test-eng and marked as infra-flake, when the
// first failure is a non-handled error.
{
nonReleaseBlocker: true,
@@ -359,7 +374,7 @@
expectedMessagePrefix: testName + " failed",
expectedLabels: []string{"T-testeng", "X-infra-flake"},
},
// 14. Verify hostError failure are routed to test-eng and marked as infra-flake, when the only error is
// 15. Verify hostError failure are routed to test-eng and marked as infra-flake, when the only error is
// hostError failure
{
nonReleaseBlocker: true,
23 changes: 21 additions & 2 deletions pkg/cmd/roachtest/test_impl.go
@@ -499,14 +499,33 @@ func (t *testImpl) failureMsg() string {
// target. If it does, `refError` is set to that target error value
// and returns true. Otherwise, it returns false.
func failuresMatchingError(failures []failure, refError any) bool {
// unwrap unwraps the error passed to find the innermost error in the
// chain that satisfies the `refError` provided.
unwrap := func(err error) bool {
var matched bool
for {
if isRef := errors.As(err, refError); !isRef {
break
}

matched = true
err = errors.Unwrap(err)
if err == nil {
break
}
}

return matched
}

for _, f := range failures {
for _, err := range f.errors {
if errors.As(err, refError) {
if unwrap(err) {
return true
}
}

if errors.As(f.squashedErr, refError) {
if unwrap(f.squashedErr) {
return true
}
}
17 changes: 17 additions & 0 deletions pkg/cmd/roachtest/testdata/help_command_createpost_15.txt
@@ -0,0 +1,17 @@
echo
----
----


See: [roachtest README](https://github.com/cockroachdb/cockroach/blob/master/pkg/cmd/roachtest/README.md)



See: [How To Investigate \(internal\)](https://cockroachlabs.atlassian.net/l/c/SSSBr8c7)



See: [Grafana](https://go.crdb.dev/roachtest-grafana//github-test/1689957243000/1689957853000)

----
----
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
"activerecord_blocklist.go",
"admission_control.go",
"admission_control_database_drop.go",
"admission_control_disk_bandwidth_overload.go",
"admission_control_elastic_backup.go",
"admission_control_elastic_cdc.go",
"admission_control_elastic_io.go",
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/admission_control.go
@@ -42,4 +42,5 @@ func registerAdmission(r registry.Registry) {
registerDatabaseDrop(r)
registerIntentResolutionOverload(r)
registerElasticIO(r)
registerDiskBandwidthOverload(r)
}
210 changes: 210 additions & 0 deletions pkg/cmd/roachtest/tests/admission_control_disk_bandwidth_overload.go
@@ -0,0 +1,210 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tests

import (
"context"
"fmt"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/grafana"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// This test sets up 2 workloads – kv0 consisting of "normal" priority writes
// and kv0 consisting of "background" priority writes. The goal is to show that
// even with a demanding "background" workload that is able to push the used
// bandwidth much higher than the provisioned one, the AC bandwidth limiter
// paces the traffic at the set bandwidth limit.
func registerDiskBandwidthOverload(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "admission-control/disk-bandwidth-limiter",
Owner: registry.OwnerAdmissionControl,
Timeout: time.Hour,
Benchmark: true,
CompatibleClouds: registry.AllClouds,
Suites: registry.ManualOnly,
Cluster: r.MakeClusterSpec(2, spec.CPU(8)),
RequiresLicense: true,
Leases: registry.MetamorphicLeases,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
if c.Spec().NodeCount != 2 {
t.Fatalf("expected 2 nodes, found %d", c.Spec().NodeCount)
}
crdbNodes := c.Spec().NodeCount - 1
workloadNode := crdbNodes + 1

promCfg := &prometheus.Config{}
promCfg.WithPrometheusNode(c.Node(workloadNode).InstallNodes()[0]).
WithNodeExporter(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
WithCluster(c.Range(1, c.Spec().NodeCount-1).InstallNodes()).
WithGrafanaDashboardJSON(grafana.SnapshotAdmissionControlGrafanaJSON)
err := c.StartGrafana(ctx, t.L(), promCfg)
require.NoError(t, err)

startOpts := option.NewStartOpts(option.NoBackupSchedule)
startOpts.RoachprodOpts.ExtraArgs = append(startOpts.RoachprodOpts.ExtraArgs,
"--vmodule=io_load_listener=2")
roachtestutil.SetDefaultAdminUIPort(c, &startOpts.RoachprodOpts)
settings := install.MakeClusterSettings()
c.Start(ctx, t.L(), startOpts, settings, c.Range(1, crdbNodes))

promClient, err := clusterstats.SetupCollectorPromClient(ctx, c, t.L(), promCfg)
require.NoError(t, err)
statCollector := clusterstats.NewStatsCollector(ctx, promClient)

setAdmissionControl(ctx, t, c, true)

// TODO(aaditya): This function shares some of the logic with roachtestutil.DiskStaller. Consider merging the two.
setBandwidthLimit := func(nodes option.NodeListOption, rw string, bw int, max bool) error {
res, err := c.RunWithDetailsSingleNode(context.TODO(), t.L(), option.WithNodes(nodes[:1]), "lsblk | grep /mnt/data1 | awk '{print $2}'")
if err != nil {
t.Fatalf("error when determining block device: %s", err)
}
parts := strings.Split(strings.TrimSpace(res.Stdout), ":")
if len(parts) != 2 {
t.Fatalf("unexpected output from lsblk: %s", res.Stdout)
}
major, err := strconv.Atoi(parts[0])
if err != nil {
t.Fatalf("error when determining block device: %s", err)
}
minor, err := strconv.Atoi(parts[1])
if err != nil {
t.Fatalf("error when determining block device: %s", err)
}

cockroachIOController := filepath.Join("/sys/fs/cgroup/system.slice", roachtestutil.SystemInterfaceSystemdUnitName()+".service", "io.max")
bytesPerSecondStr := "max"
if !max {
bytesPerSecondStr = fmt.Sprintf("%d", bw)
}
return c.RunE(ctx, option.WithNodes(nodes), "sudo", "/bin/bash", "-c", fmt.Sprintf(
`'echo %d:%d %s=%s > %s'`,
major,
minor,
rw,
bytesPerSecondStr,
cockroachIOController,
))
}

if err := setBandwidthLimit(c.Range(1, crdbNodes), "wbps", 128<<20 /* 128MiB */, false); err != nil {
t.Fatal(err)
}

// TODO(aaditya): Extend this test to also limit reads once we have a
// mechanism to pace read traffic in AC.

db := c.Conn(ctx, t.L(), crdbNodes)
defer db.Close()

const bandwidthLimit = 75
if _, err := db.ExecContext(
// We intentionally set this much lower than the provisioned value
// above to clearly show that the bandwidth limiter works.
ctx, fmt.Sprintf("SET CLUSTER SETTING kvadmission.store.provisioned_bandwidth = '%dMiB'", bandwidthLimit)); err != nil {
t.Fatalf("failed to set kvadmission.store.provisioned_bandwidth: %v", err)
}

duration := 30 * time.Minute
m := c.NewMonitor(ctx, c.Range(1, crdbNodes))
m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("starting foreground kv workload thread (<%s)", time.Minute))
dur := " --duration=" + duration.String()
url := fmt.Sprintf(" {pgurl:1-%d}", crdbNodes)
cmd := "./cockroach workload run kv --init --histograms=perf/stats.json --concurrency=2 " +
"--splits=1000 --read-percent=50 --min-block-bytes=4096 --max-block-bytes=4096 " +
"--txn-qos='regular' --tolerate-errors" + dur + url
c.Run(ctx, option.WithNodes(c.Node(workloadNode)), cmd)
return nil
})

m.Go(func(ctx context.Context) error {
time.Sleep(1 * time.Minute)
t.Status(fmt.Sprintf("starting background kv workload thread (<%s)", time.Minute))
dur := " --duration=" + duration.String()
url := fmt.Sprintf(" {pgurl:1-%d}", crdbNodes)
cmd := "./cockroach workload run kv --init --histograms=perf/stats.json --concurrency=1024 " +
"--splits=1000 --read-percent=0 --min-block-bytes=4096 --max-block-bytes=4096 " +
"--txn-qos='background' --tolerate-errors" + dur + url
c.Run(ctx, option.WithNodes(c.Node(workloadNode)), cmd)
return nil
})

m.Go(func(ctx context.Context) error {
t.Status(fmt.Sprintf("starting monitoring thread (<%s)", time.Minute))
writeBWMetric := divQuery("rate(sys_host_disk_write_bytes[1m])", 1<<20 /* 1MiB */)
getMetricVal := func(query string, label string) (float64, error) {
point, err := statCollector.CollectPoint(ctx, t.L(), timeutil.Now(), query)
if err != nil {
t.L().Errorf("could not query prom %s", err.Error())
return 0, err
}
val := point[label]
if len(val) != 1 {
err = errors.Errorf(
"unexpected number %d of points for metric %s", len(val), query)
t.L().Errorf("%s", err.Error())
return 0, err
}
for storeID, v := range val {
t.L().Printf("%s(store=%s): %f", query, storeID, v.Value)
return v.Value, nil
}
// Unreachable.
panic("unreachable")
}

// Allow a 5% room for error.
const bandwidthThreshold = bandwidthLimit * 1.05
const collectionIntervalSeconds = 10.0
// Loop for ~20 minutes.
const numIterations = int(20 / (collectionIntervalSeconds / 60))
numErrors := 0
numSuccesses := 0
for i := 0; i < numIterations; i++ {
time.Sleep(collectionIntervalSeconds * time.Second)
val, err := getMetricVal(writeBWMetric, "node")
if err != nil {
numErrors++
continue
}
if val > bandwidthThreshold {
t.Fatalf("write bandwidth %f over last exceeded threshold", val)
}
numSuccesses++
}
t.Status(fmt.Sprintf("done monitoring, errors: %d successes: %d", numErrors, numSuccesses))
if numErrors > numSuccesses {
t.Fatalf("too many errors retrieving metrics")
}
return nil
})

m.Wait()
},
})
}
4 changes: 2 additions & 2 deletions pkg/cmd/roachtest/tests/mixed_version_backup.go
@@ -2338,12 +2338,12 @@ func (bc *backupCollection) verifyOnlineRestore(
}
conn := d.testUtils.cluster.Conn(ctx, l, d.roachNodes[0])
defer conn.Close()
var externalBytes int
var externalBytes uint64
if err := conn.QueryRowContext(ctx, jobutils.GetExternalBytesForConnectedTenant).Scan(&externalBytes); err != nil {
return nil, fmt.Errorf("could not get external bytes: %w", err)
}
if externalBytes != 0 {
return nil, fmt.Errorf("download job %d did not download all data", downloadJobID)
return nil, fmt.Errorf("download job %d did not download all data. Cluster has %d external bytes", downloadJobID, externalBytes)
}
return restoredContents, nil
}
9 changes: 9 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
@@ -40,6 +40,7 @@ type config struct {
useRowTimestampInInitialScan bool

withDiff bool
withFiltering bool
onUnrecoverableError OnUnrecoverableError
onCheckpoint OnCheckpoint
frontierQuantize time.Duration
@@ -146,6 +147,14 @@ func WithDiff(withDiff bool) Option {
})
}

// WithFiltering returns an option that sets whether to filter out rangefeed
// events where the user has set omit_from_changefeeds in their session.
func WithFiltering(withFiltering bool) Option {
return optionFunc(func(c *config) {
c.withFiltering = withFiltering
})
}

// WithRetry configures the retry options for the rangefeed.
func WithRetry(options retry.Options) Option {
return optionFunc(func(c *config) {