Skip to content

Commit

Permalink
controller: fix the low_ru request missed (#8368) (#8373)
Browse files Browse the repository at this point in the history
close #8349

controller: fix the low_ru request missed 

The problem is that `c.run.currentRequests` is shared by all groups.
If one group triggers a token request that isn't handled by the response, the other group's requests will be discarded.
Here, we do not discard the low_ru triggers.

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: ShuNing <nolouch@gmail.com>
Co-authored-by: nolouch <nolouch@gmail.com>
  • Loading branch information
ti-chi-bot and nolouch authored Jul 8, 2024
1 parent cfb671f commit cb8da1b
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 16 deletions.
5 changes: 3 additions & 2 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.46.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/protobuf v1.32.0 // indirect
Expand Down
9 changes: 5 additions & 4 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncj
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand Down Expand Up @@ -110,8 +111,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -122,8 +123,8 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
12 changes: 9 additions & 3 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
case notifyMsg := <-c.lowTokenNotifyChan:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
}
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
if c.run.inDegradedMode {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
}
Expand Down Expand Up @@ -1173,11 +1171,19 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
switch selectTyp {
case periodicReport:
selected = selected || gc.shouldReportConsumption()
failpoint.Inject("triggerPeriodicReport", func(val failpoint.Value) {
selected = gc.name == val.(string)
})
fallthrough
case lowToken:
if counter.limiter.IsLowTokens() {
selected = true
}
failpoint.Inject("triggerLowRUReport", func(val failpoint.Value) {
if selectTyp == lowToken {
selected = gc.name == val.(string)
}
})
}
request := &rmpb.RequestUnitItem{
Type: typ,
Expand Down
139 changes: 139 additions & 0 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/meta_storagepb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
)

Expand Down Expand Up @@ -132,3 +136,138 @@ func TestResourceGroupThrottledError(t *testing.T) {
re.Error(err)
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
}

// MockResourceGroupProvider is a mock implementation of the ResourceGroupProvider interface.
type MockResourceGroupProvider struct {
mock.Mock
}

func (m *MockResourceGroupProvider) GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error) {
args := m.Called(ctx, resourceGroupName, opts)
return args.Get(0).(*rmpb.ResourceGroup), args.Error(1)
}

func (m *MockResourceGroupProvider) ListResourceGroups(ctx context.Context, opts ...pd.GetResourceGroupOption) ([]*rmpb.ResourceGroup, error) {
args := m.Called(ctx, opts)
return args.Get(0).([]*rmpb.ResourceGroup), args.Error(1)
}

func (m *MockResourceGroupProvider) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
args := m.Called(ctx, metaGroup)
return args.String(0), args.Error(1)
}

func (m *MockResourceGroupProvider) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
args := m.Called(ctx, metaGroup)
return args.String(0), args.Error(1)
}

func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
args := m.Called(ctx, resourceGroupName)
return args.String(0), args.Error(1)
}

func (m *MockResourceGroupProvider) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
args := m.Called(ctx, request)
return args.Get(0).([]*rmpb.TokenBucketResponse), args.Error(1)
}

func (m *MockResourceGroupProvider) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) {
args := m.Called(ctx)
return args.Get(0).([]*rmpb.ResourceGroup), args.Get(1).(int64), args.Error(2)
}

func (m *MockResourceGroupProvider) Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) {
args := m.Called(ctx, key, opts)
return args.Get(0).(chan []*meta_storagepb.Event), args.Error(1)
}

func (m *MockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) {
args := m.Called(ctx, key, opts)
return args.Get(0).(*meta_storagepb.GetResponse), args.Error(1)
}

func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockProvider := new(MockResourceGroupProvider)

mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil)
// LoadResourceGroups
mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil)
// Watch
mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", "default")))
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport")
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group")))
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport")

controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
controller.Start(ctx)

defaultResourceGroup := &rmpb.ResourceGroup{Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
mockProvider.On("GetResourceGroup", mock.Anything, "default", mock.Anything).Return(defaultResourceGroup, nil)
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)

c1, err := controller.tryGetResourceGroup(ctx, "default")
re.NoError(err)
re.Equal(defaultResourceGroup, c1.meta)

c2, err := controller.tryGetResourceGroup(ctx, "test-group")
re.NoError(err)
re.Equal(testResourceGroup, c2.meta)

var expectResp []*rmpb.TokenBucketResponse
recTestGroupAcquireTokenRequest := make(chan bool)
mockProvider.On("AcquireTokenBuckets", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
request := args.Get(1).(*rmpb.TokenBucketsRequest)
var responses []*rmpb.TokenBucketResponse
for _, req := range request.Requests {
if req.ResourceGroupName == "default" {
// no response the default group request, that's mean `len(c.run.currentRequests) != 0` always.
time.Sleep(100 * time.Second)
responses = append(responses, &rmpb.TokenBucketResponse{
ResourceGroupName: "default",
GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{
{
GrantedTokens: &rmpb.TokenBucket{
Tokens: 100000,
},
},
},
})
} else {
responses = append(responses, &rmpb.TokenBucketResponse{
ResourceGroupName: req.ResourceGroupName,
GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{
{
GrantedTokens: &rmpb.TokenBucket{
Tokens: 100000,
},
},
},
})
}
}
// receive test-group request
if len(request.Requests) == 1 && request.Requests[0].ResourceGroupName == "test-group" {
recTestGroupAcquireTokenRequest <- true
}
expectResp = responses
}).Return(expectResp, nil)
// wait default group request token by PeriodicReport.
time.Sleep(2 * time.Second)
counter := c2.run.requestUnitTokens[0]
counter.limiter.mu.Lock()
counter.limiter.notify()
counter.limiter.mu.Unlock()
select {
case res := <-recTestGroupAcquireTokenRequest:
re.True(res)
case <-time.After(5 * time.Second):
re.Fail("timeout")
}
}
15 changes: 14 additions & 1 deletion client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,18 @@ var (
t8 = t0.Add(time.Duration(8) * d)
)

func resetTime() {
t0 = time.Now()
t1 = t0.Add(time.Duration(1) * d)
t2 = t0.Add(time.Duration(2) * d)
t3 = t0.Add(time.Duration(3) * d)
t4 = t0.Add(time.Duration(4) * d)
t5 = t0.Add(time.Duration(5) * d)
t6 = t0.Add(time.Duration(6) * d)
t7 = t0.Add(time.Duration(7) * d)
t8 = t0.Add(time.Duration(8) * d)
}

type request struct {
t time.Time
n float64
Expand Down Expand Up @@ -144,6 +156,7 @@ func TestNotify(t *testing.T) {
}

func TestCancel(t *testing.T) {
resetTime()
ctx := context.Background()
ctx1, cancel1 := context.WithDeadline(ctx, t2)
re := require.New(t)
Expand All @@ -161,8 +174,8 @@ func TestCancel(t *testing.T) {
checkTokens(re, lim1, t2, 7)
checkTokens(re, lim2, t2, 2)
d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
re.Equal(4*time.Second, d)
re.Error(err)
re.Equal(4*time.Second, d)
re.Contains(err.Error(), "estimated wait time 4s, ltb state is 1.00:-4.00")
checkTokens(re, lim1, t3, 13)
checkTokens(re, lim2, t3, 3)
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ require (
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect
golang.org/x/image v0.10.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,8 @@ golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI=
Expand Down
2 changes: 1 addition & 1 deletion tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ require (
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 // indirect
golang.org/x/image v0.10.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.18.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions tools/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,8 @@ golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20211123203042-d83791d6bcd9/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI=
Expand Down

0 comments on commit cb8da1b

Please sign in to comment.