Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rangefeedcache,settingswatcher,systemcfgwatcher: lose gossip deps #74612

Merged
merged 10 commits into from
Feb 2, 2022
Merged
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ ALL_TESTS = [
"//pkg/kv/kvclient/kvstreamer:kvstreamer_test",
"//pkg/kv/kvclient/rangecache:rangecache_test",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test",
"//pkg/kv/kvclient/rangefeed/rangefeedcache:rangefeedcache_test",
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvnemesis:kvnemesis_test",
"//pkg/kv/kvprober:kvprober_test",
Expand Down Expand Up @@ -186,6 +187,7 @@ ALL_TESTS = [
"//pkg/server/serverpb:serverpb_test",
"//pkg/server/settingswatcher:settingswatcher_test",
"//pkg/server/status:status_test",
"//pkg/server/systemconfigwatcher:systemconfigwatcher_test",
"//pkg/server/telemetry:telemetry_test",
"//pkg/server/tracedumper:tracedumper_test",
"//pkg/server:server_test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ exp,benchmark
21,AlterPrimaryRegion/alter_empty_database_alter_primary_region
25-26,AlterPrimaryRegion/alter_empty_database_set_initial_primary_region
21,AlterPrimaryRegion/alter_populated_database_alter_primary_region
27,AlterPrimaryRegion/alter_populated_database_set_initial_primary_region
26,AlterPrimaryRegion/alter_populated_database_set_initial_primary_region
20,AlterRegions/alter_empty_database_add_region
21,AlterRegions/alter_empty_database_drop_region
20,AlterRegions/alter_populated_database_add_region
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# LogicTest: multiregion-9node-3region-3azs
# TODO(#69265): enable multiregion-9node-3region-3azs-tenant.

# Set the closed timestamp interval to be short to shorten the amount of time
# we need to wait for the system config to propagate.
statement ok
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms';
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms';

statement ok
CREATE DATABASE multi_region_test_db PRIMARY REGION "ca-central-1" REGIONS "ap-southeast-2", "us-east-1";
USE multi_region_test_db
Expand Down Expand Up @@ -187,7 +193,7 @@ ALTER TABLE history INJECT STATISTICS '[

# Regression test for #63735. Ensure that we choose locality optimized anti
# joins for the foreign key checks.
query T
query T retry
EXPLAIN INSERT
INTO
history (h_c_id, h_c_d_id, h_c_w_id, h_d_id, h_w_id, h_amount, h_date, h_data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
# TODO(#69265): enable multiregion-9node-3region-3azs-tenant and/or revert
# the commit that split these changes out.

# Set the closed timestamp interval to be short to shorten the amount of time
# we need to wait for the system config to propagate.
statement ok
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms';
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10ms';

statement ok
CREATE DATABASE multi_region_test_db PRIMARY REGION "ca-central-1" REGIONS "ap-southeast-2", "us-east-1" SURVIVE REGION FAILURE;
USE multi_region_test_db
Expand All @@ -20,7 +26,7 @@ CREATE TABLE regional_by_row_table (
) LOCALITY REGIONAL BY ROW

# Do a REGEXP replace of the enums as these may not be static.
query T
query T retry
SELECT regexp_replace(info, '@\d+', '@<enum_val>', 'g') FROM
[EXPLAIN (OPT, CATALOG) SELECT * FROM regional_by_row_table]
----
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/server/systemconfigwatcher/systemconfigwatchertest",
"//pkg/sql",
"//pkg/sql/distsql",
"//pkg/sql/tests",
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/serverccl/server_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/licenseccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher/systemconfigwatchertest"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -250,3 +251,8 @@ func TestNoInflightTracesVirtualTableOnTenant(t *testing.T) {
require.Error(t, err, "cluster_inflight_traces should be unsupported")
require.Contains(t, err.Error(), "table crdb_internal.cluster_inflight_traces is not implemented on tenants")
}

func TestSystemConfigWatcherCache(t *testing.T) {
defer leaktest.AfterTest(t)()
systemconfigwatchertest.TestSystemConfigWatcher(t, false /* skipSecondary */)
}
14 changes: 14 additions & 0 deletions pkg/ccl/telemetryccl/testdata/telemetry/multiregion
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,14 @@ IMPORT INTO t7 CSV DATA ('nodelocal://0/t7/export*.csv')
sql.multiregion.import

# Test for locality optimized search counter.

# Lower the closed timestamp subsystem so system config info is transmitted
# rapidly.
exec
SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '10ms';
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '5ms';
----

feature-allowlist
sql.plan.opt.locality-optimized-search
----
Expand All @@ -393,6 +401,12 @@ USE survive_region;
CREATE TABLE t8 (a INT PRIMARY KEY) LOCALITY REGIONAL BY ROW
----

# Sleep a large multiple of the closed timestamp target duration to ensure
# that a fresh system config has made its way to the optimizer.
exec
SELECT pg_sleep(.05);
----

feature-usage
SELECT * FROM t8 WHERE a = 1
----
Expand Down
34 changes: 26 additions & 8 deletions pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,17 @@ type Option interface {

type config struct {
scanConfig
retryOptions retry.Options
onInitialScanDone OnInitialScanDone
withInitialScan bool
retryOptions retry.Options
onInitialScanDone OnInitialScanDone
withInitialScan bool
onInitialScanError OnInitialScanError
// useRowTimestampInInitialScan indicates that when rows are scanned in an
// initial scan, they should report their timestamp as it exists in KV as
// opposed to the timestamp at which the scan occurred. Both behaviors can
// be sane depending on your use case.
useRowTimestampInInitialScan bool

withDiff bool
onInitialScanError OnInitialScanError
onUnrecoverableError OnUnrecoverableError
onCheckpoint OnCheckpoint
onFrontierAdvance OnFrontierAdvance
Expand Down Expand Up @@ -101,6 +107,17 @@ func WithOnInitialScanError(f OnInitialScanError) Option {
})
}

// WithRowTimestampInInitialScan indicates whether the timestamp of rows
// reported during an initial scan should correspond to the timestamp of that
// row as it exists in KV or should correspond to the timestamp of the initial
// scan. The default is false, indicating that the timestamp should correspond
// to the timestamp of the initial scan.
func WithRowTimestampInInitialScan(shouldUse bool) Option {
return optionFunc(func(c *config) {
c.useRowTimestampInInitialScan = shouldUse
})
}

// WithOnInternalError sets up a callback to report unrecoverable errors during
// operation.
func WithOnInternalError(f OnUnrecoverableError) Option {
Expand All @@ -109,11 +126,12 @@ func WithOnInternalError(f OnUnrecoverableError) Option {
})
}

// WithDiff makes an option to enable an initial scan which defaults to
// false.
func WithDiff() Option {
// WithDiff makes an option to set whether rangefeed events carry the previous
// value in addition to the new value. The option defaults to false. If set,
// initial scan events will carry the same value for both Value and PrevValue.
func WithDiff(withDiff bool) Option {
return optionFunc(func(c *config) {
c.withDiff = true
c.withDiff = withDiff
})
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestRangeFeedIntegration(t *testing.T) {
case <-ctx.Done():
}
},
rangefeed.WithDiff(),
rangefeed.WithDiff(true),
rangefeed.WithInitialScan(func(ctx context.Context) {
close(initialScanDone)
}),
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestRangefeedValueTimestamps(t *testing.T) {
case <-ctx.Done():
}
},
rangefeed.WithDiff(),
rangefeed.WithDiff(true),
)
require.NoError(t, err)
defer r.Close()
Expand Down Expand Up @@ -669,7 +669,7 @@ func TestUnrecoverableErrors(t *testing.T) {

r, err := f.RangeFeed(ctx, "test", []roachpb.Span{sp}, preGCThresholdTS,
func(context.Context, *roachpb.RangeFeedValue) {},
rangefeed.WithDiff(),
rangefeed.WithDiff(true),
rangefeed.WithOnInternalError(func(ctx context.Context, err error) {
mu.Lock()
defer mu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func TestRangeFeedMock(t *testing.T) {
ctx context.Context, value *roachpb.RangeFeedValue,
) {
rows <- value
}, rangefeed.WithDiff())
}, rangefeed.WithDiff(true))
require.NoError(t, err)
<-rows
r.Close()
Expand Down
16 changes: 13 additions & 3 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "rangefeedbuffer",
srcs = ["buffer.go"],
srcs = [
"buffer.go",
"kvs.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/syncutil",
Expand All @@ -15,9 +19,15 @@ go_library(

go_test(
name = "rangefeedbuffer_test",
srcs = ["buffer_test.go"],
srcs = [
"buffer_test.go",
"kvs_test.go",
],
embed = [":rangefeedbuffer"],
deps = [
":rangefeedbuffer",
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"@com_github_stretchr_testify//require",
Expand Down
71 changes: 71 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/kvs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeedbuffer

import (
"sort"

"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// RangeFeedValueEventToKV is a function to type assert an Event into a
// *roachpb.RangeFeedValue and then convert it to a roachpb.KeyValue.
func RangeFeedValueEventToKV(event Event) roachpb.KeyValue {
rfv := event.(*roachpb.RangeFeedValue)
return roachpb.KeyValue{Key: rfv.Key, Value: rfv.Value}
}

// EventsToKVs converts a slice of Events to a slice of KeyValue pairs.
func EventsToKVs(events []Event, f func(ev Event) roachpb.KeyValue) []roachpb.KeyValue {
kvs := make([]roachpb.KeyValue, 0, len(events))
for _, ev := range events {
kvs = append(kvs, f(ev))
}
return kvs
}

// MergeKVs merges two sets of KVs into a single set of KVs with at most one
// KV for any key. The latest value in the merged set wins. If the latest
// value in the set corresponds to a deletion (i.e. its IsPresent() method
// returns false), the value will be omitted from the final set.
//
// Note that the assumption is that base has no duplicated keys. If the set
// of updates is empty, base is returned directly.
func MergeKVs(base, updates []roachpb.KeyValue) []roachpb.KeyValue {
if len(updates) == 0 {
return base
}
combined := make([]roachpb.KeyValue, 0, len(base)+len(updates))
combined = append(append(combined, base...), updates...)
sort.Slice(combined, func(i, j int) bool {
cmp := combined[i].Key.Compare(combined[j].Key)
if cmp == 0 {
return combined[i].Value.Timestamp.Less(combined[j].Value.Timestamp)
}
return cmp < 0
})
r := combined[:0]
for _, kv := range combined {
prevIsSameKey := len(r) > 0 && r[len(r)-1].Key.Equal(kv.Key)
if kv.Value.IsPresent() {
if prevIsSameKey {
r[len(r)-1] = kv
} else {
r = append(r, kv)
}
} else {
if prevIsSameKey {
r = r[:len(r)-1]
}
}
}
return r
}
Loading