Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make ResourceControlInterceptor atomic #848

Merged
merged 2 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions internal/client/client_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

func init() {
ResourceControlSwitch.Store(false)
ResourceControlInterceptor = nil
}

var _ Client = interceptedClient{}
Expand Down Expand Up @@ -77,7 +76,7 @@ var (
// ResourceControlSwitch is used to control whether to enable the resource control.
ResourceControlSwitch atomic.Value
// ResourceControlInterceptor is used to build the resource control interceptor.
ResourceControlInterceptor resourceControlClient.ResourceGroupKVInterceptor
ResourceControlInterceptor atomic.Pointer[resourceControlClient.ResourceGroupKVInterceptor]
)

// buildResourceControlInterceptor builds a resource control interceptor with
Expand All @@ -95,16 +94,20 @@ func buildResourceControlInterceptor(
if len(resourceGroupName) == 0 {
return nil
}

rcInterceptor := ResourceControlInterceptor.Load()
// No resource group interceptor is set.
if ResourceControlInterceptor == nil {
if rcInterceptor == nil {
return nil
}
resourceControlInterceptor := *rcInterceptor

// Make the request info.
reqInfo := resourcecontrol.MakeRequestInfo(req)
// Build the interceptor.
interceptFn := func(next interceptor.RPCInterceptorFunc) interceptor.RPCInterceptorFunc {
return func(target string, req *tikvrpc.Request) (*tikvrpc.Response, error) {
consumption, penalty, err := ResourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
consumption, penalty, err := resourceControlInterceptor.OnRequestWait(ctx, resourceGroupName, reqInfo)
if err != nil {
return nil, err
}
Expand All @@ -113,7 +116,7 @@ func buildResourceControlInterceptor(
resp, err := next(target, req)
if resp != nil {
respInfo := resourcecontrol.MakeResponseInfo(resp)
consumption, err = ResourceControlInterceptor.OnResponse(resourceGroupName, reqInfo, respInfo)
consumption, err = resourceControlInterceptor.OnResponse(resourceGroupName, reqInfo, respInfo)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,12 +705,12 @@ func DisableResourceControl() {

// SetResourceControlInterceptor sets the interceptor for resource control.
func SetResourceControlInterceptor(interceptor resourceControlClient.ResourceGroupKVInterceptor) {
client.ResourceControlInterceptor = interceptor
client.ResourceControlInterceptor.Store(&interceptor)
}

// UnsetResourceControlInterceptor un-sets the interceptor for resource control.
func UnsetResourceControlInterceptor() {
client.ResourceControlInterceptor = nil
client.ResourceControlInterceptor.Store(nil)
}

// Variables defines the variables used by TiKV storage.
Expand Down