diff --git a/go/stats/counters.go b/go/stats/counters.go index e79da39c48b..bcf7fc3a8b6 100644 --- a/go/stats/counters.go +++ b/go/stats/counters.go @@ -62,7 +62,7 @@ func (c *counters) set(name string, value int64) { func (c *counters) reset() { c.mu.Lock() defer c.mu.Unlock() - c.counts = make(map[string]int64) + clear(c.counts) } // ZeroAll zeroes out all values @@ -70,7 +70,9 @@ func (c *counters) ZeroAll() { c.mu.Lock() defer c.mu.Unlock() - clear(c.counts) + for k := range c.counts { + c.counts[k] = 0 + } } // Counts returns a copy of the Counters' map. diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 20e16875748..5e25be60f4f 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -25,7 +25,7 @@ limitations under the License. // Alternatively, use a Watcher implementation which will constantly watch // a source (e.g. the topology) and add and remove tablets as they are // added or removed from the source. -// For a Watcher example have a look at NewCellTabletsWatcher(). +// For a Watcher example have a look at NewTopologyWatcher(). // // Internally, the HealthCheck module is connected to each tablet and has a // streaming RPC (StreamHealth) open to receive periodic health infos. @@ -88,7 +88,7 @@ var ( refreshKnownTablets = true // topoReadConcurrency tells us how many topo reads are allowed in parallel. - topoReadConcurrency = 32 + topoReadConcurrency int64 = 32 // How much to sleep between each check. waitAvailableTabletInterval = 100 * time.Millisecond @@ -176,7 +176,7 @@ func registerWebUIFlags(fs *pflag.FlagSet) { fs.StringVar(&TabletURLTemplateString, "tablet_url_template", "http://{{.GetTabletHostPort}}", "Format string describing debug tablet url formatting. See getTabletDebugURL() for how to customize this.") fs.DurationVar(&refreshInterval, "tablet_refresh_interval", 1*time.Minute, "Tablet refresh interval.") fs.BoolVar(&refreshKnownTablets, "tablet_refresh_known_tablets", true, "Whether to reload the tablet's address/port map from topo in case they change.") - fs.IntVar(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") + fs.Int64Var(&topoReadConcurrency, "topo_read_concurrency", 32, "Concurrency of topo reads.") ParseTabletURLTemplateFromFlag() } @@ -362,7 +362,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur } else if len(KeyspacesToWatch) > 0 { filter = NewFilterByKeyspace(KeyspacesToWatch) } - topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency)) + topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filter, c, refreshInterval, refreshKnownTablets, topoReadConcurrency)) } hc.topoWatchers = topoWatchers diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index a507528d3a2..7525ab82dfc 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -428,7 +428,7 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn shortCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() - tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases) + tabletMap, err := tp.ts.GetTabletMap(shortCtx, aliases, nil) if err != nil { log.Warningf("Error fetching tablets from topo: %v", err) // If we get a partial result we can still use it, otherwise return. diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index b3298f55700..dbee11c1610 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -70,8 +70,7 @@ type TopologyWatcher struct { cell string refreshInterval time.Duration refreshKnownTablets bool - getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) - sem chan int + concurrency int64 ctx context.Context cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. @@ -92,34 +91,28 @@ type TopologyWatcher struct { } // NewTopologyWatcher returns a TopologyWatcher that monitors all -// the tablets that it is configured to watch, and reloads them periodically if needed. -// As of now there is only one implementation: watch all tablets in a cell. -func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher { +// the tablets in a cell, and reloads them as needed. +func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int64) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, healthcheck: hc, - tabletFilter: filter, + tabletFilter: f, cell: cell, refreshInterval: refreshInterval, refreshKnownTablets: refreshKnownTablets, - getTablets: getTablets, - sem: make(chan int, topoReadConcurrency), + concurrency: topoReadConcurrency, tablets: make(map[string]*tabletInfo), } tw.firstLoadChan = make(chan struct{}) - // We want the span from the context, but not the cancelation that comes with it + // We want the span from the context, but not the cancellation that comes with it spanContext := trace.CopySpan(context.Background(), ctx) tw.ctx, tw.cancelFunc = context.WithCancel(spanContext) return tw } -// NewCellTabletsWatcher returns a TopologyWatcher that monitors all -// the tablets in a cell, and reloads them as needed. -func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { - return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) { - return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell) - }) +func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) { + return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, &topo.GetTabletsByCellOptions{Concurrency: tw.concurrency}) } // Start starts the topology watcher. @@ -149,30 +142,31 @@ func (tw *TopologyWatcher) Stop() { } func (tw *TopologyWatcher) loadTablets() { - var wg sync.WaitGroup newTablets := make(map[string]*tabletInfo) - // First get the list of relevant tabletAliases. - tabletAliases, err := tw.getTablets(tw) + // First get the list of all tablets. + tabletInfos, err := tw.getTablets() topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1) if err != nil { topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1) - select { - case <-tw.ctx.Done(): + // If we get a partial result error, we just log it and process the tablets that we did manage to fetch. + if topo.IsErrType(err, topo.PartialResult) { + log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err) + } else { // For all other errors, just return. + log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err) return - default: } - log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err) - return } // Accumulate a list of all known alias strings to use later // when sorting. - tabletAliasStrs := make([]string, 0, len(tabletAliases)) + tabletAliasStrs := make([]string, 0, len(tabletInfos)) tw.mu.Lock() - for _, tAlias := range tabletAliases { - aliasStr := topoproto.TabletAliasString(tAlias) + defer tw.mu.Unlock() + + for _, tInfo := range tabletInfos { + aliasStr := topoproto.TabletAliasString(tInfo.Alias) tabletAliasStrs = append(tabletAliasStrs, aliasStr) if !tw.refreshKnownTablets { @@ -182,38 +176,13 @@ func (tw *TopologyWatcher) loadTablets() { continue } } - - wg.Add(1) - go func(alias *topodata.TabletAlias) { - defer wg.Done() - tw.sem <- 1 // Wait for active queue to drain. - tablet, err := tw.topoServer.GetTablet(tw.ctx, alias) - topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1) - <-tw.sem // Done; enable next request to run. - if err != nil { - topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1) - select { - case <-tw.ctx.Done(): - return - default: - } - log.Errorf("cannot get tablet for alias %v: %v", alias, err) - return - } - tw.mu.Lock() - aliasStr := topoproto.TabletAliasString(alias) - newTablets[aliasStr] = &tabletInfo{ - alias: aliasStr, - tablet: tablet.Tablet, - } - tw.mu.Unlock() - }(tAlias) + // There's no network call here, so we just do the tablets one at a time instead of in parallel goroutines. + newTablets[aliasStr] = &tabletInfo{ + alias: aliasStr, + tablet: tInfo.Tablet, + } } - tw.mu.Unlock() - wg.Wait() - tw.mu.Lock() - for alias, newVal := range newTablets { if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) { continue @@ -266,8 +235,6 @@ func (tw *TopologyWatcher) loadTablets() { tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes()) tw.lastRefresh = time.Now() - tw.mu.Unlock() - } // RefreshLag returns the time since the last refresh. diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 3ac567acef8..bdf2c2dd2da 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -65,7 +65,7 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { fhc := NewFakeHealthCheck(nil) defer fhc.Close() topologyWatcherOperations.ZeroAll() - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 100*time.Microsecond, true, 5) done := make(chan bool, 3) result := make(chan bool, 1) @@ -102,9 +102,8 @@ func TestStartAndCloseTopoWatcher(t *testing.T) { done <- true _, ok := <-result - if !ok { - t.Fatal("timed out") - } + require.True(t, ok, "timed out") + } func TestCellTabletsWatcher(t *testing.T) { @@ -125,7 +124,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { logger := logutil.NewMemoryLogger() topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -143,19 +142,18 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { Keyspace: "keyspace", Shard: "shard", } - if err := ts.CreateTablet(context.Background(), tablet); err != nil { - t.Fatalf("CreateTablet failed: %v", err) - } + require.NoError(t, ts.CreateTablet(context.Background(), tablet), "CreateTablet failed for %v", tablet.Alias) + tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1}) checkChecksum(t, tw, 3238442862) // Check the tablet is returned by GetAllTablets(). allTablets := fhc.GetAllTablets() key := TabletToMapKey(tablet) - if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) - } + assert.Len(t, allTablets, 1) + assert.Contains(t, allTablets, key) + assert.True(t, proto.Equal(tablet, allTablets[key])) // Add a second tablet to the topology. tablet2 := &topodatapb.Tablet{ @@ -170,75 +168,51 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { Keyspace: "keyspace", Shard: "shard", } - if err := ts.CreateTablet(context.Background(), tablet2); err != nil { - t.Fatalf("CreateTablet failed: %v", err) - } + require.NoError(t, ts.CreateTablet(context.Background(), tablet2), "CreateTablet failed for %v", tablet2.Alias) tw.loadTablets() - // If refreshKnownTablets is disabled, only the new tablet is read - // from the topo - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1}) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) - } + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1}) checkChecksum(t, tw, 2762153755) // Check the new tablet is returned by GetAllTablets(). allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet2) - if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet2) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) - } - - // Load the tablets again to show that when refreshKnownTablets is disabled, - // only the list is read from the topo and the checksum doesn't change - tw.loadTablets() - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2}) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) - } - checkChecksum(t, tw, 2762153755) + assert.Len(t, allTablets, 2) + assert.Contains(t, allTablets, key) + assert.True(t, proto.Equal(tablet2, allTablets[key])) // same tablet, different port, should update (previous // one should go away, new one be added) // // if refreshKnownTablets is disabled, this case is *not* - // detected and the tablet remains in the topo using the + // detected and the tablet remains in the healthcheck using the // old key origTablet := tablet.CloneVT() origKey := TabletToMapKey(tablet) tablet.PortMap["vt"] = 456 - if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { + _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { t.PortMap["vt"] = 456 return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } + }) + require.Nil(t, err, "UpdateTabletFields failed") + tw.loadTablets() allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet) if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) - - if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) - } - if _, ok := allTablets[origKey]; ok { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, origKey) - } + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 1}) + assert.Len(t, allTablets, 2) + assert.Contains(t, allTablets, key) + assert.True(t, proto.Equal(tablet, allTablets[key])) + assert.NotContains(t, allTablets, origKey) checkChecksum(t, tw, 2762153755) } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) - - if _, ok := allTablets[origKey]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[origKey], origTablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, origTablet) - } - if _, ok := allTablets[key]; ok { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 0}) + assert.Len(t, allTablets, 2) + assert.Contains(t, allTablets, origKey) + assert.True(t, proto.Equal(origTablet, allTablets[origKey])) + assert.NotContains(t, allTablets, key) checkChecksum(t, tw, 2762153755) } @@ -248,94 +222,77 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) { if refreshKnownTablets { origTablet := tablet.CloneVT() origTablet2 := tablet2.CloneVT() - if _, err := ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { + _, err := ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { t.Hostname = tablet.Hostname t.PortMap = tablet.PortMap tablet2 = t return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { + }) + require.Nil(t, err, "UpdateTabletFields failed") + _, err = ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { t.Hostname = "host3" tablet = t return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } + }) + require.Nil(t, err, "UpdateTabletFields failed") tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 2}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2}) allTablets = fhc.GetAllTablets() key2 := TabletToMapKey(tablet2) - if _, ok := allTablets[key2]; !ok { - t.Fatalf("tablet was lost because it's reusing an address recently used by another tablet: %v", key2) - } + assert.Contains(t, allTablets, key2, "tablet was lost because it's reusing an address recently used by another tablet: %v", key2) // Change tablets back to avoid altering later tests. - if _, err := ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { + _, err = ts.UpdateTabletFields(context.Background(), tablet2.Alias, func(t *topodatapb.Tablet) error { t.Hostname = origTablet2.Hostname t.PortMap = origTablet2.PortMap tablet2 = t return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } - if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { + }) + require.Nil(t, err, "UpdateTabletFields failed") + + _, err = ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { t.Hostname = origTablet.Hostname tablet = t return nil - }); err != nil { - t.Fatalf("UpdateTabletFields failed: %v", err) - } + }) + require.Nil(t, err, "UpdateTabletFields failed") + tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 2}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2}) } // Remove the tablet and check that it is detected as being gone. - if err := ts.DeleteTablet(context.Background(), tablet.Alias); err != nil { - t.Fatalf("DeleteTablet failed: %v", err) - } - if _, err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { - t.Fatalf("FixShardReplication failed: %v", err) - } + require.NoError(t, ts.DeleteTablet(context.Background(), tablet.Alias)) + + _, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard") + require.Nil(t, err, "FixShardReplication failed") tw.loadTablets() - if refreshKnownTablets { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1}) - } else { - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1}) - } + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) checkChecksum(t, tw, 789108290) allTablets = fhc.GetAllTablets() + assert.Len(t, allTablets, 1) key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; ok || len(allTablets) != 1 { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } + assert.NotContains(t, allTablets, key) + key = TabletToMapKey(tablet2) - if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet2) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) - } + assert.Contains(t, allTablets, key) + assert.True(t, proto.Equal(tablet2, allTablets[key])) // Remove the other and check that it is detected as being gone. - if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil { - t.Fatalf("DeleteTablet failed: %v", err) - } - if _, err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { - t.Fatalf("FixShardReplication failed: %v", err) - } + require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias)) + _, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard") + require.Nil(t, err, "FixShardReplication failed") tw.loadTablets() checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) checkChecksum(t, tw, 0) allTablets = fhc.GetAllTablets() + assert.Len(t, allTablets, 0) key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; ok || len(allTablets) != 0 { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } + assert.NotContains(t, allTablets, key) key = TabletToMapKey(tablet2) - if _, ok := allTablets[key]; ok || len(allTablets) != 0 { - t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) - } + assert.NotContains(t, allTablets, key) tw.Stop() } @@ -402,19 +359,13 @@ func TestFilterByShard(t *testing.T) { for _, tc := range testcases { fbs, err := NewFilterByShard(tc.filters) - if err != nil { - t.Errorf("cannot create FilterByShard for filters %v: %v", tc.filters, err) - } + require.Nil(t, err, "cannot create FilterByShard for filters %v", tc.filters) tablet := &topodatapb.Tablet{ Keyspace: tc.keyspace, Shard: tc.shard, } - - got := fbs.IsIncluded(tablet) - if got != tc.included { - t.Errorf("isIncluded(%v,%v) for filters %v returned %v but expected %v", tc.keyspace, tc.shard, tc.filters, got, tc.included) - } + require.Equal(t, tc.included, fbs.IsIncluded(tablet)) } } @@ -444,7 +395,7 @@ func TestFilterByKeyspace(t *testing.T) { f := NewFilterByKeyspace(testKeyspacesToWatch) ts := memorytopo.NewServer(ctx, testCell) defer ts.Close() - tw := NewCellTabletsWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) + tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true, 5) for _, test := range testFilterByKeyspace { // Add a new tablet to the topology. @@ -462,22 +413,21 @@ func TestFilterByKeyspace(t *testing.T) { Shard: testShard, } - got := f.IsIncluded(tablet) - if got != test.expected { - t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected) - } + assert.Equal(t, test.expected, f.IsIncluded(tablet)) - if err := ts.CreateTablet(context.Background(), tablet); err != nil { - t.Errorf("CreateTablet failed: %v", err) - } + // Make this fatal because there is no point continuing if CreateTablet fails + require.NoError(t, ts.CreateTablet(context.Background(), tablet)) tw.loadTablets() key := TabletToMapKey(tablet) allTablets := hc.GetAllTablets() - if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tablet) != test.expected { - t.Errorf("Error adding tablet - got %v; want %v", ok, test.expected) + if test.expected { + assert.Contains(t, allTablets, key) + } else { + assert.NotContains(t, allTablets, key) } + assert.Equal(t, test.expected, proto.Equal(tablet, allTablets[key])) // Replace the tablet we added above tabletReplacement := &topodatapb.Tablet{ @@ -492,35 +442,31 @@ func TestFilterByKeyspace(t *testing.T) { Keyspace: test.keyspace, Shard: testShard, } - got = f.IsIncluded(tabletReplacement) - if got != test.expected { - t.Errorf("isIncluded(%v) for keyspace %v returned %v but expected %v", test.keyspace, test.keyspace, got, test.expected) - } - if err := ts.CreateTablet(context.Background(), tabletReplacement); err != nil { - t.Errorf("CreateTablet failed: %v", err) - } + assert.Equal(t, test.expected, f.IsIncluded(tabletReplacement)) + require.NoError(t, ts.CreateTablet(context.Background(), tabletReplacement)) tw.loadTablets() key = TabletToMapKey(tabletReplacement) allTablets = hc.GetAllTablets() - if _, ok := allTablets[key]; ok != test.expected && proto.Equal(allTablets[key], tabletReplacement) != test.expected { - t.Errorf("Error replacing tablet - got %v; want %v", ok, test.expected) + if test.expected { + assert.Contains(t, allTablets, key) + } else { + assert.NotContains(t, allTablets, key) } + assert.Equal(t, test.expected, proto.Equal(tabletReplacement, allTablets[key])) // Delete the tablet - if err := ts.DeleteTablet(context.Background(), tabletReplacement.Alias); err != nil { - t.Fatalf("DeleteTablet failed: %v", err) - } + require.NoError(t, ts.DeleteTablet(context.Background(), tabletReplacement.Alias)) } } -// TestFilterByKeypsaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher +// TestFilterByKeyspaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher // has a FilterByKeyspace TabletFilter configured along with refreshKnownTablets turned off. We want // to ensure that the TopologyWatcher: -// - does not continuosly call GetTablets for tablets that do not satisfy the filter -// - does not add or remove these filtered out tablets from the its healtcheck -func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { +// - does not continuously call GetTablets for tablets that do not satisfy the filter +// - does not add or remove these filtered out tablets from its healthcheck +func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) { ctx := utils.LeakCheckContext(t) ts := memorytopo.NewServer(ctx, "aa") @@ -530,7 +476,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { topologyWatcherOperations.ZeroAll() counts := topologyWatcherOperations.Counts() f := NewFilterByKeyspace(testKeyspacesToWatch) - tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) + tw := NewTopologyWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) counts = checkOpCounts(t, counts, map[string]int64{}) checkChecksum(t, tw, 0) @@ -551,7 +497,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.CreateTablet(context.Background(), tablet)) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1}) checkChecksum(t, tw, 3238442862) // Check tablet is reported by HealthCheck @@ -576,7 +522,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.CreateTablet(context.Background(), tablet2)) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 2762153755) // Check the new tablet is NOT reported by HealthCheck. @@ -588,7 +534,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { // Load the tablets again to show that when refreshKnownTablets is disabled, // only the list is read from the topo and the checksum doesn't change tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 2762153755) // With refreshKnownTablets set to false, changes to the port map for the same tablet alias @@ -600,7 +546,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, err) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 2762153755) allTablets = fhc.GetAllTablets() @@ -616,7 +562,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.DeleteTablet(context.Background(), tablet.Alias)) tw.loadTablets() - counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1}) + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) checkChecksum(t, tw, 789108290) assert.Empty(t, fhc.GetAllTablets()) @@ -624,7 +570,7 @@ func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias)) tw.loadTablets() - checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0}) checkChecksum(t, tw, 0) assert.Empty(t, fhc.GetAllTablets()) diff --git a/go/vt/topo/consultopo/error.go b/go/vt/topo/consultopo/error.go index 42f474e065b..62167a4d295 100644 --- a/go/vt/topo/consultopo/error.go +++ b/go/vt/topo/consultopo/error.go @@ -40,15 +40,16 @@ var ( // are either application-level errors, or context errors. func convertError(err error, nodePath string) error { // Unwrap errors from the Go HTTP client. - if urlErr, ok := err.(*url.Error); ok { + var urlErr *url.Error + if errors.As(err, &urlErr) { err = urlErr.Err } // Convert specific sentinel values. - switch err { - case context.Canceled: + switch { + case errors.Is(err, context.Canceled): return topo.NewError(topo.Interrupted, nodePath) - case context.DeadlineExceeded: + case errors.Is(err, context.DeadlineExceeded): return topo.NewError(topo.Timeout, nodePath) } diff --git a/go/vt/topo/errors.go b/go/vt/topo/errors.go index a645f1aa178..3be4b60b103 100644 --- a/go/vt/topo/errors.go +++ b/go/vt/topo/errors.go @@ -36,6 +36,7 @@ const ( NoUpdateNeeded NoImplementation NoReadOnlyImplementation + ResourceExhausted ) // Error represents a topo error. @@ -68,6 +69,8 @@ func NewError(code ErrorCode, node string) error { message = fmt.Sprintf("no such topology implementation %s", node) case NoReadOnlyImplementation: message = fmt.Sprintf("no read-only topology implementation %s", node) + case ResourceExhausted: + message = fmt.Sprintf("server resource exhausted: %s", node) default: message = fmt.Sprintf("unknown code: %s", node) } diff --git a/go/vt/topo/etcd2topo/error.go b/go/vt/topo/etcd2topo/error.go index e784fecd9b9..5e13d0bdf8d 100644 --- a/go/vt/topo/etcd2topo/error.go +++ b/go/vt/topo/etcd2topo/error.go @@ -45,7 +45,8 @@ func convertError(err error, nodePath string) error { return nil } - if typeErr, ok := err.(rpctypes.EtcdError); ok { + var typeErr rpctypes.EtcdError + if errors.As(err, &typeErr) { switch typeErr.Code() { case codes.NotFound: return topo.NewError(topo.NoNode, nodePath) @@ -61,6 +62,8 @@ func convertError(err error, nodePath string) error { // etcd primary election is failing, so timeout // also sounds reasonable there. return topo.NewError(topo.Timeout, nodePath) + case codes.ResourceExhausted: + return topo.NewError(topo.ResourceExhausted, nodePath) } return err } @@ -74,15 +77,17 @@ func convertError(err error, nodePath string) error { return topo.NewError(topo.Interrupted, nodePath) case codes.DeadlineExceeded: return topo.NewError(topo.Timeout, nodePath) + case codes.ResourceExhausted: + return topo.NewError(topo.ResourceExhausted, nodePath) default: return err } } - switch err { - case context.Canceled: + switch { + case errors.Is(err, context.Canceled): return topo.NewError(topo.Interrupted, nodePath) - case context.DeadlineExceeded: + case errors.Is(err, context.DeadlineExceeded): return topo.NewError(topo.Timeout, nodePath) default: return err diff --git a/go/vt/topo/memorytopo/file.go b/go/vt/topo/memorytopo/file.go index 0007203799f..cc19eb79011 100644 --- a/go/vt/topo/memorytopo/file.go +++ b/go/vt/topo/memorytopo/file.go @@ -187,6 +187,9 @@ func (c *Conn) List(ctx context.Context, filePathPrefix string) ([]topo.KVInfo, if c.factory.err != nil { return nil, c.factory.err } + if c.factory.listErr != nil { + return nil, c.factory.listErr + } dir, file := path.Split(filePathPrefix) // Get the node to list. diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go index f24b2f6c89e..ae33bb73edd 100644 --- a/go/vt/topo/memorytopo/memorytopo.go +++ b/go/vt/topo/memorytopo/memorytopo.go @@ -70,6 +70,9 @@ type Factory struct { // err is used for testing purposes to force queries / watches // to return the given error err error + // listErr is used for testing purposed to fake errors from + // calls to List. + listErr error } // HasGlobalReadOnlyCell is part of the topo.Factory interface. @@ -348,3 +351,10 @@ func (f *Factory) recursiveDelete(n *node) { f.recursiveDelete(parent) } } + +func (f *Factory) SetListError(err error) { + f.mu.Lock() + defer f.mu.Unlock() + + f.listErr = err +} diff --git a/go/vt/topo/shard.go b/go/vt/topo/shard.go index 752001438f4..7343db2c0ab 100644 --- a/go/vt/topo/shard.go +++ b/go/vt/topo/shard.go @@ -666,7 +666,7 @@ func (ts *Server) GetTabletMapForShardByCell(ctx context.Context, keyspace, shar // get the tablets for the cells we were able to reach, forward // ErrPartialResult from FindAllTabletAliasesInShard - result, gerr := ts.GetTabletMap(ctx, aliases) + result, gerr := ts.GetTabletMap(ctx, aliases, nil) if gerr == nil && err != nil { gerr = err } diff --git a/go/vt/topo/tablet.go b/go/vt/topo/tablet.go index d17235f6948..2c726490ae4 100644 --- a/go/vt/topo/tablet.go +++ b/go/vt/topo/tablet.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "golang.org/x/sync/semaphore" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/vt/key" @@ -282,10 +284,17 @@ func (ts *Server) GetTabletAliasesByCell(ctx context.Context, cell string) ([]*t return result, nil } +// GetTabletsByCellOptions controls the behavior of +// Server.FindAllShardsInKeyspace. +type GetTabletsByCellOptions struct { + // Concurrency controls the maximum number of concurrent calls to GetTablet. + Concurrency int64 +} + // GetTabletsByCell returns all the tablets in the cell. // It returns ErrNoNode if the cell doesn't exist. // It returns (nil, nil) if the cell exists, but there are no tablets in it. -func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string) ([]*TabletInfo, error) { +func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) { // If the cell doesn't exist, this will return ErrNoNode. cellConn, err := ts.ConnForCell(ctx, cellAlias) if err != nil { @@ -293,10 +302,12 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string) ([]*Ta } listResults, err := cellConn.List(ctx, TabletsPath) if err != nil || len(listResults) == 0 { - // Currently the ZooKeeper and Memory topo implementations do not support scans + // Currently the ZooKeeper implementation does not support scans // so we fall back to the more costly method of fetching the tablets one by one. - if IsErrType(err, NoImplementation) { - return ts.GetTabletsIndividuallyByCell(ctx, cellAlias) + // In the etcd case, it is possible that the response is too large. We also fall + // back to fetching the tablets one by one in that case. + if IsErrType(err, NoImplementation) || IsErrType(err, ResourceExhausted) { + return ts.GetTabletsIndividuallyByCell(ctx, cellAlias, opt) } if IsErrType(err, NoNode) { return nil, nil @@ -320,7 +331,7 @@ func (ts *Server) GetTabletsByCell(ctx context.Context, cellAlias string) ([]*Ta // directly support the topoConn.List() functionality. // It returns ErrNoNode if the cell doesn't exist. // It returns (nil, nil) if the cell exists, but there are no tablets in it. -func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string) ([]*TabletInfo, error) { +func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string, opt *GetTabletsByCellOptions) ([]*TabletInfo, error) { // If the cell doesn't exist, this will return ErrNoNode. aliases, err := ts.GetTabletAliasesByCell(ctx, cell) if err != nil { @@ -328,7 +339,7 @@ func (ts *Server) GetTabletsIndividuallyByCell(ctx context.Context, cell string) } sort.Sort(topoproto.TabletAliasList(aliases)) - tabletMap, err := ts.GetTabletMap(ctx, aliases) + tabletMap, err := ts.GetTabletMap(ctx, aliases, opt) if err != nil { // we got another error than topo.ErrNoNode return nil, err @@ -503,41 +514,62 @@ func DeleteTabletReplicationData(ctx context.Context, ts *Server, tablet *topoda } // GetTabletMap tries to read all the tablets in the provided list, -// and returns them all in a map. -// If error is ErrPartialResult, the results in the dictionary are +// and returns them in a map. +// If error is ErrPartialResult, the results in the map are // incomplete, meaning some tablets couldn't be read. // The map is indexed by topoproto.TabletAliasString(tablet alias). -func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb.TabletAlias) (map[string]*TabletInfo, error) { +func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb.TabletAlias, opt *GetTabletsByCellOptions) (map[string]*TabletInfo, error) { span, ctx := trace.NewSpan(ctx, "topo.GetTabletMap") span.Annotate("num_tablets", len(tabletAliases)) defer span.Finish() - wg := sync.WaitGroup{} - mutex := sync.Mutex{} + var ( + mu sync.Mutex + wg sync.WaitGroup + tabletMap = make(map[string]*TabletInfo) + returnErr error + // Previously this was always run with unlimited concurrency, so 32 should be fine. + concurrency int64 = 32 + ) - tabletMap := make(map[string]*TabletInfo) - var someError error + if opt != nil && opt.Concurrency > 0 { + concurrency = opt.Concurrency + } + var sem = semaphore.NewWeighted(concurrency) for _, tabletAlias := range tabletAliases { wg.Add(1) go func(tabletAlias *topodatapb.TabletAlias) { defer wg.Done() + if err := sem.Acquire(ctx, 1); err != nil { + // Only happens if context is cancelled. + mu.Lock() + defer mu.Unlock() + log.Warningf("%v: %v", tabletAlias, err) + // We only need to set this on the first error. + if returnErr == nil { + returnErr = NewError(PartialResult, tabletAlias.GetCell()) + } + return + } tabletInfo, err := ts.GetTablet(ctx, tabletAlias) - mutex.Lock() + sem.Release(1) + mu.Lock() + defer mu.Unlock() if err != nil { log.Warningf("%v: %v", tabletAlias, err) // There can be data races removing nodes - ignore them for now. - if !IsErrType(err, NoNode) { - someError = NewError(PartialResult, "") + // We only need to set this on first error. + if returnErr == nil && !IsErrType(err, NoNode) { + returnErr = NewError(PartialResult, tabletAlias.GetCell()) } } else { tabletMap[topoproto.TabletAliasString(tabletAlias)] = tabletInfo } - mutex.Unlock() }(tabletAlias) } wg.Wait() - return tabletMap, someError + return tabletMap, returnErr } // InitTablet creates or updates a tablet. If no parent is specified diff --git a/go/vt/topo/tablet_test.go b/go/vt/topo/tablet_test.go new file mode 100644 index 00000000000..04eea71a8a2 --- /dev/null +++ b/go/vt/topo/tablet_test.go @@ -0,0 +1,118 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topo_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +// Test various cases of calls to GetTabletsByCell. +// GetTabletsByCell first tries to get all the tablets using List. +// If the response is too large, we will get an error, and fall back to one tablet at a time. +func TestServerGetTabletsByCell(t *testing.T) { + tests := []struct { + name string + tablets int + opt *topo.GetTabletsByCellOptions + listError error + }{ + { + name: "negative concurrency", + tablets: 1, + // Ensure this doesn't panic. + opt: &topo.GetTabletsByCellOptions{Concurrency: -1}, + }, + { + name: "single", + tablets: 1, + // Make sure the defaults apply as expected. + opt: nil, + }, + { + name: "multiple", + // should work with more than 1 tablet + tablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + }, + { + name: "multiple with list error", + // should work with more than 1 tablet when List returns an error + tablets: 32, + opt: &topo.GetTabletsByCellOptions{Concurrency: 8}, + listError: topo.NewError(topo.ResourceExhausted, ""), + }, + } + + const cell = "zone1" + const keyspace = "keyspace" + const shard = "shard" + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ts, factory := memorytopo.NewServerAndFactory(ctx, cell) + defer ts.Close() + if tt.listError != nil { + factory.SetListError(tt.listError) + } + + // Create an ephemeral keyspace and generate shard records within + // the keyspace to fetch later. + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablets := make([]*topo.TabletInfo, tt.tablets) + + for i := 0; i < tt.tablets; i++ { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: cell, + Uid: uint32(i), + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": int32(i), + }, + Keyspace: keyspace, + Shard: shard, + } + tInfo := &topo.TabletInfo{Tablet: tablet} + tablets[i] = tInfo + require.NoError(t, ts.CreateTablet(ctx, tablet)) + } + + // Verify that we return a complete list of tablets and that each + // tablet matches what we expect. + out, err := ts.GetTabletsByCell(ctx, cell, tt.opt) + require.NoError(t, err) + require.Len(t, out, tt.tablets) + + for i, tab := range tablets { + require.Equal(t, tab.Tablet, tablets[i].Tablet) + } + }) + } +} diff --git a/go/vt/topo/zk2topo/error.go b/go/vt/topo/zk2topo/error.go index 1ebc3896f40..1149ad60bf3 100644 --- a/go/vt/topo/zk2topo/error.go +++ b/go/vt/topo/zk2topo/error.go @@ -18,6 +18,7 @@ package zk2topo import ( "context" + "errors" "github.com/z-division/go-zookeeper/zk" @@ -26,20 +27,20 @@ import ( // Error codes returned by the zookeeper Go client: func convertError(err error, node string) error { - switch err { - case zk.ErrBadVersion: + switch { + case errors.Is(err, zk.ErrBadVersion): return topo.NewError(topo.BadVersion, node) - case zk.ErrNoNode: + case errors.Is(err, zk.ErrNoNode): return topo.NewError(topo.NoNode, node) - case zk.ErrNodeExists: + case errors.Is(err, zk.ErrNodeExists): return topo.NewError(topo.NodeExists, node) - case zk.ErrNotEmpty: + case errors.Is(err, zk.ErrNotEmpty): return topo.NewError(topo.NodeNotEmpty, node) - case zk.ErrSessionExpired: + case errors.Is(err, zk.ErrSessionExpired): return topo.NewError(topo.Timeout, node) - case context.Canceled: + case errors.Is(err, context.Canceled): return topo.NewError(topo.Interrupted, node) - case context.DeadlineExceeded: + case errors.Is(err, context.DeadlineExceeded): return topo.NewError(topo.Timeout, node) } return err diff --git a/go/vt/topotools/tablet.go b/go/vt/topotools/tablet.go index 8bbca4b8c03..76f9e3d6cec 100644 --- a/go/vt/topotools/tablet.go +++ b/go/vt/topotools/tablet.go @@ -127,7 +127,7 @@ func DoCellsHaveRdonlyTablets(ctx context.Context, ts *topo.Server, cells []stri } for _, cell := range cells { - tablets, err := ts.GetTabletsByCell(ctx, cell) + tablets, err := ts.GetTabletsByCell(ctx, cell, nil) if err != nil { return false, err } diff --git a/go/vt/topotools/utils.go b/go/vt/topotools/utils.go index 6b618383a1e..6d1522e04e7 100644 --- a/go/vt/topotools/utils.go +++ b/go/vt/topotools/utils.go @@ -43,7 +43,7 @@ func GetTabletMapForCell(ctx context.Context, ts *topo.Server, cell string) (map if err != nil { return nil, err } - tabletMap, err := ts.GetTabletMap(ctx, aliases) + tabletMap, err := ts.GetTabletMap(ctx, aliases, nil) if err != nil { // we got another error than topo.ErrNoNode return nil, err @@ -65,7 +65,7 @@ func GetAllTabletsAcrossCells(ctx context.Context, ts *topo.Server) ([]*topo.Tab wg.Add(len(cells)) for i, cell := range cells { go func(i int, cell string) { - results[i], errors[i] = ts.GetTabletsByCell(ctx, cell) + results[i], errors[i] = ts.GetTabletsByCell(ctx, cell, nil) wg.Done() }(i, cell) } diff --git a/go/vt/vtctl/grpcvtctldserver/server.go b/go/vt/vtctl/grpcvtctldserver/server.go index 2ece69876e8..aaa6d8387a6 100644 --- a/go/vt/vtctl/grpcvtctldserver/server.go +++ b/go/vt/vtctl/grpcvtctldserver/server.go @@ -1982,7 +1982,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable case len(req.TabletAliases) > 0: span.Annotate("tablet_aliases", strings.Join(topoproto.TabletAliasList(req.TabletAliases).ToStringSlice(), ",")) - tabletMap, err = s.ts.GetTabletMap(ctx, req.TabletAliases) + tabletMap, err = s.ts.GetTabletMap(ctx, req.TabletAliases, nil) if err != nil { err = fmt.Errorf("GetTabletMap(%v) failed: %w", req.TabletAliases, err) } @@ -2058,7 +2058,7 @@ func (s *VtctldServer) GetTablets(ctx context.Context, req *vtctldatapb.GetTable go func(cell string) { defer wg.Done() - tablets, err := s.ts.GetTabletsByCell(ctx, cell) + tablets, err := s.ts.GetTabletsByCell(ctx, cell, nil) if err != nil { if req.Strict { log.Infof("GetTablets got an error from cell %s: %s. Running in strict mode, so canceling other cell RPCs", cell, err) @@ -4432,7 +4432,7 @@ func (s *VtctldServer) ValidateShard(ctx context.Context, req *vtctldatapb.Valid getTabletMapCtx, getTabletMapCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer getTabletMapCancel() - tabletMap, _ := s.ts.GetTabletMap(getTabletMapCtx, aliases) + tabletMap, _ := s.ts.GetTabletMap(getTabletMapCtx, aliases, nil) var primaryAlias *topodatapb.TabletAlias for _, alias := range aliases { diff --git a/go/vt/vtctl/grpcvtctldserver/topo.go b/go/vt/vtctl/grpcvtctldserver/topo.go index 70fae6613aa..5ec369ca17f 100644 --- a/go/vt/vtctl/grpcvtctldserver/topo.go +++ b/go/vt/vtctl/grpcvtctldserver/topo.go @@ -161,7 +161,7 @@ func deleteShardCell(ctx context.Context, ts *topo.Server, keyspace string, shar // Get all the tablet records for the aliases we've collected. Note that // GetTabletMap ignores ErrNoNode, which is convenient for our purpose; it // means a tablet was deleted but is still referenced. - tabletMap, err := ts.GetTabletMap(ctx, aliases) + tabletMap, err := ts.GetTabletMap(ctx, aliases, nil) if err != nil { return fmt.Errorf("GetTabletMap() failed: %w", err) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 95f1ecd42d4..c2b66b840c1 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2707,7 +2707,7 @@ func (s *Server) DeleteShard(ctx context.Context, keyspace, shard string, recurs // GetTabletMap ignores ErrNoNode, and it's good for // our purpose, it means a tablet was deleted but is // still referenced. - tabletMap, err := s.ts.GetTabletMap(ctx, aliases) + tabletMap, err := s.ts.GetTabletMap(ctx, aliases, nil) if err != nil { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "GetTabletMap() failed: %v", err) } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 4ac2e79d38b..78392a4b078 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -83,14 +83,6 @@ type ThrottlerInterface interface { ResetConfiguration() } -// TopologyWatcherInterface defines the public interface that is implemented by -// discovery.LegacyTopologyWatcher. It is only used here to allow mocking out -// go/vt/discovery.LegacyTopologyWatcher. -type TopologyWatcherInterface interface { - Start() - Stop() -} - // TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with // go/vt/throttler.GlobalManager. const TxThrottlerName = "TransactionThrottler" diff --git a/go/vt/wrangler/shard.go b/go/vt/wrangler/shard.go index c1c65b0407b..6b74f32031e 100644 --- a/go/vt/wrangler/shard.go +++ b/go/vt/wrangler/shard.go @@ -113,7 +113,7 @@ func (wr *Wrangler) DeleteShard(ctx context.Context, keyspace, shard string, rec // GetTabletMap ignores ErrNoNode, and it's good for // our purpose, it means a tablet was deleted but is // still referenced. - tabletMap, err := wr.ts.GetTabletMap(ctx, aliases) + tabletMap, err := wr.ts.GetTabletMap(ctx, aliases, nil) if err != nil { return fmt.Errorf("GetTabletMap() failed: %v", err) } diff --git a/go/vt/wrangler/split.go b/go/vt/wrangler/split.go index ba67fd8efef..543d50a808d 100644 --- a/go/vt/wrangler/split.go +++ b/go/vt/wrangler/split.go @@ -40,7 +40,7 @@ const ( // on a Shard. func (wr *Wrangler) SetSourceShards(ctx context.Context, keyspace, shard string, sources []*topodatapb.TabletAlias, tables []string) error { // Read the source tablets. - sourceTablets, err := wr.ts.GetTabletMap(ctx, sources) + sourceTablets, err := wr.ts.GetTabletMap(ctx, sources, nil) if err != nil { return err }