Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pace operation polling #401

Merged
merged 1 commit into from
Jul 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/glbc/app/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewGCEClient() *gce.GCECloud {
if err == nil {
cloud := provider.(*gce.GCECloud)
// Configure GCE rate limiting
rl, err := ratelimit.NewGCERateLimiter(flags.F.GCERateLimit.Values())
rl, err := ratelimit.NewGCERateLimiter(flags.F.GCERateLimit.Values(), flags.F.GCEOperationPollInterval)
if err != nil {
glog.Fatalf("Error configuring rate limiting: %v", err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
DefaultSvcPortName string
DeleteAllOnQuit bool
GCERateLimit RateLimitSpecs
GCEOperationPollInterval time.Duration
HealthCheckPath string
HealthzPort int
Features *Features
Expand Down Expand Up @@ -111,7 +112,7 @@ func defaultLeaderElectionConfiguration() LeaderElectionConfiguration {

func init() {
F.NodePortRanges.ports = []string{DefaultNodePortRange}
F.GCERateLimit.specs = []string{"alpha.Operations.Get,qps,10,100", "beta.Operations.Get,qps,10,100", "ga.Operations.Get,qps,10,100"}
F.GCERateLimit.specs = []string{"alpha.Operations.Get,qps,10,10", "beta.Operations.Get,qps,10,10", "ga.Operations.Get,qps,10,10"}
F.Features = EnabledFeatures()
F.LeaderElection = defaultLeaderElectionConfiguration()
}
Expand Down Expand Up @@ -180,6 +181,8 @@ specify this flag, the default is to rate limit Operations.Get for all versions.
If you do specify this flag one or more times, this default will be overwritten.
If you want to still use the default, simply specify it along with your other
values.`)
flag.DurationVar(&F.GCEOperationPollInterval, "gce-operation-poll-interval", time.Second,
`Minimum time between polling requests to GCE for checking the status of an operation.`)
flag.StringVar(&F.HealthCheckPath, "health-check-path", "/",
`Path used to health-check a backend service. All Services must serve a
200 page on this path. Currently this is only configurable globally.`)
Expand Down
50 changes: 33 additions & 17 deletions pkg/ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/golang/glog"
"k8s.io/client-go/util/flowcontrol"
Expand All @@ -32,12 +33,15 @@ import (
type GCERateLimiter struct {
// Map a RateLimitKey to its rate limiter implementation.
rateLimitImpls map[cloud.RateLimitKey]flowcontrol.RateLimiter
// Minimum polling interval for getting operations. Underlying operations rate limiter
// may increase the time.
operationPollInterval time.Duration
}

// NewGCERateLimiter parses the list of rate limiting specs passed in and
// returns a properly configured cloud.RateLimiter implementation.
// Expected format of specs: {"[version].[service].[operation],[type],[param1],[param2],..", "..."}
func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) {
func NewGCERateLimiter(specs []string, operationPollInterval time.Duration) (*GCERateLimiter, error) {
rateLimitImpls := make(map[cloud.RateLimitKey]flowcontrol.RateLimiter)
// Within each specification, split on comma to get the operation,
// rate limiter type, and extra parameters.
Expand All @@ -62,27 +66,39 @@ func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) {
if len(rateLimitImpls) == 0 {
return nil, nil
}
return &GCERateLimiter{rateLimitImpls}, nil
return &GCERateLimiter{
rateLimitImpls: rateLimitImpls,
operationPollInterval: operationPollInterval,
}, nil
}

// Implementation of cloud.RateLimiter
// Accept looks up the associated flowcontrol.RateLimiter (if exists) and waits on it.
func (l *GCERateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error {
ch := make(chan struct{})
go func() {
// Call flowcontrol.RateLimiter implementation.
impl := l.rateLimitImpl(key)
if impl != nil {
impl.Accept()
var rl cloud.RateLimiter

impl := l.rateLimitImpl(key)
if impl != nil {
// Wrap the flowcontrol.RateLimiter with a AcceptRateLimiter and handle context.
rl = &cloud.AcceptRateLimiter{Acceptor: impl}
} else {
// Check the context then use the cloud NopRateLimiter which accepts immediately.
select {
case <-ctx.Done():
return ctx.Err()
default:
}
close(ch)
}()
select {
case <-ch:
break
case <-ctx.Done():
return ctx.Err()
rl = &cloud.NopRateLimiter{}
}
return nil

if key.Operation == "Get" && key.Service == "Operations" {
// Wait a minimum amount of time regardless of rate limiter.
rl = &cloud.MinimumRateLimiter{
RateLimiter: rl,
Minimum: l.operationPollInterval,
}
}

return rl.Accept(ctx, key)
}

// rateLimitImpl returns the flowcontrol.RateLimiter implementation
Expand Down
5 changes: 3 additions & 2 deletions pkg/ratelimit/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package ratelimit

import (
"testing"
"time"
)

func TestGCERateLimiter(t *testing.T) {
Expand All @@ -41,14 +42,14 @@ func TestGCERateLimiter(t *testing.T) {
}

for _, testCase := range validTestCases {
_, err := NewGCERateLimiter(testCase)
_, err := NewGCERateLimiter(testCase, time.Second)
if err != nil {
t.Errorf("Did not expect an error for test case: %v", testCase)
}
}

for _, testCase := range invalidTestCases {
_, err := NewGCERateLimiter(testCase)
_, err := NewGCERateLimiter(testCase, time.Second)
if err == nil {
t.Errorf("Expected an error for test case: %v", testCase)
}
Expand Down