Skip to content

Commit

Permalink
backport upstream 16655
Browse files Browse the repository at this point in the history
  • Loading branch information
GuptaManan100 authored and tanjinx committed Nov 22, 2024
1 parent 44b64bb commit 8fc4301
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 65 deletions.
99 changes: 87 additions & 12 deletions go/vt/discovery/keyspace_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package discovery
import (
"context"
"fmt"
"slices"
"sync"
"time"

"google.golang.org/protobuf/proto"

Expand All @@ -36,6 +38,11 @@ import (
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

var (
	// waitConsistentKeyspacesCheck is the polling interval used by
	// WaitForConsistentKeyspaces: the time to wait between successive
	// checks that the watched keyspaces have become consistent.
	waitConsistentKeyspacesCheck = 100 * time.Millisecond
)

// KeyspaceEventWatcher is an auxiliary watcher that watches all availability incidents
// for all keyspaces in a Vitess cell and notifies listeners when the events have been resolved.
// Right now this is capable of detecting the end of failovers, both planned and unplanned,
Expand Down Expand Up @@ -643,28 +650,53 @@ func (kew *KeyspaceEventWatcher) TargetIsBeingResharded(ctx context.Context, tar
return ks.beingResharded(target.Shard)
}

// PrimaryIsNotServing checks if the reason why the given target is not accessible right now is
// that the primary tablet for that shard is not serving. This is possible during a Planned Reparent Shard
// operation. Just as the operation completes, a new primary will be elected, and it will send its own healthcheck
// stating that it is serving. We should buffer requests until that point.
// There are use cases where people do not run with a Primary server at all, so we must verify that
// we only start buffering when a primary was present, and it went not serving.
// The shard state keeps track of the current primary and the last externally reparented time, which we can use
// to determine that there was a serving primary which now became non serving. This is only possible in a DemotePrimary
// RPC which are only called from ERS and PRS. So buffering will stop when these operations succeed.
// We return the tablet alias of the primary if it is serving.
func (kew *KeyspaceEventWatcher) PrimaryIsNotServing(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
// ShouldStartBufferingForTarget checks if we should be starting buffering for the given target.
// We check the following things before we start buffering -
// 1. The shard must have a primary.
// 2. The primary must be non-serving.
// 3. The keyspace must be marked inconsistent.
//
// This buffering is meant to kick in during a Planned Reparent Shard operation.
// As part of that operation the old primary will become non-serving. At that point
// this code should return true to start buffering requests.
// Just as the PRS operation completes, a new primary will be elected, and
// it will send its own healthcheck stating that it is serving. We should buffer requests until
// that point.
//
// There are use cases where people do not run with a Primary server at all, so we must
// verify that we only start buffering when a primary was present, and it went not serving.
// The shard state keeps track of the current primary and the last externally reparented time, which
// we can use to determine that there was a serving primary which now became non serving. This is
// only possible in a DemotePrimary RPC which are only called from ERS and PRS. So buffering will
// stop when these operations succeed. We also return the tablet alias of the primary if it is serving.
func (kew *KeyspaceEventWatcher) ShouldStartBufferingForTarget(ctx context.Context, target *querypb.Target) (*topodatapb.TabletAlias, bool) {
if target.TabletType != topodatapb.TabletType_PRIMARY {
// We don't support buffering for any target tablet type other than the primary.
return nil, false
}
ks := kew.getKeyspaceStatus(ctx, target.Keyspace)
if ks == nil {
// If the keyspace status is nil, then the keyspace must be deleted.
// The user query is trying to access a keyspace that has been deleted.
// There is no reason to buffer this query.
return nil, false
}
ks.mu.Lock()
defer ks.mu.Unlock()
if state, ok := ks.shards[target.Shard]; ok {
// If the primary tablet was present then externallyReparented will be non-zero and currentPrimary will be not nil
// As described in the function comment, we only want to start buffering when all the following conditions are met -
// 1. The shard must have a primary. We check this by checking the currentPrimary and externallyReparented fields being non-empty.
// They are set the first time the shard registers an update from a serving primary and are never cleared out after that.
// If the user has configured vtgates to wait for the primary tablet healthchecks before starting query service, this condition
// will always be true.
// 2. The primary must be non-serving. We check this by checking the serving field in the shard state.
// When a primary becomes non-serving, it also marks the keyspace inconsistent. So the next check is only added
// for being defensive against any bugs.
// 3. The keyspace must be marked inconsistent. We check this by checking the consistent field in the keyspace state.
//
// The reason we need all the three checks is that we want to be very defensive in when we start buffering.
// We don't want to start buffering when we don't know for sure if the primary
// is not serving and we will receive an update that stops buffering soon.
return state.currentPrimary, !state.serving && !ks.consistent && state.externallyReparented != 0 && state.currentPrimary != nil
}
return nil, false
Expand Down Expand Up @@ -716,3 +748,46 @@ func (kew *KeyspaceEventWatcher) MarkShardNotServing(ctx context.Context, keyspa
}
return true
}

// WaitForConsistentKeyspaces waits for the given set of keyspaces to be marked consistent.
func (kew *KeyspaceEventWatcher) WaitForConsistentKeyspaces(ctx context.Context, ksList []string) error {
	// Work on a copy so the caller's slice is never mutated while we
	// drop entries below as they become consistent.
	pending := slices.Clone(ksList)
	for {
		// Drop every keyspace that no longer needs waiting on. A nil
		// status means the keyspace was deleted, which we also treat as
		// consistent since there is nothing left to converge.
		pending = slices.DeleteFunc(pending, func(name string) bool {
			kss := kew.getKeyspaceStatus(ctx, name)
			return kss == nil || kss.consistent
		})

		if len(pending) == 0 {
			// Every keyspace is consistent (or gone).
			return nil
		}

		// Sleep before re-checking, or bail out once the context expires.
		select {
		case <-ctx.Done():
			for _, name := range pending {
				log.Infof("keyspace %v didn't become consistent", name)
			}
			return ctx.Err()
		case <-time.After(waitConsistentKeyspacesCheck):
		}
	}
}
119 changes: 100 additions & 19 deletions go/vt/discovery/keyspace_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func TestKeyspaceEventTypes(t *testing.T) {
kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)

type testCase struct {
name string
kss *keyspaceState
shardToCheck string
expectResharding bool
expectPrimaryNotServing bool
name string
kss *keyspaceState
shardToCheck string
expectResharding bool
expectShouldBuffer bool
}

testCases := []testCase{
Expand Down Expand Up @@ -127,9 +127,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-",
expectResharding: true,
expectPrimaryNotServing: false,
shardToCheck: "-",
expectResharding: true,
expectShouldBuffer: false,
},
{
name: "two to four resharding in progress",
Expand Down Expand Up @@ -188,9 +188,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-80",
expectResharding: true,
expectPrimaryNotServing: false,
shardToCheck: "-80",
expectResharding: true,
expectShouldBuffer: false,
},
{
name: "unsharded primary not serving",
Expand All @@ -214,9 +214,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-",
expectResharding: false,
expectPrimaryNotServing: true,
shardToCheck: "-",
expectResharding: false,
expectShouldBuffer: true,
},
{
name: "sharded primary not serving",
Expand Down Expand Up @@ -248,9 +248,9 @@ func TestKeyspaceEventTypes(t *testing.T) {
},
consistent: false,
},
shardToCheck: "-80",
expectResharding: false,
expectPrimaryNotServing: true,
shardToCheck: "-80",
expectResharding: false,
expectShouldBuffer: true,
},
}

Expand All @@ -265,8 +265,89 @@ func TestKeyspaceEventTypes(t *testing.T) {
resharding := kew.TargetIsBeingResharded(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, resharding, tc.expectResharding, "TargetIsBeingResharded should return %t", tc.expectResharding)

_, primaryDown := kew.PrimaryIsNotServing(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, primaryDown, tc.expectPrimaryNotServing, "PrimaryIsNotServing should return %t", tc.expectPrimaryNotServing)
_, shouldBuffer := kew.ShouldStartBufferingForTarget(ctx, tc.kss.shards[tc.shardToCheck].target)
require.Equal(t, shouldBuffer, tc.expectShouldBuffer, "ShouldStartBufferingForTarget should return %t", tc.expectShouldBuffer)
})
}
}

