Skip to content

Commit

Permalink
Add rate limiting queue for packet-in
Browse files Browse the repository at this point in the history
Before we add rate limiting mechanism in OVS, we use RateLimitingQueue for packet-in event in Antrea agent.
  • Loading branch information
GraysonWu committed Mar 31, 2021
1 parent 08ea67c commit ccdd2f5
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/contiv/ofnet/ofctrl"
"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)
Expand Down Expand Up @@ -55,13 +56,14 @@ type featureStartPacketIn struct {
reason uint8
subscribeCh chan *ofctrl.PacketIn
stopCh <-chan struct{}
packetInQueue *workqueue.Type
packetInQueue workqueue.RateLimitingInterface
}

func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn {
featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh}
featurePacketIn.subscribeCh = make(chan *ofctrl.PacketIn)
featurePacketIn.packetInQueue = workqueue.NewNamed(string(reason))
featurePacketIn.packetInQueue = workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(100), 200)}, string(reason))

return &featurePacketIn
}

Expand All @@ -71,7 +73,7 @@ func (f *featureStartPacketIn) ListenPacketIn() {
case pktIn := <-f.subscribeCh:
// Ensure that the queue doesn't grow too big. This is NOT to provide an exact guarantee.
if f.packetInQueue.Len() < packetInQueueSize {
f.packetInQueue.Add(pktIn)
f.packetInQueue.AddRateLimited(pktIn)
} else {
klog.Warningf("Max packetInQueue size exceeded.")
}
Expand Down Expand Up @@ -108,13 +110,15 @@ func (c *client) subscribeFeaturePacketIn(featurePacketIn *featureStartPacketIn)
return nil
}

func (c *client) parsePacketIn(packetInQueue workqueue.Interface, packetHandlerReason uint8) {
func (c *client) parsePacketIn(packetInQueue workqueue.RateLimitingInterface, packetHandlerReason uint8) {
for {
obj, quit := packetInQueue.Get()
if quit {
break
}
packetInQueue.Done(obj)
// `Forget` here is a no-op for the BucketRateLimiter implementation.
packetInQueue.Forget(obj)
pktIn, ok := obj.(*ofctrl.PacketIn)
if !ok {
klog.Errorf("Invalid packetin data in queue, skipping.")
Expand Down

0 comments on commit ccdd2f5

Please sign in to comment.