Skip to content

Commit

Permalink
txthrottler: move ThrottlerInterface to go/vt/throttler, use `s…
Browse files Browse the repository at this point in the history
…lices` pkg, add stats (vitessio#16248)

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Aug 7, 2024
1 parent 1b333db commit 940fb68
Show file tree
Hide file tree
Showing 14 changed files with 166 additions and 146 deletions.
4 changes: 2 additions & 2 deletions go/vt/throttler/demo/throttler_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ type replica struct {

// throttler is used to enforce the maximum rate at which replica applies
// transactions. It must not be confused with the client's throttler.
throttler *throttler.Throttler
throttler throttler.Throttler
lastHealthUpdate time.Time
lagUpdateInterval time.Duration

Expand Down Expand Up @@ -224,7 +224,7 @@ type client struct {
primary *primary

healthCheck discovery.HealthCheck
throttler *throttler.Throttler
throttler throttler.Throttler

stopChan chan struct{}
wg sync.WaitGroup
Expand Down
10 changes: 5 additions & 5 deletions go/vt/throttler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ type managerImpl struct {
// mu guards all fields in this group.
mu sync.Mutex
// throttlers tracks all running throttlers (by their name).
throttlers map[string]*Throttler
throttlers map[string]Throttler
}

func newManager() *managerImpl {
return &managerImpl{
throttlers: make(map[string]*Throttler),
throttlers: make(map[string]Throttler),
}
}

func (m *managerImpl) registerThrottler(name string, throttler *Throttler) error {
func (m *managerImpl) registerThrottler(name string, throttler Throttler) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand Down Expand Up @@ -207,7 +207,7 @@ func (m *managerImpl) throttlerNamesLocked() []string {

// log returns the most recent changes of the MaxReplicationLag module.
// There will be one result for each processed replication lag record.
func (m *managerImpl) log(throttlerName string) ([]result, error) {
func (m *managerImpl) log(throttlerName string) ([]Result, error) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -216,5 +216,5 @@ func (m *managerImpl) log(throttlerName string) ([]result, error) {
return nil, fmt.Errorf("throttler: %v does not exist", throttlerName)
}

return t.log(), nil
return t.Log(), nil
}
2 changes: 1 addition & 1 deletion go/vt/throttler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var (

type managerTestFixture struct {
m *managerImpl
t1, t2 *Throttler
t1, t2 Throttler
}

func (f *managerTestFixture) setUp() error {
Expand Down
20 changes: 10 additions & 10 deletions go/vt/throttler/max_replication_lag_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec

m.memory.ageBadRate(now)

r := result{
r := Result{
Now: now,
RateChange: unchangedRate,
lastRateChange: m.lastRateChange,
Expand Down Expand Up @@ -445,7 +445,7 @@ func stateGreater(a, b state) bool {
// and we should not skip the current replica ("lagRecordNow").
// Even if it's the same replica we may skip it and return false because
// we want to wait longer for the propagation of the current rate change.
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
func (m *MaxReplicationLagModule) isReplicaUnderTest(r *Result, now time.Time, testedState state, lagRecordNow replicationLagRecord) bool {
if m.replicaUnderTest == nil {
return true
}
Expand All @@ -471,7 +471,7 @@ func (m *MaxReplicationLagModule) isReplicaUnderTest(r *result, now time.Time, t
return true
}

func (m *MaxReplicationLagModule) increaseRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) increaseRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateIncreaseRate, unknown)

oldRate := m.rate.Get()
Expand Down Expand Up @@ -559,7 +559,7 @@ func (m *MaxReplicationLagModule) minTestDurationUntilNextIncrease(increase floa
return minDuration
}

func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
// Guess replication rate based on the difference in the replication lag of this
// particular replica.
lagRecordBefore := m.lagCache(lagRecordNow).atOrAfter(discovery.TabletToMapKey(lagRecordNow.Tablet), m.lastRateChange)
Expand Down Expand Up @@ -630,7 +630,7 @@ func (m *MaxReplicationLagModule) decreaseAndGuessRate(r *result, now time.Time,
// guessReplicationRate guesses the actual replication rate based on the new bac
// Note that "lagDifference" can be positive (lag increased) or negative (lag
// decreased).
func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
func (m *MaxReplicationLagModule) guessReplicationRate(r *Result, avgPrimaryRate float64, lagBefore, lagNow int64, lagDifference, d time.Duration) (int64, string) {
// avgReplicationRate is the average rate (per second) at which the replica
// applied transactions from the replication stream. We infer the value
// from the relative change in the replication lag.
Expand Down Expand Up @@ -675,14 +675,14 @@ func (m *MaxReplicationLagModule) guessReplicationRate(r *result, avgPrimaryRate
return int64(newRate), reason
}

func (m *MaxReplicationLagModule) emergency(r *result, now time.Time, lagRecordNow replicationLagRecord) {
func (m *MaxReplicationLagModule) emergency(r *Result, now time.Time, lagRecordNow replicationLagRecord) {
m.markCurrentRateAsBadOrGood(r, now, stateEmergency, unknown)

decreaseReason := fmt.Sprintf("replication lag went beyond max: %d > %d", lagRecordNow.lag(), m.config.MaxReplicationLagSec)
m.decreaseRateByPercentage(r, now, lagRecordNow, stateEmergency, m.config.EmergencyDecrease, decreaseReason)
}

func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *Result, now time.Time, lagRecordNow replicationLagRecord, newState state, decrease float64, decreaseReason string) {
oldRate := m.rate.Get()
rate := int64(float64(oldRate) - float64(oldRate)*decrease)
if rate == 0 {
Expand All @@ -694,7 +694,7 @@ func (m *MaxReplicationLagModule) decreaseRateByPercentage(r *result, now time.T
m.updateRate(r, newState, rate, reason, now, lagRecordNow, m.config.MinDurationBetweenDecreases())
}

func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
func (m *MaxReplicationLagModule) updateRate(r *Result, newState state, rate int64, reason string, now time.Time, lagRecordNow replicationLagRecord, testDuration time.Duration) {
oldRate := m.rate.Get()

m.currentState = newState
Expand Down Expand Up @@ -722,7 +722,7 @@ func (m *MaxReplicationLagModule) updateRate(r *result, newState state, rate int

// markCurrentRateAsBadOrGood determines the actual rate between the last rate
// change and "now" and determines if that rate was bad or good.
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time.Time, newState state, replicationLagChange replicationLagChange) {
func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *Result, now time.Time, newState state, replicationLagChange replicationLagChange) {
if m.lastRateChange.IsZero() {
// Module was just started. We don't have any data points yet.
r.GoodOrBad = ignoredRate
Expand Down Expand Up @@ -796,6 +796,6 @@ func (m *MaxReplicationLagModule) markCurrentRateAsBadOrGood(r *result, now time
}
}

func (m *MaxReplicationLagModule) log() []result {
func (m *MaxReplicationLagModule) log() []Result {
return m.results.latestValues()
}
24 changes: 12 additions & 12 deletions go/vt/throttler/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ state (old/tested/new): {{.OldState}}/{{.TestedState}}/{{.NewState}}
lag before: {{.LagBefore}} ({{.AgeOfBeforeLag}} ago) rates (primary/replica): {{.PrimaryRate}}/{{.GuessedReplicationRate}} backlog (old/new): {{.GuessedReplicationBacklogOld}}/{{.GuessedReplicationBacklogNew}}
reason: {{.Reason}}`))

// result is generated by the MaxReplicationLag module for each processed
// Result is generated by the MaxReplicationLag module for each processed
// "replicationLagRecord".
// It captures the details and the decision of the processing.
type result struct {
type Result struct {
Now time.Time
RateChange rateChange
lastRateChange time.Time
Expand All @@ -80,33 +80,33 @@ type result struct {
GuessedReplicationBacklogNew int
}

func (r result) String() string {
func (r Result) String() string {
var b bytes.Buffer
if err := resultStringTemplate.Execute(&b, r); err != nil {
panic(fmt.Sprintf("failed to Execute() template: %v", err))
}
return b.String()
}

func (r result) Alias() string {
func (r Result) Alias() string {
return topoproto.TabletAliasString(r.LagRecordNow.Tablet.Alias)
}

func (r result) TimeSinceLastRateChange() string {
func (r Result) TimeSinceLastRateChange() string {
if r.lastRateChange.IsZero() {
return "n/a"
}
return fmt.Sprintf("%.1fs", r.Now.Sub(r.lastRateChange).Seconds())
}

func (r result) LagBefore() string {
func (r Result) LagBefore() string {
if r.LagRecordBefore.isZero() {
return "n/a"
}
return fmt.Sprintf("%ds", r.LagRecordBefore.Stats.ReplicationLagSeconds)
}

func (r result) AgeOfBeforeLag() string {
func (r Result) AgeOfBeforeLag() string {
if r.LagRecordBefore.isZero() {
return "n/a"
}
Expand All @@ -123,18 +123,18 @@ type resultRing struct {
// started reusing entries.
wrapped bool
// values is the underlying ring buffer.
values []result
values []Result
}

// newResultRing creates a new resultRing.
func newResultRing(capacity int) *resultRing {
return &resultRing{
values: make([]result, capacity),
values: make([]Result, capacity),
}
}

// add inserts a new result into the ring buffer.
func (rr *resultRing) add(r result) {
func (rr *resultRing) add(r Result) {
rr.mu.Lock()
defer rr.mu.Unlock()

Expand All @@ -148,7 +148,7 @@ func (rr *resultRing) add(r result) {

// latestValues returns all values of the buffer. Entries are sorted in reverse
// chronological order i.e. newer items come first.
func (rr *resultRing) latestValues() []result {
func (rr *resultRing) latestValues() []Result {
rr.mu.Lock()
defer rr.mu.Unlock()

Expand All @@ -162,7 +162,7 @@ func (rr *resultRing) latestValues() []result {
count = rr.position
}

results := make([]result, count)
results := make([]Result, count)
for i := 0; i < count; i++ {
pos := start - i
if pos < 0 {
Expand Down
20 changes: 10 additions & 10 deletions go/vt/throttler/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

var (
resultIncreased = result{
resultIncreased = Result{
Now: sinceZero(1234 * time.Millisecond),
RateChange: increasedRate,
lastRateChange: sinceZero(1 * time.Millisecond),
Expand All @@ -45,7 +45,7 @@ var (
GuessedReplicationBacklogOld: 0,
GuessedReplicationBacklogNew: 0,
}
resultDecreased = result{
resultDecreased = Result{
Now: sinceZero(5000 * time.Millisecond),
RateChange: decreasedRate,
lastRateChange: sinceZero(1234 * time.Millisecond),
Expand All @@ -67,7 +67,7 @@ var (
GuessedReplicationBacklogOld: 10,
GuessedReplicationBacklogNew: 20,
}
resultEmergency = result{
resultEmergency = Result{
Now: sinceZero(10123 * time.Millisecond),
RateChange: decreasedRate,
lastRateChange: sinceZero(5000 * time.Millisecond),
Expand All @@ -93,7 +93,7 @@ var (

func TestResultString(t *testing.T) {
testcases := []struct {
r result
r Result
want string
}{
{
Expand Down Expand Up @@ -135,27 +135,27 @@ reason: emergency state decreased the rate`,

func TestResultRing(t *testing.T) {
// Test data.
r1 := result{Reason: "r1"}
r2 := result{Reason: "r2"}
r3 := result{Reason: "r3"}
r1 := Result{Reason: "r1"}
r2 := Result{Reason: "r2"}
r3 := Result{Reason: "r3"}

rr := newResultRing(2)

// Use the ring partially.
rr.add(r1)
if got, want := rr.latestValues(), []result{r1}; !reflect.DeepEqual(got, want) {
if got, want := rr.latestValues(), []Result{r1}; !reflect.DeepEqual(got, want) {
t.Fatalf("items not correctly added to resultRing. got = %v, want = %v", got, want)
}

// Use it fully.
rr.add(r2)
if got, want := rr.latestValues(), []result{r2, r1}; !reflect.DeepEqual(got, want) {
if got, want := rr.latestValues(), []Result{r2, r1}; !reflect.DeepEqual(got, want) {
t.Fatalf("items not correctly added to resultRing. got = %v, want = %v", got, want)
}

// Let it wrap.
rr.add(r3)
if got, want := rr.latestValues(), []result{r3, r2}; !reflect.DeepEqual(got, want) {
if got, want := rr.latestValues(), []Result{r3, r2}; !reflect.DeepEqual(got, want) {
t.Fatalf("resultRing did not wrap correctly. got = %v, want = %v", got, want)
}
}
Loading

0 comments on commit 940fb68

Please sign in to comment.