// TestWaitForConsistentKeyspaces tests the behaviour of WaitForConsistent for different scenarios.
func TestWaitForConsistentKeyspaces(t *testing.T) {
	tests := []struct {
		name      string
		keyspaces map[string]*keyspaceState
		waitFor   []string
		wantErr   string
	}{
		{
			name:    "Empty keyspace list",
			waitFor: nil,
			keyspaces: map[string]*keyspaceState{
				"ks1": {},
			},
			wantErr: "",
		},
		{
			name:    "All keyspaces consistent",
			waitFor: []string{"ks1", "ks2"},
			keyspaces: map[string]*keyspaceState{
				"ks1": {
					consistent: true,
				},
				"ks2": {
					consistent: true,
				},
			},
			wantErr: "",
		},
		{
			name:    "One keyspace inconsistent",
			waitFor: []string{"ks1", "ks2"},
			keyspaces: map[string]*keyspaceState{
				"ks1": {
					consistent: true,
				},
				"ks2": {
					consistent: false,
				},
			},
			wantErr: "context canceled",
		},
		{
			name:    "One deleted keyspace - consistent",
			waitFor: []string{"ks1", "ks2"},
			keyspaces: map[string]*keyspaceState{
				"ks1": {
					consistent: true,
				},
				"ks2": {
					deleted: true,
				},
			},
			wantErr: "",
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			// Use a context that is already canceled so the unit test
			// never sleeps: only the first iteration of the wait loop
			// runs, which is enough to observe whether the watcher
			// considers the keyspaces consistent.
			ctx, cancel := context.WithCancel(context.Background())
			cancel()

			kew := KeyspaceEventWatcher{
				keyspaces: tc.keyspaces,
				ts:        &fakeTopoServer{},
			}
			err := kew.WaitForConsistentKeyspaces(ctx, tc.waitFor)
			if tc.wantErr == "" {
				require.NoError(t, err)
			} else {
				require.ErrorContains(t, err, tc.wantErr)
			}
		})
	}
}
Expand Down
14 changes: 7 additions & 7 deletions go/vt/srvtopo/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ limitations under the License.
package srvtopo

