Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use GetTabletsByCell in healthcheck #14693

Merged
merged 11 commits into from
Dec 12, 2023
Merged
6 changes: 4 additions & 2 deletions go/stats/counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ func (c *counters) set(name string, value int64) {
func (c *counters) reset() {
c.mu.Lock()
defer c.mu.Unlock()
c.counts = make(map[string]int64)
clear(c.counts)
}

// ZeroAll zeroes out all values
func (c *counters) ZeroAll() {
c.mu.Lock()
defer c.mu.Unlock()

clear(c.counts)
deepthi marked this conversation as resolved.
Show resolved Hide resolved
for k := range c.counts {
c.counts[k] = 0
}
}

// Counts returns a copy of the Counters' map.
Expand Down
8 changes: 4 additions & 4 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ limitations under the License.
// Alternatively, use a Watcher implementation which will constantly watch
// a source (e.g. the topology) and add and remove tablets as they are
// added or removed from the source.
// For a Watcher example have a look at NewCellTabletsWatcher().
// For a Watcher example have a look at NewTopologyWatcher().
//
// Internally, the HealthCheck module is connected to each tablet and has a
// streaming RPC (StreamHealth) open to receive periodic health infos.
Expand Down Expand Up @@ -88,7 +88,7 @@ var (
refreshKnownTablets = true

// topoReadConcurrency tells us how many topo reads are allowed in parallel.
topoReadConcurrency = 32
topoReadConcurrency int64 = 32

// How much to sleep between each check.
waitAvailableTabletInterval = 100 * time.Millisecond
Expand Down Expand Up @@ -176,7 +176,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) {
fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.")
fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.")
fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.")
fs.IntVar(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.")
ParseTabletURLTemplateFromFlag()
}

Expand Down Expand Up @@ -362,7 +362,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn

shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases)
tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases, nil)
if err != nil {
log.Warningf("Error fetching tablets from topo: %v", err)
// If we get a partial result we can still use it, otherwise return.
Expand Down
83 changes: 25 additions & 58 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ type TopologyWatcher struct {
cell string
refreshInterval time.Duration
refreshKnownTablets bool
getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)
sem chan int
concurrency int64
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -92,34 +91,28 @@ type TopologyWatcher struct {
}

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets that it is configured to watch, and reloads them periodically if needed.
// As of now there is only one implementation: watch all tablets in a cell.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
// the tablets in a cell, and reloads them as needed.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
healthcheck: hc,
tabletFilter: filter,
tabletFilter: f,
cell: cell,
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
concurrency: topoReadConcurrency,
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})

// We want the span from the context, but not the cancelation that comes with it
// We want the span from the context, but not the cancellation that comes with it
spanContext := trace.CopySpan(context.Background(), ctx)
tw.ctx, tw.cancelFunc = context.WithCancel(spanContext)
return tw
}

// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and reloads them as needed.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
})
func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency})
}

// Start starts the topology watcher.
Expand Down Expand Up @@ -149,30 +142,31 @@ func (tw *TopologyWatcher) Stop() {
}

func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)

// First get the list of relevant tabletAliases.
tabletAliases, err := tw.getTablets(tw)
// First get the list of all tablets.
tabletInfos, err := tw.getTablets()
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1)
select {
case <-tw.ctx.Done():
// If we get a partial result error, we just log it and process the tablets that we did manage to fetch.
if topo.IsErrType(err, topo.PartialResult) {
log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err)
} else { // For all other errors, just return.
log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err)
return
default:
}
log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err)
return
}

// Accumulate a list of all known alias strings to use later
// when sorting.
tabletAliasStrs := make([]string, 0, len(tabletAliases))
tabletAliasStrs := make([]string, 0, len(tabletInfos))

tw.mu.Lock()
deepthi marked this conversation as resolved.
Show resolved Hide resolved
for _, tAlias := range tabletAliases {
aliasStr := topoproto.TabletAliasString(tAlias)
defer tw.mu.Unlock()

for _, tInfo := range tabletInfos {
aliasStr := topoproto.TabletAliasString(tInfo.Alias)
tabletAliasStrs = append(tabletAliasStrs, aliasStr)

if !tw.refreshKnownTablets {
Expand All @@ -182,38 +176,13 @@ func (tw *TopologyWatcher) loadTablets() {
continue
}
}

wg.Add(1)
go func(alias *topodata.TabletAlias) {
defer wg.Done()
tw.sem <- 1 // Wait for active queue to drain.
tablet, err := tw.topoServer.GetTablet(tw.ctx, alias)
topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1)
<-tw.sem // Done; enable next request to run.
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1)
select {
case <-tw.ctx.Done():
return
default:
}
log.Errorf("cannot get tablet for alias %v: %v", alias, err)
return
}
tw.mu.Lock()
aliasStr := topoproto.TabletAliasString(alias)
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
tablet: tablet.Tablet,
}
tw.mu.Unlock()
}(tAlias)
// There's no network call here, so we just do the tablets one at a time instead of in parallel goroutines.
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
tablet: tInfo.Tablet,
}
}

tw.mu.Unlock()
wg.Wait()
tw.mu.Lock()

for alias, newVal := range newTablets {
if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) {
continue
Expand Down Expand Up @@ -266,8 +235,6 @@ func (tw *TopologyWatcher) loadTablets() {
tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes())
tw.lastRefresh = time.Now()

tw.mu.Unlock()

}

// RefreshLag returns the time since the last refresh.
Expand Down
Loading
Loading