Skip to content

Commit

Permalink
Add rate limiting queue for packet-in
Browse files Browse the repository at this point in the history
Before we add rate limiting mechanism in OVS, we use RateLimitingQueue for packet-in event in Antrea agent.
  • Loading branch information
GraysonWu committed Mar 31, 2021
1 parent 08ea67c commit 56c488a
Showing 1 changed file with 10 additions and 4 deletions.
14 changes: 10 additions & 4 deletions pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package openflow
import (
"errors"
"fmt"
"golang.org/x/time/rate"

"github.com/contiv/ofnet/ofctrl"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -55,13 +56,18 @@ type featureStartPacketIn struct {
reason uint8
subscribeCh chan *ofctrl.PacketIn
stopCh <-chan struct{}
packetInQueue *workqueue.Type
packetInQueue workqueue.RateLimitingInterface
}

func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn {
featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh}
featurePacketIn.subscribeCh = make(chan *ofctrl.PacketIn)
featurePacketIn.packetInQueue = workqueue.NewNamed(string(reason))
if reason == uint8(PacketInReasonTF) {
featurePacketIn.packetInQueue = workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Inf, 10)}, string(reason))
} else {
featurePacketIn.packetInQueue = workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(5), 10)}, string(reason))
}

return &featurePacketIn
}

Expand All @@ -71,7 +77,7 @@ func (f *featureStartPacketIn) ListenPacketIn() {
case pktIn := <-f.subscribeCh:
// Ensure that the queue doesn't grow too big. This is NOT to provide an exact guarantee.
if f.packetInQueue.Len() < packetInQueueSize {
f.packetInQueue.Add(pktIn)
f.packetInQueue.AddRateLimited(pktIn)
} else {
klog.Warningf("Max packetInQueue size exceeded.")
}
Expand Down Expand Up @@ -108,7 +114,7 @@ func (c *client) subscribeFeaturePacketIn(featurePacketIn *featureStartPacketIn)
return nil
}

func (c *client) parsePacketIn(packetInQueue workqueue.Interface, packetHandlerReason uint8) {
func (c *client) parsePacketIn(packetInQueue workqueue.RateLimitingInterface, packetHandlerReason uint8) {
for {
obj, quit := packetInQueue.Get()
if quit {
Expand Down

0 comments on commit 56c488a

Please sign in to comment.