Skip to content

Commit

Permalink
Fix backport merge conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>
  • Loading branch information
ejortegau committed Mar 21, 2024
1 parent 17f44c2 commit 1edee0a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 211 deletions.
135 changes: 9 additions & 126 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"sync/atomic"
"time"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -91,7 +92,6 @@ type ThrottlerInterface interface {
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
MaxLag(tabletType topodatapb.TabletType) uint32
<<<<<<< HEAD
}

// TopologyWatcherInterface defines the public interface that is implemented by
Expand All @@ -100,8 +100,6 @@ type ThrottlerInterface interface {
type TopologyWatcherInterface interface {
Start()
Stop()
=======
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
Expand Down Expand Up @@ -182,23 +180,11 @@ type txThrottlerState struct {
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
<<<<<<< HEAD
topologyWatchers []TopologyWatcherInterface
=======
healthCheckChan chan *discovery.TabletHealth
healthCheckCells []string
cellsFromTopo bool

// tabletTypes stores the tablet types for throttling
tabletTypes map[topodatapb.TabletType]bool

maxLag int64
done chan bool
waitForTermination sync.WaitGroup
<<<<<<< HEAD
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
=======
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}

// NewTxThrottler tries to construct a txThrottler from the
Expand Down Expand Up @@ -341,10 +327,10 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
result := &txThrottlerState{
config: config,
throttler: t,
done: make(chan bool, 2),
}
createTxThrottlerHealthCheck(topoServer, config, result, target.Cell)

<<<<<<< HEAD
result.topologyWatchers = make(
[]TopologyWatcherInterface, 0, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
Expand All @@ -358,30 +344,12 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency))
=======
state := &txThrottlerStateImpl{
config: config,
healthCheckCells: config.TxThrottlerHealthCheckCells,
tabletTypes: tabletTypes,
throttler: t,
txThrottler: txThrottler,
done: make(chan bool, 1),
}

// get cells from topo if none defined in tabletenv config
if len(state.healthCheckCells) == 0 {
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target)
state.cellsFromTopo = true
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}
return result, nil
}

func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) {
ctx, cancel := context.WithCancel(context.Background())
<<<<<<< HEAD
result.stopHealthCheck = cancel
result.healthCheck = healthCheckFactory(topoServer, cell, config.healthCheckCells)
ch := result.healthCheck.Subscribe()
Expand All @@ -393,59 +361,8 @@ func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerCo
case th := <-ch:
result.StatsUpdate(th)
}
=======
state.stopHealthCheck = cancel
state.initHealthCheckStream(txThrottler.topoServer, target)
go state.healthChecksProcessor(ctx, txThrottler.topoServer, target)
state.waitForTermination.Add(1)
go state.updateMaxLag()

return state, nil
}

func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) {
ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells)
ts.healthCheckChan = ts.healthCheck.Subscribe()

}

func (ts *txThrottlerStateImpl) closeHealthCheckStream() {
if ts.healthCheck == nil {
return
}
ts.stopHealthCheck()
ts.healthCheck.Close()
}

func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()

knownCells := fetchKnownCells(fetchCtx, topoServer, target)
if !reflect.DeepEqual(knownCells, ts.healthCheckCells) {
log.Info("txThrottler: restarting healthcheck stream due to topology cells update")
ts.healthCheckCells = knownCells
ts.closeHealthCheckStream()
ts.initHealthCheckStream(topoServer, target)
}
}

func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
var cellsUpdateTicks <-chan time.Time
if ts.cellsFromTopo {
ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval)
cellsUpdateTicks = ticker.C
defer ticker.Stop()
}
for {
select {
case <-ctx.Done():
return
case <-cellsUpdateTicks:
ts.updateHealthCheckCells(ctx, topoServer, target)
case th := <-ts.healthCheckChan:
ts.StatsUpdate(th)
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
result.waitForTermination.Add(1)
go result.updateMaxLag()
}
}(ctx)
}
Expand All @@ -461,49 +378,22 @@ func (ts *txThrottlerState) throttle() bool {

maxLag := atomic.LoadInt64(&ts.maxLag)

return maxLag > ts.config.TxThrottlerConfig.TargetReplicationLagSec &&
return maxLag > ts.config.throttlerConfig.TargetReplicationLagSec &&
ts.throttler.Throttle(0 /* threadId */) > 0
<<<<<<< HEAD
=======
}