import (
"sync"

"context"
"sync"

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
Expand All @@ -29,15 +28,16 @@ import (
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// FindAllTargets goes through all serving shards in the topology for the provided keyspaces
// FindAllTargetsAndKeyspaces goes through all serving shards in the topology for the provided keyspaces
// and tablet types. If no keyspaces are provided all available keyspaces in the topo are
// fetched. It returns one Target object per keyspace/shard/matching TabletType.
func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, error) {
// It also returns all the keyspaces that it found.
func FindAllTargetsAndKeyspaces(ctx context.Context, ts Server, cell string, keyspaces []string, tabletTypes []topodatapb.TabletType) ([]*querypb.Target, []string, error) {
var err error
if len(keyspaces) == 0 {
keyspaces, err = ts.GetSrvKeyspaceNames(ctx, cell, true)
if err != nil {
return nil, err
return nil, nil, err
}
}

Expand Down Expand Up @@ -95,8 +95,8 @@ func FindAllTargets(ctx context.Context, ts Server, cell string, keyspaces []str
}
wg.Wait()
if errRecorder.HasErrors() {
return nil, errRecorder.Error()
return nil, nil, errRecorder.Error()
}

return targets, nil
return targets, keyspaces, nil
}
Loading

0 comments on commit 8fc4301

Please sign in to comment.