Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
71225: rangefeedbuffer: introduce a rangefeed buffer r=irfansharif a=irfansharif

Buffer provides a thin memory-bounded buffer to sit on top of a rangefeed. It
accumulates raw rangefeed events[^1], which can be flushed out in timestamp
sorted order en-masse whenever the rangefeed frontier is bumped. If we
accumulate more events than the limit allows for, we error out to the caller.

We need such a thing in both #69614 and #69661.

[^1]: Rangefeed error events are propagated to the caller, checkpoint events
     are discarded.

Release note: None

First commit is from #71256. Co-authored-by: Arul Ajmani <arula@cockroachlabs.com>.

71534: ui/sql: show summarized statements in the statements table r=lindseyjin a=lindseyjin

Resolves #27021

Previously, statements on the statements page hid too much information.
There were complaints that it was difficult to disambiguate between
statements without having to view the full query on the tooltips.

The first commit in this patch implemented back-end changes to add a new
metadata field for summarized queries, as well as formatting functions.
This second commit implements additional logic to pass that new metadata
to the front-end and display it in the Statements Table.

Currently, we only create summaries of SELECT, INSERT/UPSERT, and UPDATE
statements in the back-end. For all other statement types, we will
continue to use the existing summary system.

![image](https://user-images.githubusercontent.com/29153209/137195266-8582bfe8-23bb-4d64-9129-d876087c9abc.png)

Release note (ui change): Show new statement summaries on the Statements
page. This applies for SELECT, INSERT/UPSERT, and UPDATE statements, and
will enable them to be more detailed and less ambiguous than our
previous formats.

71625: clusterversion: add a (disabled) assertion that binary version is latest r=dt a=dt

This assertion is intended to be flipped on, and the release version updated, when the cluster version mint
commit is backported to a release branch. Once enabled, accidentally backporting any later cluster version
to that branch causes this assertion to panic in all tests.

Release note: none.

Co-authored-by: irfan sharif <irfanmahmoudsharif@gmail.com>
Co-authored-by: Lindsey Jin <lindsey.jin@cockroachlabs.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
  • Loading branch information
4 people committed Oct 18, 2021
4 parents 8b9c7dd + d93e7cf + 259cdd9 + 0fe5226 commit 38168db
Show file tree
Hide file tree
Showing 34 changed files with 776 additions and 118 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ ALL_TESTS = [
"//pkg/kv/bulk:bulk_test",
"//pkg/kv/kvclient/kvcoord:kvcoord_test",
"//pkg/kv/kvclient/rangecache:rangecache_test",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test",
"//pkg/kv/kvclient/rangefeed:rangefeed_test",
"//pkg/kv/kvnemesis:kvnemesis_test",
"//pkg/kv/kvprober:kvprober_test",
Expand Down
9 changes: 9 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,15 @@ var (
binaryVersion = versionsSingleton[len(versionsSingleton)-1].Version
)

// init asserts that the binary version is exactly the V21_2 version when
// building from a release branch. The assertion is currently disabled:
// isReleaseBranch is a false constant, so the comparison is never evaluated.
func init() {
	const isReleaseBranch = false
	if isReleaseBranch && binaryVersion != ByKey(V21_2) {
		panic("unexpected cluster version greater than release's binary version")
	}
}

// ByKey returns the roachpb.Version for a given key.
// It is a fatal error to use an invalid key.
func ByKey(key Key) roachpb.Version {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func (f *RangeFeed) maybeRunInitialScan(
// indicating that the value was previously deleted.
if f.withDiff {
v.PrevValue = v.Value
v.PrevValue.Timestamp = hlc.Timestamp{}
}

// It's something of a bummer that we must allocate a new value for each
Expand Down
115 changes: 114 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ func TestRangeFeedIntegration(t *testing.T) {
v1 := <-rows
require.Equal(t, mkKey("a"), v1.Key)
// Ensure the initial scan contract is fulfilled when WithDiff is specified.
require.Equal(t, v1.Value, v1.PrevValue)
require.Equal(t, v1.Value.RawBytes, v1.PrevValue.RawBytes)
require.Equal(t, v1.Value.Timestamp, afterB)
require.True(t, v1.PrevValue.Timestamp.IsEmpty())
}
{
v2 := <-rows
Expand Down Expand Up @@ -333,3 +334,115 @@ func TestWithOnCheckpoint(t *testing.T) {

wg.Wait()
}

// TestRangefeedValueTimestamps tests that the rangefeed values (and previous
// values) have the kind of timestamps we expect when writing, overwriting, and
// deleting keys: the new value's timestamp falls between clock readings taken
// just before and just after the operation, while the previous value (surfaced
// because the feed is created WithDiff) always has an empty timestamp.
func TestRangefeedValueTimestamps(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	db := tc.Server(0).DB()
	scratchKey := tc.ScratchRange(t)
	// Cap the slice's capacity at its length. Presumably this is so that
	// mkKey's encoding appends into a fresh allocation instead of sharing
	// scratchKey's backing array across calls.
	scratchKey = scratchKey[:len(scratchKey):len(scratchKey)]
	mkKey := func(k string) roachpb.Key {
		return encoding.EncodeStringAscending(scratchKey, k)
	}

	sp := roachpb.Span{
		Key:    scratchKey,
		EndKey: scratchKey.PrefixEnd(),
	}
	{
		// Enable rangefeeds, otherwise the rangefeed will retry until they
		// are enabled.
		_, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.rangefeed.enabled = true")
		require.NoError(t, err)
	}
	{
		// Lower the closed timestamp target duration to speed up the test.
		_, err := tc.ServerConn(0).Exec("SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'")
		require.NoError(t, err)
	}

	f, err := rangefeed.NewFactory(tc.Stopper(), db, nil)
	require.NoError(t, err)

	// Every value the rangefeed observes is handed to the test body through
	// this unbuffered channel; the callback gives up if the context is
	// canceled first.
	rows := make(chan *roachpb.RangeFeedValue)
	r, err := f.RangeFeed(ctx, "test", sp, db.Clock().Now(),
		func(ctx context.Context, value *roachpb.RangeFeedValue) {
			select {
			case rows <- value:
			case <-ctx.Done():
			}
		},
		rangefeed.WithDiff(),
	)
	require.NoError(t, err)
	defer r.Close()

	// mustGetInt decodes the value as an integer, failing the test if it
	// cannot be decoded.
	mustGetInt := func(value roachpb.Value) int {
		val, err := value.GetInt()
		require.NoError(t, err)
		return int(val)
	}

	// Initial write: there is no previous value.
	{
		beforeWriteTS := db.Clock().Now()
		require.NoError(t, db.Put(ctx, mkKey("a"), 1))
		afterWriteTS := db.Clock().Now()

		v := <-rows
		require.Equal(t, 1, mustGetInt(v.Value))
		require.True(t, beforeWriteTS.Less(v.Value.Timestamp))
		require.True(t, v.Value.Timestamp.Less(afterWriteTS))

		require.False(t, v.PrevValue.IsPresent())
	}

	// Overwrite: the previous value is the first write, with its timestamp
	// cleared.
	{
		beforeOverwriteTS := db.Clock().Now()
		require.NoError(t, db.Put(ctx, mkKey("a"), 2))
		afterOverwriteTS := db.Clock().Now()

		v := <-rows
		require.Equal(t, 2, mustGetInt(v.Value))
		require.True(t, beforeOverwriteTS.Less(v.Value.Timestamp))
		require.True(t, v.Value.Timestamp.Less(afterOverwriteTS))

		require.True(t, v.PrevValue.IsPresent())
		require.Equal(t, 1, mustGetInt(v.PrevValue))
		require.True(t, v.PrevValue.Timestamp.IsEmpty())
	}

	// Delete: the new value is absent but still timestamped; the previous
	// value is the overwrite.
	{
		beforeDelTS := db.Clock().Now()
		require.NoError(t, db.Del(ctx, mkKey("a")))
		afterDelTS := db.Clock().Now()

		v := <-rows
		require.False(t, v.Value.IsPresent())
		require.True(t, beforeDelTS.Less(v.Value.Timestamp))
		require.True(t, v.Value.Timestamp.Less(afterDelTS))

		require.True(t, v.PrevValue.IsPresent())
		require.Equal(t, 2, mustGetInt(v.PrevValue))
		require.True(t, v.PrevValue.Timestamp.IsEmpty())
	}

	// Delete of an already-deleted key: both the new and the previous value
	// are absent.
	{
		beforeDelTS := db.Clock().Now()
		require.NoError(t, db.Del(ctx, mkKey("a")))
		afterDelTS := db.Clock().Now()

		v := <-rows
		require.False(t, v.Value.IsPresent())
		require.True(t, beforeDelTS.Less(v.Value.Timestamp))
		require.True(t, v.Value.Timestamp.Less(afterDelTS))

		require.False(t, v.PrevValue.IsPresent())
		require.True(t, v.PrevValue.Timestamp.IsEmpty())
	}
}
45 changes: 45 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

# Library target for the rangefeedbuffer package. The deps mirror the
# non-stdlib imports of buffer.go.
go_library(
    name = "rangefeedbuffer",
    srcs = ["buffer.go"],
    importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer",
    visibility = ["//visibility:public"],
    deps = [
        "//pkg/util/hlc",
        "//pkg/util/log",
        "//pkg/util/syncutil",
        "@com_github_cockroachdb_errors//:errors",
    ],
)

# Tests for the package; they depend on the library target rather than
# embedding it.
go_test(
    name = "rangefeedbuffer_test",
    srcs = ["buffer_test.go"],
    deps = [
        ":rangefeedbuffer",
        "//pkg/util/hlc",
        "//pkg/util/leaktest",
        "@com_github_stretchr_testify//require",
    ],
)
102 changes: 102 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeedbuffer

import (
"context"
"sort"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// ErrBufferLimitExceeded is returned by the buffer when attempting to add more
// events than the limit the buffer is configured with. Add returns this
// sentinel value directly, without wrapping it.
var ErrBufferLimitExceeded = errors.New("buffer limit exceeded")

// Event is the unit of what can be added to the buffer. The buffer only relies
// on an event's timestamp: it is compared against the frontier and used as the
// sort key when flushing.
type Event interface {
// Timestamp returns the timestamp the buffer orders this event by.
Timestamp() hlc.Timestamp
}

// Buffer provides a thin memory-bounded buffer to sit on top of a rangefeed. It
// accumulates raw events which can then be flushed out in timestamp sorted
// order en masse whenever the rangefeed frontier is bumped. If we accumulate
// more events than the limit allows for, we error out to the caller.
type Buffer struct {
// limit is the maximum number of events the buffer holds at any one time;
// Add returns ErrBufferLimitExceeded instead of going past it.
limit int

// mu guards the fields declared inside it; Add and Flush both hold the
// mutex for their entire duration.
mu struct {
syncutil.Mutex

// events holds the buffered events in insertion order; Flush sorts it in
// place by timestamp.
events
// frontier is the timestamp passed to the most recent Flush. Add discards
// events with timestamps at or below it.
frontier hlc.Timestamp
}
}

// New returns an empty Buffer that holds at most limit events at a time.
func New(limit int) *Buffer {
	buf := new(Buffer)
	buf.limit = limit
	return buf
}

// Add buffers the given event. Events with timestamps at or below the last
// flushed frontier are silently dropped (nil is returned). If the buffer is
// already holding as many events as its limit allows, the event is not added
// and ErrBufferLimitExceeded is returned.
func (b *Buffer) Add(ctx context.Context, ev Event) error {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Anything at or below the frontier has already been covered by a
	// previous flush, so there is nothing left to do with it.
	if ts := ev.Timestamp(); ts.LessEq(b.mu.frontier) {
		return nil
	}

	// No room for one more event.
	if buffered := len(b.mu.events); buffered >= b.limit {
		return ErrBufferLimitExceeded
	}

	b.mu.events = append(b.mu.events, ev)
	return nil
}

// Flush returns the timestamp sorted list of accumulated events with timestamps
// less than or equal to the provided frontier timestamp. The timestamp is
// recorded (expected to monotonically increase), and future events with
// timestamps less than or equal to it are discarded. Events above the frontier
// remain buffered. A frontier below the previously recorded one is treated as a
// fatal error.
//
// The returned slice shares its backing array with the buffer, but its capacity
// is capped at its length, so appending to it never overwrites events the
// buffer is still holding.
func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Event) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if frontier.Less(b.mu.frontier) {
		log.Fatalf(ctx, "frontier timestamp regressed: saw %s, previously %s", frontier, b.mu.frontier)
	}

	// Sort in place, then binary search for the first event strictly above
	// the frontier; everything before that index is flushed.
	sort.Sort(&b.mu.events)
	idx := sort.Search(len(b.mu.events), func(i int) bool {
		return !b.mu.events[i].Timestamp().LessEq(frontier)
	})

	// Use a full slice expression so the flushed prefix has cap == len. A
	// plain [:idx] would leave the retained suffix inside the returned slice's
	// capacity, letting a caller's append clobber buffered events.
	events = b.mu.events[:idx:idx]
	b.mu.events = b.mu.events[idx:]
	b.mu.frontier = frontier
	return events
}

// events is a list of buffered events that can be sorted in ascending
// timestamp order through sort.Interface (implemented on the pointer type).
type events []Event

var _ sort.Interface = (*events)(nil)

// Len returns the number of events in the list.
func (es *events) Len() int {
	return len(*es)
}

// Less reports whether the event at index i has a timestamp strictly less
// than that of the event at index j.
func (es *events) Less(i, j int) bool {
	list := *es
	return list[i].Timestamp().Less(list[j].Timestamp())
}

// Swap exchanges the events at indexes i and j.
func (es *events) Swap(i, j int) {
	list := *es
	list[i], list[j] = list[j], list[i]
}
Loading

0 comments on commit 38168db

Please sign in to comment.