Skip to content

Commit

Permalink
Merge branch 'slack-15.0' into patch-16304
Browse files Browse the repository at this point in the history
  • Loading branch information
timvaillancourt authored Oct 9, 2024
2 parents d716f78 + 610fc3f commit c45f303
Show file tree
Hide file tree
Showing 30 changed files with 861 additions and 351 deletions.
21 changes: 17 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ limitations under the License.
// Alternatively, use a Watcher implementation which will constantly watch
// a source (e.g. the topology) and add and remove tablets as they are
// added or removed from the source.
// For a Watcher example have a look at NewCellTabletsWatcher().
// For a Watcher example have a look at NewTopologyWatcher().
//
// Internally, the HealthCheck module is connected to each tablet and has a
// streaming RPC (StreamHealth) open to receive periodic health infos.
Expand Down Expand Up @@ -92,7 +92,7 @@ var (
refreshKnownTablets = true

// topoReadConcurrency tells us how many topo reads are allowed in parallel.
topoReadConcurrency = 32
topoReadConcurrency int64 = 32

// healthCheckDialConcurrency tells us how many healthcheck connections can be opened to tablets at once. This should be less than the golang max thread limit of 10000.
healthCheckDialConcurrency int64 = 1024
Expand Down Expand Up @@ -178,7 +178,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.IntVar(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
fs.Int64Var(&healthCheckDialConcurrency, "healthcheck-dial-concurrency", 1024, "Maximum concurrency of new healthcheck connections. This should be less than the golang max thread limit of 10000.")
ParseTabletURLTemplateFromFlag()
}
Expand Down Expand Up @@ -365,6 +365,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
cellAliases: make(map[string]string),
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
Expand All @@ -375,7 +376,19 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
if len(tabletFilters) > 0 {
if len(KeyspacesToWatch) > 0 {
log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
}
fbs, err := NewFilterByShard(tabletFilters)
if err != nil {
log.Exitf("Cannot parse tablet_filters parameter: %v", err)
}
filter = fbs
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
103 changes: 52 additions & 51 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,26 @@ limitations under the License.
package discovery

import (
"context"
"fmt"
"io"
"math/rand"
"sort"
"strings"
"sync"
"time"

"vitess.io/vitess/go/stats"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"

"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"vitess.io/vitess/go/vt/log"

"context"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type TabletPickerCellPreference int
Expand Down Expand Up @@ -291,13 +289,12 @@ func (tp *TabletPicker) orderByTabletType(candidates []*topo.TabletInfo) []*topo
return candidates
}

// PickForStreaming picks an available tablet.
// PickForStreaming picks a tablet that is healthy and serving.
// Selection is based on CellPreference.
// See prioritizeTablets for prioritization logic.
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
rand.Seed(time.Now().UnixNano())
// keep trying at intervals (tabletPickerRetryDelay) until a tablet is found
// or the context is canceled
// Keep trying at intervals (tabletPickerRetryDelay) until a healthy
// serving tablet is found or the context is cancelled.
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -330,15 +327,15 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
} else if tp.inOrder {
candidates = tp.orderByTabletType(candidates)
} else {
// Randomize candidates
// Randomize candidates.
rand.Shuffle(len(candidates), func(i, j int) {
candidates[i], candidates[j] = candidates[j], candidates[i]
})
}
if len(candidates) == 0 {
// if no candidates were found, sleep and try again
// If no viable candidates were found, sleep and try again.
tp.incNoTabletFoundStat()
log.Infof("No tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds",
log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0)
timer := time.NewTimer(GetTabletPickerRetryDelay())
select {
Expand All @@ -349,34 +346,24 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
}
continue
}
for _, ti := range candidates {
// try to connect to tablet
if conn, err := tabletconn.GetDialer()(ti.Tablet, true); err == nil {
// OK to use ctx here because it is not actually used by the underlying Close implementation
_ = conn.Close(ctx)
log.Infof("tablet picker found tablet %s", ti.Tablet.String())
return ti.Tablet, nil
}
// err found
log.Warningf("unable to connect to tablet for alias %v", ti.Alias)
}
// Got here? Means we iterated all tablets and did not find a healthy one
tp.incNoTabletFoundStat()
log.Infof("Tablet picker found a healthy serving tablet for streaming: %s", candidates[0].Tablet.String())
return candidates[0].Tablet, nil
}
}

