Skip to content

Commit

Permalink
Initial ingress rate limiting implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rramkumar1 committed Mar 15, 2018
1 parent 43838b1 commit d0d8c13
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 2 deletions.
7 changes: 7 additions & 0 deletions cmd/glbc/app/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/ratelimit"
"k8s.io/ingress-gce/pkg/utils"
)

Expand Down Expand Up @@ -95,6 +96,12 @@ func NewGCEClient() *gce.GCECloud {
provider, err := cloudprovider.GetCloudProvider("gce", configReader())
if err == nil {
cloud := provider.(*gce.GCECloud)
// Configure GCE rate limiting
rl, err := ratelimit.NewGCERateLimiter(flags.F.GCERateLimit.Values())
if err != nil {
glog.Errorf("Error in configuring rate limiting: %v", err)
}
cloud.SetRateLimiter(rl)
// If this controller is scheduled on a node without compute/rw
// it won't be allowed to list backends. We can assume that the
// user has no need for Ingress in this case. If they grant
Expand Down
26 changes: 26 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
ConfigFilePath string
DefaultSvc string
DeleteAllOnQuit bool
GCERateLimit RateLimitSpecs
HealthCheckPath string
HealthzPort int
InCluster bool
Expand All @@ -58,6 +59,7 @@ var (

func init() {
F.NodePortRanges.ports = []string{DefaultNodePortRange}
F.GCERateLimit.specs = []string{}
}

// Register flags with the command line parser.
Expand Down Expand Up @@ -85,6 +87,11 @@ the default backend.`)
external cloud resources as it's shutting down. Mostly used for testing. In
normal environments the controller should only delete a loadbalancer if the
associated Ingress is deleted.`)
flag.Var(&F.GCERateLimit, "gce-ratelimit",
`Optional, can be used to rate limit certain GCE API calls. Example usage:
--gce-ratelimit=ga.Addresses.Get,qps,1.5,5
(limit ga.Addresses.Get to maximum of 1.5 qps with a burst of 5).
Use the flag more than once to rate limit more than one call.`)
flag.StringVar(&F.HealthCheckPath, "health-check-path", "/",
`Path used to health-check a backend service. All Services must serve a
200 page on this path. Currently this is only configurable globally.`)
Expand Down Expand Up @@ -113,6 +120,25 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5
`This flag has been deprecated and no longer has any effect.`)
}

type RateLimitSpecs struct {
specs []string
}

// Part of the flag.Value interface.
func (r *RateLimitSpecs) String() string {
return strings.Join(r.specs, ";")
}

// Set supports the flag being repeated multiple times. Part of the flag.Value interface.
func (r *RateLimitSpecs) Set(value string) error {
r.specs = append(r.specs, value)
return nil
}

func (r *RateLimitSpecs) Values() []string {
return r.specs
}

type PortRanges struct {
ports []string
isSet bool
Expand Down
142 changes: 142 additions & 0 deletions pkg/ratelimit/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ratelimit

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/golang/glog"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)

// GCERateLimiter implements cloud.RateLimiter
type GCERateLimiter struct {
// Map a RateLimitKey to its rate limiter implementation.
rateLimitImpls map[*cloud.RateLimitKey]flowcontrol.RateLimiter
}

// NewGCERateLimiter parses the list of rate limiting specs passed in and
// returns a properly configured cloud.RateLimiter implementation.
// Expected format of specs: {"[version].[service].[operation],[type],[param1],[param2],..", "..."}
func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) {
rateLimitImpls := make(map[*cloud.RateLimitKey]flowcontrol.RateLimiter)
// Within each specification, split on comma to get the operation,
// rate limiter type, and extra parameters.
for _, spec := range specs {
params := strings.Split(spec, ",")
if len(params) < 2 {
return nil, fmt.Errorf("Must at least specify operation and rate limiter type.")
}
// params[0] should consist of the operation to rate limit.
key, err := constructRateLimitKey(params[0])
if err != nil {
return nil, err
}
// params[1:] should consist of the rate limiter type and extra params.
impl, err := constructRateLimitImpl(params[1:])
if err != nil {
return nil, err
}
rateLimitImpls[key] = impl
glog.Infof("Configured rate limiting for: %v", key)
}
if len(rateLimitImpls) == 0 {
return nil, nil
}
return &GCERateLimiter{rateLimitImpls}, nil
}

