Skip to content

Commit

Permalink
Merge pull request #97659 from AlexTalks/backport22.2-97212
Browse files Browse the repository at this point in the history
release-22.2: kvserver: rangefeeds check span validity before settings
  • Loading branch information
AlexTalks authored Mar 8, 2023
2 parents 100ff45 + 39e9c9b commit febfc5e
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 6 deletions.
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/docs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
Expand Down Expand Up @@ -1038,6 +1039,10 @@ func (r *Replica) isRangefeedEnabled() (ret bool) {
r.mu.RLock()
defer r.mu.RUnlock()

return r.isRangefeedEnabledRLocked()
}

func (r *Replica) isRangefeedEnabledRLocked() (ret bool) {
if !r.mu.spanConfigExplicitlySet {
return true
}
Expand Down Expand Up @@ -1580,6 +1585,9 @@ func (r *Replica) checkExecutionCanProceedForRangeFeed(
return err
} else if err := r.checkSpanInRangeRLocked(ctx, rSpan); err != nil {
return err
} else if !r.isRangefeedEnabledRLocked() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
return errors.Errorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`))
} else if err := r.checkTSAboveGCThresholdRLocked(ts, status, false /* isAdmin */); err != nil {
return err
} else if r.requiresExpiringLeaseRLocked() {
Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"sync"
"time"

"github.com/cockroachdb/cockroach/pkg/docs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
Expand Down Expand Up @@ -146,10 +145,6 @@ func (r *Replica) rangeFeedWithRangeID(
stream roachpb.RangeFeedEventSink,
pacer *admission.Pacer,
) *roachpb.Error {
if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`))
}
ctx := r.AnnotateCtx(stream.Context())

rSpan, err := keys.SpanAddr(args.Span)
Expand Down
86 changes: 85 additions & 1 deletion pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func TestReplicaRangefeedExpiringLeaseError(t *testing.T) {
}
}

func TestReplicaRangefeedRetryErrors(t *testing.T) {
func TestReplicaRangefeedErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand Down Expand Up @@ -611,6 +611,9 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) {
&resolveTimestampEvent,
}
}
if len(events) > 2 {
events = events[:2]
}
if !reflect.DeepEqual(events, expEvents) {
t.Fatalf("incorrect events on stream, found %v, want %v", events, expEvents)
}
Expand Down Expand Up @@ -961,6 +964,87 @@ func TestReplicaRangefeedRetryErrors(t *testing.T) {
pErr := <-streamErrC
assertRangefeedRetryErr(t, pErr, roachpb.RangeFeedRetryError_REASON_LOGICAL_OPS_MISSING)
})
t.Run("range key mismatch", func(t *testing.T) {
knobs := base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
// Use a span config override to check that we get a key mismatch error
// despite the span config's setting whenever the key is outside the
// bounds of the range.
SetSpanConfigInterceptor: func(desc *roachpb.RangeDescriptor, conf roachpb.SpanConfig) roachpb.SpanConfig {
if desc.ContainsKey(roachpb.RKey(keys.ScratchRangeMin)) {
conf.RangefeedEnabled = false
return conf
} else if desc.ContainsKey(startRKey) {
conf.RangefeedEnabled = true
return conf
}
return conf
},
},
}
tc, _ := setup(t, knobs)
defer tc.Stopper().Stop(ctx)

ts := tc.Servers[0]
store, err := ts.Stores().GetStore(ts.GetFirstStoreID())
if err != nil {
t.Fatal(err)
}
// Split the range so that the RHS should have a span config with
// rangefeeds enabled (like a range on a system table would), while the
// LHS does not. A rangefeed request on the LHS should still return a
// RangeKeyMismatchError given the span is outside the range, even though
// rangefeeds are not enabled.
tc.SplitRangeOrFatal(t, startKey)

leftReplica := store.LookupReplica(roachpb.RKey(keys.ScratchRangeMin))
leftRangeID := leftReplica.RangeID
rightReplica := store.LookupReplica(startRKey)
rightRangeID := rightReplica.RangeID

// Attempt to establish a rangefeed, sending the request to the LHS.
stream := newTestStream()
streamErrC := make(chan *roachpb.Error, 1)

endKey := keys.ScratchRangeMax
rangefeedSpan := roachpb.Span{Key: startKey, EndKey: endKey}

go func() {
req := roachpb.RangeFeedRequest{
Header: roachpb.Header{
RangeID: leftRangeID,
},
Span: rangefeedSpan,
}
timer := time.AfterFunc(10*time.Second, stream.Cancel)
defer timer.Stop()
streamErrC <- store.RangeFeed(&req, stream)
}()

// Check the error.
pErr := <-streamErrC
if _, ok := pErr.GetDetail().(*roachpb.RangeKeyMismatchError); !ok {
t.Fatalf("got incorrect error for RangeFeed: %v; expecting RangeKeyMismatchError", pErr)
}

// Now send the range feed request to the correct replica, which should not
// encounter errors.
stream = newTestStream()
go func() {
req := roachpb.RangeFeedRequest{
Header: roachpb.Header{
RangeID: rightRangeID,
},
Span: rangefeedSpan,
}
timer := time.AfterFunc(10*time.Second, stream.Cancel)
defer timer.Stop()
streamErrC <- store.RangeFeed(&req, stream)
}()

// Wait for the first checkpoint event.
waitForInitialCheckpointAcrossSpan(t, stream, streamErrC, rangefeedSpan)
})
}

// TestReplicaRangefeedMVCCHistoryMutationError tests that rangefeeds are
Expand Down

0 comments on commit febfc5e

Please sign in to comment.