diff --git a/go/vt/throttler/replication_lag_cache.go b/go/vt/throttler/replication_lag_cache.go index c9c2e94f113..ab26c0bc6b8 100644 --- a/go/vt/throttler/replication_lag_cache.go +++ b/go/vt/throttler/replication_lag_cache.go @@ -18,6 +18,7 @@ package throttler import ( "sort" + "sync" "time" "vitess.io/vitess/go/vt/discovery" @@ -30,6 +31,8 @@ type replicationLagCache struct { // The map key is replicationLagRecord.LegacyTabletStats.Key. entries map[string]*replicationLagHistory + mu sync.Mutex + // slowReplicas is a set of slow replicas. // The map key is replicationLagRecord.LegacyTabletStats.Key. // This map will always be recomputed by sortByLag() and must not be modified @@ -60,6 +63,9 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache // add inserts or updates "r" in the cache for the replica with the key "r.Key". func (c *replicationLagCache) add(r replicationLagRecord) { + c.mu.Lock() + defer c.mu.Unlock() + if !r.Serving { // Tablet is down. Do no longer track it. delete(c.entries, discovery.TabletToMapKey(r.Tablet)) @@ -76,9 +82,35 @@ func (c *replicationLagCache) add(r replicationLagRecord) { entry.add(r) } +// maxLag returns the maximum replication lag for the entries in cache. +func (c *replicationLagCache) maxLag() (maxLag uint32) { + c.mu.Lock() + defer c.mu.Unlock() + + for key := range c.entries { + if c.isIgnored(key) { + continue + } + + entry, ok := c.entries[key] + if !ok { + continue + } + + latest := entry.latest() + if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag { + maxLag = lag + } + } + + return maxLag +} + // latest returns the current lag record for the given LegacyTabletStats.Key string. // A zero record is returned if there is no latest entry. func (c *replicationLagCache) latest(key string) replicationLagRecord { + c.mu.Lock() + defer c.mu.Unlock() entry, ok := c.entries[key] if !ok { return replicationLagRecord{} @@ -90,6 +122,8 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord { // or just after it. // If there is no such record, a zero record is returned. func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord { + c.mu.Lock() + defer c.mu.Unlock() entry, ok := c.entries[key] if !ok { return replicationLagRecord{} @@ -100,6 +134,9 @@ func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLag // sortByLag sorts all replicas by their latest replication lag value and // tablet uid and updates the c.slowReplicas set. func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) { + c.mu.Lock() + defer c.mu.Unlock() + // Reset the current list of ignored replicas. c.slowReplicas = make(map[string]bool) @@ -142,6 +179,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool { // this slow replica. // "key" refers to ReplicationLagRecord.LegacyTabletStats.Key. func (c *replicationLagCache) ignoreSlowReplica(key string) bool { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.slowReplicas) == 0 { // No slow replicas at all. return false diff --git a/go/vt/throttler/replication_lag_cache_test.go b/go/vt/throttler/replication_lag_cache_test.go index 312f97e1999..9b34210d096 100644 --- a/go/vt/throttler/replication_lag_cache_test.go +++ b/go/vt/throttler/replication_lag_cache_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/vt/discovery" ) @@ -91,3 +93,10 @@ func TestReplicationLagCache_SortByLag(t *testing.T) { t.Fatal("r1 should be tracked as a slow replica") } } + +func TestReplicationLagCache_MaxLag(t *testing.T) { + c := newReplicationLagCache(2) + c.add(lagRecord(sinceZero(1*time.Second), r1, 30)) + c.add(lagRecord(sinceZero(1*time.Second), r2, 1)) + require.Equal(t, uint32(30), c.maxLag()) +} diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index b71918de0c0..cd237548b3b 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -245,22 +245,10 @@ func (t *ThrottlerImpl) Throttle(threadID int) time.Duration { // the provided type, excluding ignored tablets. func (t *ThrottlerImpl) MaxLag(tabletType topodata.TabletType) uint32 { cache := t.maxReplicationLagModule.lagCacheByType(tabletType) - - var maxLag uint32 - cacheEntries := cache.entries - - for key := range cacheEntries { - if cache.isIgnored(key) { - continue - } - - lag := cache.latest(key).Stats.ReplicationLagSeconds - if lag > maxLag { - maxLag = lag - } + if cache == nil { + return 0 } - - return maxLag + return cache.maxLag() } // ThreadFinished marks threadID as finished and redistributes the thread's diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go index 317eb3822c7..e7e7c13c466 100644 --- a/go/vt/throttler/throttler_test.go +++ b/go/vt/throttler/throttler_test.go @@ -17,12 +17,18 @@ limitations under the License. package throttler import ( + "context" "runtime" "strings" + "sync" "testing" "time" "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/proto/topodata" ) // The main purpose of the benchmarks below is to demonstrate the functionality @@ -432,3 +438,78 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) { }() throttler.ThreadFinished(0) } + +func TestThrottlerMaxLag(t *testing.T) { + fc := &fakeClock{} + throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now) + require.NoError(t, err) + defer throttler.Close() + + require.NotNil(t, throttler) + throttlerImpl, ok := throttler.(*ThrottlerImpl) + require.True(t, ok) + require.NotNil(t, throttlerImpl.maxReplicationLagModule) + + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + + // run .add() and .MaxLag() concurrently to detect races + for _, tabletType := range []topodata.TabletType{ + topodata.TabletType_REPLICA, + topodata.TabletType_RDONLY, + } { + wg.Add(1) + go func(wg *sync.WaitGroup, ctx context.Context, t *ThrottlerImpl, tabletType topodata.TabletType) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + throttler.MaxLag(tabletType) + } + } + }(&wg, ctx, throttlerImpl, tabletType) + + wg.Add(1) + go func(wg *sync.WaitGroup, ctx context.Context, throttler *ThrottlerImpl, tabletType topodata.TabletType) { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType) + require.NotNil(t, cache) + cache.add(replicationLagRecord{ + time: time.Now(), + TabletHealth: discovery.TabletHealth{ + Serving: true, + Stats: &query.RealtimeStats{ + ReplicationLagSeconds: 5, + }, + Tablet: &topodata.Tablet{ + Hostname: t.Name(), + Type: tabletType, + PortMap: map[string]int32{ + "test": 15999, + }, + }, + }, + }) + } + } + }(&wg, ctx, throttlerImpl, tabletType) + } + time.Sleep(time.Second) + cancel() + wg.Wait() + + // check .MaxLag() + for _, tabletType := range []topodata.TabletType{ + topodata.TabletType_REPLICA, + topodata.TabletType_RDONLY, + } { + require.Equal(t, uint32(5), throttler.MaxLag(tabletType)) + } +}