// GetMatchingTablets returns a list of TabletInfo for tablets
// that match the cells, keyspace, shard and tabletTypes for this TabletPicker
// GetMatchingTablets returns a list of TabletInfo for healthy
// serving tablets that match the cells, keyspace, shard and
// tabletTypes for this TabletPicker.
func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletInfo {
// Special handling for PRIMARY tablet type
// Since there is only one primary, we ignore cell and find the primary
// Special handling for PRIMARY tablet type: since there is only
// one primary per shard, we ignore cell and find the primary.
aliases := make([]*topodatapb.TabletAlias, 0)
if len(tp.tabletTypes) == 1 && tp.tabletTypes[0] == topodatapb.TabletType_PRIMARY {
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
si, err := tp.ts.GetShard(shortCtx, tp.keyspace, tp.shard)
if err != nil {
log.Errorf("error getting shard %s/%s: %s", tp.keyspace, tp.shard, err.Error())
log.Errorf("Error getting shard %s/%s: %v", tp.keyspace, tp.shard, err)
return nil
}
if _, ignore := tp.ignoreTablets[si.PrimaryAlias.String()]; !ignore {
Expand All @@ -385,37 +372,37 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
} else {
actualCells := make([]string, 0)
for _, cell := range tp.cells {
// check if cell is actually an alias
// non-blocking read so that this is fast
// Check if cell is actually an alias; using a
// non-blocking read so that this is fast.
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
_, err := tp.ts.GetCellInfo(shortCtx, cell, false)
if err != nil {
// not a valid cell, check whether it is a cell alias
// Not a valid cell, check whether it is a cell alias...
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
alias, err := tp.ts.GetCellsAlias(shortCtx, cell, false)
// if we get an error, either cellAlias doesn't exist or it isn't a cell alias at all. Ignore and continue
// If we get an error, either cellAlias doesn't exist or
// it isn't a cell alias at all; ignore and continue.
if err == nil {
actualCells = append(actualCells, alias.Cells...)
} else {
log.Infof("Unable to resolve cell %s, ignoring", cell)
}
} else {
// valid cell, add it to our list
// Valid cell, add it to our list.
actualCells = append(actualCells, cell)
}
}

for _, cell := range actualCells {
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
// match cell, keyspace and shard
// Match cell, keyspace, and shard.
sri, err := tp.ts.GetShardReplication(shortCtx, cell, tp.keyspace, tp.shard)
if err != nil {
continue
}

for _, node := range sri.Nodes {
if _, ignore := tp.ignoreTablets[node.TabletAlias.String()]; !ignore {
aliases = append(aliases, node.TabletAlias)
Expand All @@ -427,33 +414,47 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
if len(aliases) == 0 {
return nil
}

shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases)
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases, nil)
if err != nil {
log.Warningf("error fetching tablets from topo: %v", err)
// If we get a partial result we can still use it, otherwise return
log.Warningf("Error fetching tablets from topo: %v", err)
// If we get a partial result we can still use it, otherwise return.
if len(tabletMap) == 0 {
return nil
}
}

tablets := make([]*topo.TabletInfo, 0, len(aliases))
for _, tabletAlias := range aliases {
tabletInfo, ok := tabletMap[topoproto.TabletAliasString(tabletAlias)]
if !ok {
// Either tablet disappeared on us, or we got a partial result (GetTabletMap ignores
// topo.ErrNoNode). Just log a warning
log.Warningf("failed to load tablet %v", tabletAlias)
// Either tablet disappeared on us, or we got a partial result
// (GetTabletMap ignores topo.ErrNoNode); just log a warning.
log.Warningf("Tablet picker failed to load tablet %v", tabletAlias)
} else if topoproto.IsTypeInList(tabletInfo.Type, tp.tabletTypes) {
tablets = append(tablets, tabletInfo)
// Try to connect to the tablet and confirm that it's usable.
if conn, err := tabletconn.GetDialer()(tabletInfo.Tablet, grpcclient.FailFast(true)); err == nil {
// Ensure that the tablet is healthy and serving.
shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error {
if shr != nil && shr.Serving && shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" {
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
}); err == nil || err == io.EOF {
tablets = append(tablets, tabletInfo)
}
_ = conn.Close(ctx)
}
}
}
return tablets
}

func init() {
// TODO(sougou): consolidate this call to be once per process.
rand.Seed(time.Now().UnixNano())
globalTPStats = newTabletPickerStats()
}

Expand Down
65 changes: 54 additions & 11 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"

querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

func TestPickPrimary(t *testing.T) {
Expand Down Expand Up @@ -503,6 +504,45 @@ func TestPickErrorOnlySpecified(t *testing.T) {
require.Greater(t, globalTPStats.noTabletFoundError.Counts()["cell.ks.0.replica"], int64(0))
}

// TestPickFallbackType tests that when providing a list of tablet types to
// pick from, with the list in preference order, that when the primary/first
// type has no available healthy serving tablets that we select a healthy
// serving tablet from the secondary/second type.
func TestPickFallbackType(t *testing.T) {
cells := []string{"cell1", "cell2"}
localCell := cells[0]
tabletTypes := "replica,primary"
options := TabletPickerOptions{
TabletOrder: "InOrder",
}
te := newPickerTestEnv(t, cells)

// This one should be selected even though it's the secondary type
// as it is healthy and serving.
primaryTablet := addTablet(te, 100, topodatapb.TabletType_PRIMARY, localCell, true, true)
defer deleteTablet(t, te, primaryTablet)

// Replica tablet should not be selected as it is unhealthy.
replicaTablet := addTablet(te, 200, topodatapb.TabletType_REPLICA, localCell, false, false)
defer deleteTablet(t, te, replicaTablet)

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
_, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryTablet.Alias
return nil
})
require.NoError(t, err)

tp, err := NewTabletPicker(context.Background(), te.topoServ, cells, localCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx2, cancel2 := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel2()
tablet, err := tp.PickForStreaming(ctx2)
require.NoError(t, err)
assert.True(t, proto.Equal(primaryTablet, tablet), "Pick: %v, want %v", tablet, primaryTablet)
}

type pickerTestEnv struct {
t *testing.T
keyspace string
Expand Down Expand Up @@ -551,18 +591,21 @@ func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell
err := te.topoServ.CreateTablet(context.Background(), tablet)
require.NoError(te.t, err)

shr := &querypb.StreamHealthResponse{
Serving: serving,
Target: &querypb.Target{
Keyspace: te.keyspace,
Shard: te.shard,
TabletType: tabletType,
},
RealtimeStats: &querypb.RealtimeStats{HealthError: "tablet is unhealthy"},
}
if healthy {
_ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{
Serving: serving,
Target: &querypb.Target{
Keyspace: te.keyspace,
Shard: te.shard,
TabletType: tabletType,
},
RealtimeStats: &querypb.RealtimeStats{HealthError: ""},
})
shr.RealtimeStats.HealthError = ""
}

_ = createFixedHealthConn(tablet, shr)

return tablet
}

Expand Down
Loading

0 comments on commit c45f303

Please sign in to comment.