[release-19.0] Fix race in replicationLagModule of go/vt/throttle (#16078) #16899

Merged (3 commits) on Oct 7, 2024
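For context, the race addressed by this backport is between writers such as replicationLagCache.add(), which runs on health-check updates, and readers such as Throttler.MaxLag(), which previously iterated cache.entries with no synchronization. The sketch below only illustrates the pattern the PR applies (a single mutex held by every reader and writer of the map); the types and names are simplified stand-ins, not the actual Vitess code.

package main

import (
	"fmt"
	"sync"
)

// lagCache is a simplified stand-in for replicationLagCache: per-replica
// lag values stored in a map, guarded by a mutex so that concurrent add()
// calls and maxLag() scans cannot race on the map.
type lagCache struct {
	mu      sync.Mutex
	entries map[string]uint32
}

func newLagCache() *lagCache {
	return &lagCache{entries: make(map[string]uint32)}
}

// add records the latest lag for one replica while holding the lock.
func (c *lagCache) add(key string, lag uint32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[key] = lag
}

// maxLag scans all entries under the same lock and returns the maximum.
func (c *lagCache) maxLag() (max uint32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, lag := range c.entries {
		if lag > max {
			max = lag
		}
	}
	return max
}

func main() {
	c := newLagCache()
	c.add("replica-1", 30)
	c.add("replica-2", 1)
	fmt.Println(c.maxLag()) // prints 30
}

Without the mutex, a reader and a writer touching the map at the same time is undefined behavior in Go; the runtime may abort with a concurrent map read/write fault, and go test -race reports it. With the mutex in place, MaxLag() can be called safely while health checks keep updating the cache.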
40 changes: 40 additions & 0 deletions go/vt/throttler/replication_lag_cache.go
@@ -18,6 +18,7 @@

import (
"sort"
"sync"
"time"

"vitess.io/vitess/go/vt/discovery"
@@ -30,6 +31,8 @@
// The map key is replicationLagRecord.LegacyTabletStats.Key.
entries map[string]*replicationLagHistory

mu sync.Mutex

// slowReplicas is a set of slow replicas.
// The map key is replicationLagRecord.LegacyTabletStats.Key.
// This map will always be recomputed by sortByLag() and must not be modified
@@ -60,6 +63,9 @@

// add inserts or updates "r" in the cache for the replica with the key "r.Key".
func (c *replicationLagCache) add(r replicationLagRecord) {
c.mu.Lock()
defer c.mu.Unlock()

if !r.Serving {
// Tablet is down. Do no longer track it.
delete(c.entries, discovery.TabletToMapKey(r.Tablet))
@@ -76,9 +82,35 @@
entry.add(r)
}

// maxLag returns the maximum replication lag for the entries in cache.
func (c *replicationLagCache) maxLag() (maxLag uint32) {
c.mu.Lock()
defer c.mu.Unlock()

for key := range c.entries {
if c.isIgnored(key) {
continue
}

entry := c.entries[key]
if entry == nil {
continue
}

latest := entry.latest()
if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// latest returns the current lag record for the given LegacyTabletStats.Key string.
// A zero record is returned if there is no latest entry.
func (c *replicationLagCache) latest(key string) replicationLagRecord {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
if !ok {
return replicationLagRecord{}
@@ -90,6 +122,8 @@
// or just after it.
// If there is no such record, a zero record is returned.
func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
if !ok {
return replicationLagRecord{}
@@ -100,6 +134,9 @@
// sortByLag sorts all replicas by their latest replication lag value and
// tablet uid and updates the c.slowReplicas set.
func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) {
c.mu.Lock()
defer c.mu.Unlock()

// Reset the current list of ignored replicas.
c.slowReplicas = make(map[string]bool)

@@ -142,6 +179,9 @@
// this slow replica.
// "key" refers to ReplicationLagRecord.LegacyTabletStats.Key.
func (c *replicationLagCache) ignoreSlowReplica(key string) bool {
c.mu.Lock()
defer c.mu.Unlock()

if len(c.slowReplicas) == 0 {
// No slow replicas at all.
return false
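One detail worth noting in the replication_lag_cache.go hunks above: maxLag() calls c.isIgnored(key) while it is already holding c.mu. Go's sync.Mutex is not reentrant, so helpers reached from inside a locked method must not try to take the same lock again; as far as these hunks show, isIgnored does not. A hypothetical sketch (not Vitess code) of the deadlock that re-locking would cause:

package main

import "sync"

type cache struct {
	mu sync.Mutex
}

func (c *cache) outer() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.inner() // blocks forever: inner waits for a lock outer still holds
}

func (c *cache) inner() {
	c.mu.Lock() // second Lock on a non-reentrant mutex from the same goroutine
	defer c.mu.Unlock()
}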
9 changes: 9 additions & 0 deletions go/vt/throttler/replication_lag_cache_test.go
@@ -20,6 +20,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/discovery"
)

@@ -91,3 +93,10 @@ func TestReplicationLagCache_SortByLag(t *testing.T) {
t.Fatal("r1 should be tracked as a slow replica")
}
}

func TestReplicationLagCache_MaxLag(t *testing.T) {
c := newReplicationLagCache(2)
c.add(lagRecord(sinceZero(1*time.Second), r1, 30))
c.add(lagRecord(sinceZero(1*time.Second), r2, 1))
require.Equal(t, uint32(30), c.maxLag())
}
18 changes: 3 additions & 15 deletions go/vt/throttler/throttler.go
@@ -229,22 +229,10 @@
// the provided type, excluding ignored tablets.
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
if cache == nil {
return 0
}

return maxLag
return cache.maxLag()
}

// ThreadFinished marks threadID as finished and redistributes the thread's
83 changes: 83 additions & 0 deletions go/vt/throttler/throttler_test.go
@@ -17,12 +17,26 @@ limitations under the License.
package throttler

import (
"context"
"runtime"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// testTabletTypes is the list of tablet types to test.
var testTabletTypes = []topodata.TabletType{
topodata.TabletType_REPLICA,
topodata.TabletType_RDONLY,
}

// The main purpose of the benchmarks below is to demonstrate the functionality
// of the throttler in the real-world (using a non-faked time.Now).
// The benchmark values should be as close as possible to the request interval
@@ -426,3 +440,72 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) {
}()
throttler.ThreadFinished(0)
}

func TestThrottlerMaxLag(t *testing.T) {
fc := &fakeClock{}
throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now)
require.NoError(t, err)
defer throttler.Close()

require.NotNil(t, throttler)
require.NotNil(t, throttler.maxReplicationLagModule)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup

// run .add() and .MaxLag() concurrently to detect races
for _, tabletType := range testTabletTypes {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
throttler.MaxLag(tabletType)
}
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType)
require.NotNil(t, cache)
cache.add(replicationLagRecord{
time: time.Now(),
TabletHealth: discovery.TabletHealth{
Serving: true,
Stats: &query.RealtimeStats{
ReplicationLagSeconds: 5,
},
Tablet: &topodata.Tablet{
Hostname: t.Name(),
Type: tabletType,
PortMap: map[string]int32{
"test": 15999,
},
},
},
})
}
}
}()
}
time.Sleep(time.Second)
cancel()
wg.Wait()

// check .MaxLag()
for _, tabletType := range testTabletTypes {
require.Equal(t, uint32(5), throttler.MaxLag(tabletType))
}
}
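TestThrottlerMaxLag drives add() and MaxLag() from concurrent goroutines for roughly one second before checking the result. On the pre-fix code, the unsynchronized map access can surface as a runtime fatal error, and the race detector reports it reliably; a typical invocation, assuming the usual repository layout, is go test -race -run TestThrottlerMaxLag ./go/vt/throttler/ from the repository root.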