Skip to content

Commit

Permalink
use rest config and patch in qps mods
Browse files Browse the repository at this point in the history
  • Loading branch information
vaikas committed Jul 17, 2020
1 parent 95c32e5 commit 3210888
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 19 deletions.
7 changes: 6 additions & 1 deletion cmd/mtchannel_broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@ import (
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/signals"

"knative.dev/eventing/pkg/reconciler/mtbroker"
)

func main() {
sharedmain.Main("mt-broker-controller",
cfg := sharedmain.ParseAndGetConfigOrDie()
// Bump up the client side API quota & burst
cfg.QPS = float32(100)
cfg.Burst = 200
sharedmain.MainWithConfig(signals.NewContext(), "mt-broker-controller", cfg,
mtbroker.NewController,
)
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ require (
k8s.io/apiserver v0.17.6
k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible
k8s.io/utils v0.0.0-20200124190032-861946025e34
knative.dev/pkg v0.0.0-20200716140633-f1b82401dc8a
knative.dev/test-infra v0.0.0-20200715185233-6964ba126fee
knative.dev/pkg v0.0.0-20200717170934-a9ea699c7017
knative.dev/test-infra v0.0.0-20200716222033-3c06d840fc70
sigs.k8s.io/yaml v1.2.0
)

Expand Down
14 changes: 4 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1415,8 +1415,6 @@ golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200626171337-aa94e735be7f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200706234117-b22de6825cf7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200709181711-e327e1019dfe/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200710042808-f1c4188a97a1 h1:rD1FcWVsRaMY+l8biE9jbWP5MS/CJJ/90a9TMkMgNrM=
golang.org/x/tools v0.0.0-20200710042808-f1c4188a97a1/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200713011307-fd294ab11aed/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200714190737-9048b464a08d h1:hYhnolbefSSt3WZp66sgmgnEOFv5PD6a5PIcnKJ8jdU=
golang.org/x/tools v0.0.0-20200714190737-9048b464a08d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
Expand Down Expand Up @@ -1506,8 +1504,6 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc
google.golang.org/genproto v0.0.0-20200626011028-ee7919e894b5/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200707001353-8e8330bf89df/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200709005830-7a2ca40e9dc3/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200710124503-20a17af7bd0e h1:k+p/u26/lVeNEpdxSeUrm7rTvoFckBKaf7gTzgmHyDA=
google.golang.org/genproto v0.0.0-20200710124503-20a17af7bd0e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200711021454-869866162049/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200715011427-11fb19a81f2c h1:6DWnZZ6EY/59QRRQttZKiktVL23UuQYs7uy75MhhLRM=
google.golang.org/genproto v0.0.0-20200715011427-11fb19a81f2c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
Expand Down Expand Up @@ -1648,20 +1644,19 @@ k8s.io/legacy-cloud-providers v0.17.4/go.mod h1:FikRNoD64ECjkxO36gkDgJeiQWwyZTuB
k8s.io/metrics v0.17.2/go.mod h1:3TkNHET4ROd+NfzNxkjoVfQ0Ob4iZnaHmSEA4vYpwLw=
k8s.io/test-infra v0.0.0-20200514184223-ba32c8aae783/go.mod h1:bW6thaPZfL2hW7ecjx2WYwlP9KQLM47/xIJyttkVk5s=
k8s.io/test-infra v0.0.0-20200617221206-ea73eaeab7ff/go.mod h1:L3+cRvwftUq8IW1TrHji5m3msnc4uck/7LsE/GR/aZk=
k8s.io/test-infra v0.0.0-20200710134549-5891a1a4cc17/go.mod h1:L3+cRvwftUq8IW1TrHji5m3msnc4uck/7LsE/GR/aZk=
k8s.io/test-infra v0.0.0-20200715094037-cc150f5ae724/go.mod h1:D2jUSuQFYy6McY2qbknsLUE9stqN0yIuJ+rjdUAxSCs=
k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
k8s.io/utils v0.0.0-20200124190032-861946025e34 h1:HjlUD6M0K3P8nRXmr2B9o4F9dUy9TCj/aEpReeyi6+k=
k8s.io/utils v0.0.0-20200124190032-861946025e34/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew=
knative.dev/caching v0.0.0-20200116200605-67bca2c83dfa/go.mod h1:dHXFU6CGlLlbzaWc32g80cR92iuBSpsslDNBWI8C7eg=
knative.dev/eventing-contrib v0.11.2/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g=
knative.dev/pkg v0.0.0-20200207155214-fef852970f43/go.mod h1:pgODObA1dTyhNoFxPZTTjNWfx6F0aKsKzn+vaT9XO/Q=
knative.dev/pkg v0.0.0-20200716140633-f1b82401dc8a h1:5c8rD+RQBiS3p93bxADv2iAIRel11b6vwzwfXSpdtI8=
knative.dev/pkg v0.0.0-20200716140633-f1b82401dc8a/go.mod h1:yFXTzV2GIB30Qs6pdJNCbtKaIEhxH9fsmrrltAqfjWE=
knative.dev/test-infra v0.0.0-20200713220518-5a4c4cad5372 h1:NZzdNmKYP3L7fut/SNOxLgTgXVvQrygXiYpAeIMGMwM=
knative.dev/test-infra v0.0.0-20200713220518-5a4c4cad5372/go.mod h1:vtT6dLs/iNj8pKcfag8CSVqHKNMgyCFtU/g1pV7Bovs=
knative.dev/pkg v0.0.0-20200717170934-a9ea699c7017 h1:Lt+/qhKK6t1IVJrqVz3wXyXhwGrcEhYI6JadYc9Q/Lw=
knative.dev/pkg v0.0.0-20200717170934-a9ea699c7017/go.mod h1:3mm5ZffkmyYnqN+SOq1cN9TX0KTjhEbiZL8YBpP4C4Y=
knative.dev/test-infra v0.0.0-20200715185233-6964ba126fee h1:SH4N5kSRiEgmOcgjFwsyLMipS3sPJlN6dpp783C/ILQ=
knative.dev/test-infra v0.0.0-20200715185233-6964ba126fee/go.mod h1:mAsPDmFmlsTJjRWplWBz8xtEiarSgvGiiOjkGj4Or1g=
knative.dev/test-infra v0.0.0-20200716222033-3c06d840fc70 h1:1510826l+2CBQMaNcDqQeSzz1H6g90hLdWArf5L+SVo=
knative.dev/test-infra v0.0.0-20200716222033-3c06d840fc70/go.mod h1:mAsPDmFmlsTJjRWplWBz8xtEiarSgvGiiOjkGj4Or1g=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk=
modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k=
Expand All @@ -1679,7 +1674,6 @@ rsc.io/letsencrypt v0.0.3/go.mod h1:buyQKZ6IXrRnB7TdkHP0RyEybLx18HHyOSoTyoOLqNY=
rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/boskos v0.0.0-20200526191642-45fc818e2d00/go.mod h1:L1ubP7d1CCMSQSjKiZv6dGbh7b4kfoG+dFPj8cfYDnI=
sigs.k8s.io/boskos v0.0.0-20200617235605-f289ba6555ba/go.mod h1:ZO5RV+VxJS9mb6DvZ1yAjywoyq/wQ8b0vDoZxcIA5kE=
sigs.k8s.io/boskos v0.0.0-20200710214748-f5935686c7fc/go.mod h1:ZO5RV+VxJS9mb6DvZ1yAjywoyq/wQ8b0vDoZxcIA5kE=
sigs.k8s.io/controller-runtime v0.5.0/go.mod h1:REiJzC7Y00U+2YkMbT8wxgrsX5USpXKGhb2sCtAXiT8=
sigs.k8s.io/controller-runtime v0.5.4/go.mod h1:JZUwSMVbxDupo0lTJSSFP5pimEyxGynROImSsqIOx1A=
Expand Down
159 changes: 159 additions & 0 deletions vendor/knative.dev/pkg/controller/two_lane_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"k8s.io/client-go/util/workqueue"
)

// twoLaneQueue is a rate limited queue that wraps around two queues
// -- fast queue (anonymously aliased), whose contents are processed with priority.
// -- slow queue (slowLane queue), whose contents are processed if fast queue has no items.
// All the default methods operate on the fast queue, unless noted otherwise.
type twoLaneQueue struct {
workqueue.RateLimitingInterface
slowLane workqueue.RateLimitingInterface
// consumerQueue is necessary to ensure that we're not reconciling
// the same object at the exact same time (e.g. if it had been enqueued
// in both fast and slow and is the only object there).
consumerQueue workqueue.Interface

name string

fastChan chan interface{}
slowChan chan interface{}
}

// Creates a new twoLaneQueue.
func newTwoLaneWorkQueue(name string) *twoLaneQueue {
rl := workqueue.DefaultControllerRateLimiter()
tlq := &twoLaneQueue{
RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(
rl,
name+"-fast",
),
slowLane: workqueue.NewNamedRateLimitingQueue(
rl,
name+"-slow",
),
consumerQueue: workqueue.NewNamed(name + "-consumer"),
name: name,
fastChan: make(chan interface{}),
slowChan: make(chan interface{}),
}
// Run consumer thread.
go tlq.runConsumer()
// Run producer threads.
go process(tlq.RateLimitingInterface, tlq.fastChan)
go process(tlq.slowLane, tlq.slowChan)
return tlq
}

func process(q workqueue.Interface, ch chan interface{}) {
// Sender closes the channel
defer close(ch)
for {
i, d := q.Get()
// If the queue is empty and we're shutting down — stop the loop.
if d {
break
}
ch <- i
q.Done(i)
}
}

func (tlq *twoLaneQueue) runConsumer() {
// Shutdown flags.
fast, slow := true, true
// When both producer queues are shutdown stop the consumerQueue.
defer tlq.consumerQueue.ShutDown()
// While any of the queues is still running, try to read off of them.
for fast || slow {
// By default drain the fast lane.
// Channels in select are picked random, so first
// we have a select that only looks at the fast lane queue.
if fast {
select {
case item, ok := <-tlq.fastChan:
if !ok {
// This queue is shutdown and drained. Stop looking at it.
fast = false
continue
}
tlq.consumerQueue.Add(item)
continue
default:
// This immediately exits the wait if the fast chan is empty.
}
}

// If the fast lane queue had no items, we can select from both.
// Obviously if suddenly both are populated at the same time there's a
// 50% chance that the slow would be picked first, but this should be
// a rare occasion not to really worry about it.
select {
case item, ok := <-tlq.fastChan:
if !ok {
// This queue is shutdown and drained. Stop looking at it.
fast = false
continue
}
tlq.consumerQueue.Add(item)
case item, ok := <-tlq.slowChan:
if !ok {
// This queue is shutdown and drained. Stop looking at it.
slow = false
continue
}
tlq.consumerQueue.Add(item)
}
}
}

// Shutdown implements workqueue.Interace.
// Shutdown shuts down both queues.
func (tlq *twoLaneQueue) ShutDown() {
tlq.RateLimitingInterface.ShutDown()
tlq.slowLane.ShutDown()
}

// Done implements workqueue.Interface.
// Done marks the item as completed in all the queues.
// NB: this will just re-enqueue the object on the queue that
// didn't originate the object.
func (tlq *twoLaneQueue) Done(i interface{}) {
tlq.consumerQueue.Done(i)
}

// Get implements workqueue.Interface.
// It gets the item from fast lane if it has anything, alternatively
// the slow lane.
func (tlq *twoLaneQueue) Get() (interface{}, bool) {
return tlq.consumerQueue.Get()
}

// Len returns the sum of lengths.
// NB: actual _number_ of unique object might be less than this sum.
func (tlq *twoLaneQueue) Len() int {
return tlq.RateLimitingInterface.Len() + tlq.slowLane.Len() + tlq.consumerQueue.Len()
}

// SlowLane gives direct access to the slow queue.
func (tlq *twoLaneQueue) SlowLane() workqueue.RateLimitingInterface {
return tlq.slowLane
}
12 changes: 8 additions & 4 deletions vendor/knative.dev/pkg/injection/sharedmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,14 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto

MemStatsOrDie(ctx)

// Adjust our client's rate limits based on the number of controllers we are running.
// cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
cfg.QPS = float32(100)
cfg.Burst = 200
// Respect user provided settings, but if omitted customize the default behavior.
if cfg.QPS == 0 {
// Adjust our client's rate limits based on the number of controllers we are running.
cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
}
if cfg.Burst == 0 {
cfg.Burst = len(ctors) * rest.DefaultBurst
}
ctx, informers := injection.Default.SetupInformers(ctx, cfg)

logger, atomicLevel := SetupLoggerOrDie(ctx, component)
Expand Down
3 changes: 3 additions & 0 deletions vendor/knative.dev/pkg/webhook/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func (wh *Webhook) Run(stop <-chan struct{}) error {
logger.Info("Starting to fail readiness probes...")
close(wh.stopCh)

// As we start to shutdown, disable keep-alives to avoid clients hanging onto connections.
server.SetKeepAlivesEnabled(false)

// Wait for a grace period for the above to take effect and this Pod's
// endpoint to be removed from the webhook service's Endpoints.
// For this to be effective, it must be greater than the probe's
Expand Down
4 changes: 2 additions & 2 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ k8s.io/utils/buffer
k8s.io/utils/integer
k8s.io/utils/pointer
k8s.io/utils/trace
# knative.dev/pkg v0.0.0-20200716140633-f1b82401dc8a
# knative.dev/pkg v0.0.0-20200717170934-a9ea699c7017
## explicit
knative.dev/pkg/apiextensions/storageversion
knative.dev/pkg/apiextensions/storageversion/cmd/migrate
Expand Down Expand Up @@ -1090,7 +1090,7 @@ knative.dev/pkg/webhook/resourcesemantics
knative.dev/pkg/webhook/resourcesemantics/conversion
knative.dev/pkg/webhook/resourcesemantics/defaulting
knative.dev/pkg/webhook/resourcesemantics/validation
# knative.dev/test-infra v0.0.0-20200715185233-6964ba126fee
# knative.dev/test-infra v0.0.0-20200716222033-3c06d840fc70
## explicit
knative.dev/test-infra/scripts
# sigs.k8s.io/yaml v1.2.0
Expand Down

0 comments on commit 3210888

Please sign in to comment.