func (ts *txThrottlerStateImpl) updateMaxLag() {
func (ts *txThrottlerState) updateMaxLag() {
defer ts.waitForTermination.Done()
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second)
ticker := time.NewTicker(time.Duration(ts.config.throttlerConfig.TargetReplicationLagSec/2) * time.Second)
defer ticker.Stop()
outerloop:
for {
select {
case <-ticker.C:
var maxLag uint32

for tabletType := range ts.tabletTypes {
maxLagPerTabletType := ts.throttler.MaxLag(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}
atomic.StoreInt64(&ts.maxLag, int64(maxLag))
case <-ts.done:
break outerloop
}
}
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}

func (ts *txThrottlerStateImpl) updateMaxLag() {
defer ts.waitForTermination.Done()
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
ticker := time.NewTicker(time.Duration(ts.config.TxThrottlerConfig.TargetReplicationLagSec/2) * time.Second)
defer ticker.Stop()
outerloop:
for {
select {
case <-ticker.C:
var maxLag uint32

for tabletType := range ts.tabletTypes {
for _, tabletType := range *ts.config.tabletTypes {
maxLagPerTabletType := ts.throttler.MaxLag(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
Expand All @@ -528,16 +418,9 @@ func (ts *txThrottlerState) deallocateResources() {
ts.healthCheck.Close()
ts.healthCheck = nil

<<<<<<< HEAD
<<<<<<< HEAD
// After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not
=======
=======
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
ts.done <- true
ts.waitForTermination.Wait()
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
// After ts.healthCheck is closed txThrottlerState.StatsUpdate() is guaranteed not
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
ts.throttler = nil
Expand Down
103 changes: 18 additions & 85 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,7 @@ package txthrottler
//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface

import (
<<<<<<< HEAD
=======
"context"
"sync/atomic"
<<<<<<< HEAD
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
=======
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
"testing"
"time"

Expand Down Expand Up @@ -58,15 +51,7 @@ func TestDisabledThrottler(t *testing.T) {
Shard: "shard",
})
assert.Nil(t, throttler.Open())
<<<<<<< HEAD
<<<<<<< HEAD
assert.False(t, throttler.Throttle(0))
=======
assert.False(t, throttler.Throttle(0, "some-workload"))
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
=======
assert.False(t, throttler.Throttle(0, "some-workload"))
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
throttlerImpl, _ := throttler.(*txThrottler)
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
throttler.Close()
Expand Down Expand Up @@ -127,24 +112,6 @@ func TestEnabledThrottler(t *testing.T) {
call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)
<<<<<<< HEAD
=======

// 3
// Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first
// whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle()

// 4
// Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first
// whether there is lag or not, so no call to the underlying mockThrottler is issued.

call = mockThrottler.EXPECT().Close()
calls = append(calls, call)

for i := 1; i < len(calls); i++ {
calls[i].After(calls[i-1])
}
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))

// 3
// Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first
Expand All @@ -167,86 +134,52 @@ func TestEnabledThrottler(t *testing.T) {
config.TxThrottlerTabletTypes = &topoproto.TabletTypeListFlag{topodatapb.TabletType_REPLICA}

env := tabletenv.NewEnv(config, t.Name())
throttler, err := tryCreateTxThrottler(env, ts)
throttlerImpl, err := tryCreateTxThrottler(env, ts)
assert.Nil(t, err)
throttler.InitDBConfig(&querypb.Target{
throttlerImpl.InitDBConfig(&querypb.Target{
Keyspace: "keyspace",
Shard: "shard",
})
assert.Nil(t, throttler.Open())
assert.Equal(t, int64(1), throttler.throttlerRunning.Get())

<<<<<<< HEAD
assert.False(t, throttler.Throttle(100))
assert.Equal(t, int64(1), throttler.requestsTotal.Get())
assert.Zero(t, throttler.requestsThrottled.Get())

throttler.state.StatsUpdate(tabletStats) // This calls replication lag thing
=======
assert.Nil(t, throttlerImpl.Open())
throttlerStateImpl, ok := throttlerImpl.state.(*txThrottlerStateImpl)
assert.True(t, ok)
assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes)
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())

// Stop the go routine that keeps updating the cached shard's max lag to prevent it from changing the value in a
// way that will interfere with how we manipulate that value in our tests to evaluate different cases:
throttlerStateImpl.done <- true
throttlerImpl.state.done <- true

// 1 should not throttle due to return value of underlying Throttle(), despite high lag
atomic.StoreInt64(&throttlerStateImpl.maxLag, 20)
assert.False(t, throttlerImpl.Throttle(100, "some-workload"))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Zero(t, throttlerImpl.requestsThrottled.Counts()["some-workload"])
atomic.StoreInt64(&throttlerImpl.state.maxLag, 20)
assert.False(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get())
assert.Zero(t, throttlerImpl.requestsThrottled.Get())

throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts())
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts())
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
rdonlyTabletStats := &discovery.TabletHealth{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_RDONLY,
},
}
<<<<<<< HEAD
// This call should not be forwarded to the go/vt/throttler.Throttler object.
throttler.state.StatsUpdate(rdonlyTabletStats)
// The second throttle call should reject.
assert.True(t, throttler.Throttle(100))
assert.Equal(t, int64(2), throttler.requestsTotal.Get())
assert.Equal(t, int64(1), throttler.requestsThrottled.Get())

// This call should not throttle due to priority. Check that's the case and counters agree.
assert.False(t, throttler.Throttle(0))
assert.Equal(t, int64(3), throttler.requestsTotal.Get())
assert.Equal(t, int64(1), throttler.requestsThrottled.Get())
throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
=======
// This call should not be forwarded to the go/vt/throttlerImpl.Throttler object.
throttlerImpl.state.StatsUpdate(rdonlyTabletStats)
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts())
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts())

// 2 should throttle due to return value of underlying Throttle(), high lag & priority = 100
assert.True(t, throttlerImpl.Throttle(100, "some-workload"))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"])
assert.True(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get())
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())

// 3 should not throttle despite return value of underlying Throttle() and high lag, due to priority = 0
assert.False(t, throttlerImpl.Throttle(0, "some-workload"))
assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"])
assert.False(t, throttlerImpl.Throttle(0))
assert.Equal(t, int64(3), throttlerImpl.requestsTotal.Get())
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())

// 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag
atomic.StoreInt64(&throttlerStateImpl.maxLag, 1)
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some-workload"])
atomic.StoreInt64(&throttlerImpl.state.maxLag, 1)
assert.False(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(4), throttlerImpl.requestsTotal.Get())
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())

throttler.Close()
throttlerImpl.Close()
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
>>>>>>> 2b25639f25 (TxThrottler: dont throttle unless lag (#14789))
}

func TestNewTxThrottler(t *testing.T) {
Expand Down

0 comments on commit 1edee0a

Please sign in to comment.