// Implementation of cloud.RateLimiter
func (l *GCERateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error {
ch := make(chan struct{})
go func() {
// Call flowcontrol.RateLimiter implementation.
impl := l.rateLimitImpl(key)
if impl != nil {
impl.Accept()
}
close(ch)
}()
select {
case <-ch:
break
case <-ctx.Done():
return ctx.Err()
}
return nil
}

// rateLimitImpl returns the flowcontrol.RateLimiter implementation
// associated with the passed in key.
func (l *GCERateLimiter) rateLimitImpl(key *cloud.RateLimitKey) flowcontrol.RateLimiter {
// Since the passed in key will have the ProjectID field filled in, we need to
// create a copy which does not, so that retreiving the rate limiter implementation
// through the map works as expected.
keyCopy := &cloud.RateLimitKey{
ProjectID: "",
Operation: key.Operation,
Version: key.Version,
Service: key.Service,
}
return l.rateLimitImpls[keyCopy]
}

// Expected format of param is [version].[service].[operation]
func constructRateLimitKey(param string) (*cloud.RateLimitKey, error) {
params := strings.Split(param, ".")
if len(params) != 3 {
return nil, fmt.Errorf("Must specify operation in [version].[service].[operation] format.")
}
// TODO(rramkumar): Add another layer of validation here?
version := meta.Version(params[0])
service := params[1]
operation := params[2]
return &cloud.RateLimitKey{
ProjectID: "",
Operation: operation,
Version: version,
Service: service,
}, nil
}

// constructRateLimitImpl parses the slice and returns a flowcontrol.RateLimiter
// Expected format is [type],[param1],[param2],...
func constructRateLimitImpl(params []string) (flowcontrol.RateLimiter, error) {
// For now, only the "qps" type is supported.
rlType := params[0]
implArgs := params[1:]
if rlType == "qps" {
if len(implArgs) != 2 {
return nil, fmt.Errorf("Invalid number of args for rate limiter type %v. Expected %d, Got %v", rlType, 2, len(implArgs))
}
qps, err := strconv.ParseFloat(implArgs[0], 32)
if err != nil || qps <= 0 {
return nil, fmt.Errorf("Invalid argument for rate limiter type %v. Either %v is not a float or not greater than 0.", rlType, implArgs[0])
}
burst, err := strconv.Atoi(implArgs[1])
if err != nil {
return nil, fmt.Errorf("Invalid argument for rate limiter type %v. Expected %v to be a int.", rlType, implArgs[1])
}
return flowcontrol.NewTokenBucketRateLimiter(float32(qps), burst), nil
}
return nil, fmt.Errorf("Invalid rate limiter type provided: %v", rlType)
}
55 changes: 55 additions & 0 deletions pkg/ratelimit/ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ratelimit

import (
"testing"
)

func TestConfigureGCERateLimiting(t *testing.T) {
validTestCases := [][]string{
[]string{"ga.Addresses.Get,qps,1.5,5"},
[]string{"ga.Addresses.List,qps,2,10"},
[]string{"ga.Addresses.Get,qps,1.5,5", "ga.Firewalls.Get,qps,1.5,5"},
}
invalidTestCases := [][]string{
[]string{"gaAddresses.Get,qps,1.5,5"},
[]string{"gaAddresses.Get,qps,0,5"},
[]string{"gaAddresses.Get,qps,-1,5"},
[]string{"ga.Addresses.Get,qps,1.5.5"},
[]string{"gaAddresses.Get,qps,1.5,5.5"},
[]string{"gaAddressesGet,qps,1.5,5.5"},
[]string{"gaAddressesGet,qps,1.5"},
[]string{"ga.Addresses.Get,foo,1.5,5"},
[]string{"ga.Addresses.Get,1.5,5"},
[]string{"ga.Addresses.Get,qps,1.5,5", "gaFirewalls.Get,qps,1.5,5"},
}

for _, testCase := range validTestCases {
_, err := NewGCERateLimiter(testCase)
if err != nil {
t.Errorf("Did not expect an error for test case: %v", testCase)
}
}

for _, testCase := range invalidTestCases {
_, err := NewGCERateLimiter(testCase)
if err == nil {
t.Errorf("Expected an error for test case: %v", testCase)
}
}
}
17 changes: 15 additions & 2 deletions vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d0d8c13

Please sign in to comment.