Skip to content

Commit

Permalink
vtorc: fetch shard names only every `--topo-information-refresh-dur…
Browse files Browse the repository at this point in the history
…ation`

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Dec 3, 2024
1 parent 2b71d1b commit d5456a2
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 11 deletions.
44 changes: 42 additions & 2 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,43 @@ import (
"strings"
"sync"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtorc/inst"
)

var (
// keyspaceShardNames stores the current names of shards by keyspace.
keyspaceShardNames = make(map[string][]string)
keyspaceShardNamesMu sync.Mutex
statsKeyspaceShardsWatched = stats.NewGaugesFuncWithMultiLabels("KeyspaceShardsWatched",
"The keyspace/shards watched by VTOrc",
[]string{"Keyspace", "Shard"},
getKeyspaceShardsStats,
)
)

// getKeyspaceShardsStats returns the current keyspace/shards watched in stats format.
func getKeyspaceShardsStats() map[string]int64 {
keyspaceShardNamesMu.Lock()
defer keyspaceShardNamesMu.Unlock()
keyspaceShards := make(map[string]int64)
for ks, shards := range keyspaceShardNames {
for _, shard := range shards {
keyspaceShards[ks+"."+shard] = 1
}
}
return keyspaceShards
}

// GetKeyspaceShardNames returns the names of the shards in a given keyspace.
func GetKeyspaceShardNames(keyspaceName string) []string {
keyspaceShardNamesMu.Lock()
defer keyspaceShardNamesMu.Unlock()
return keyspaceShardNames[keyspaceName]
}

// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards() {
var keyspaces []string
Expand Down Expand Up @@ -134,13 +165,22 @@ func refreshAllShards(ctx context.Context, keyspaceName string) error {
log.Error(err)
return err
}
for _, shardInfo := range shardInfos {

shardNames := make([]string, 0, len(shardInfos))
for shardName, shardInfo := range shardInfos {
err = inst.SaveShard(shardInfo)
if err != nil {
log.Error(err)
return err
}
shardNames = append(shardNames, shardName)
}
sort.Strings(shardNames)

keyspaceShardNamesMu.Lock()
defer keyspaceShardNamesMu.Unlock()
keyspaceShardNames[keyspaceName] = shardNames

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions go/vt/vtorc/logic/keyspace_shard_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ var (
)

func TestRefreshAllKeyspaces(t *testing.T) {
// reset keyspaceShardNames
keyspaceShardNames = make(map[string][]string)
defer func() {
keyspaceShardNames = make(map[string][]string)
}()

// Store the old flags and restore on test completion
oldTs := ts
oldClustersToWatch := clustersToWatch
Expand Down Expand Up @@ -119,6 +125,17 @@ func TestRefreshAllKeyspaces(t *testing.T) {
verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "")
verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "")

// Confirm caching of shard names
require.Equal(t, map[string][]string{
"ks1": {"-80", "80-"},
"ks2": {"-80", "80-"},
"ks3": {"-80", "80-"},
"ks4": {"-80", "80-"},
}, keyspaceShardNames)
for _, ksName := range keyspaceNames {
require.Equal(t, []string{"-80", "80-"}, GetKeyspaceShardNames(ksName))
}
require.Len(t, GetKeyspaceShardNames("does-not-exist"), 0)
}

func TestRefreshKeyspace(t *testing.T) {
Expand Down
10 changes: 1 addition & 9 deletions go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,7 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
input := strings.Split(ks, "/")
keyspaceShards = append(keyspaceShards, &topo.KeyspaceShard{Keyspace: input[0], Shard: input[1]})
} else {
// Assume this is a keyspace and find all shards in keyspace
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
shards, err := ts.GetShardNames(ctx, ks)
if err != nil {
// Log the errr and continue
log.Errorf("Error fetching shards for keyspace: %v", ks)
continue
}
shards := GetKeyspaceShardNames(ks)
if len(shards) == 0 {
log.Errorf("Topo has no shards for ks: %v", ks)
continue
Expand Down

0 comments on commit d5456a2

Please sign in to comment.