diff --git a/pkg/agent/openflow/packetin.go b/pkg/agent/openflow/packetin.go index e135bc701d6..1a1c09bd2f8 100644 --- a/pkg/agent/openflow/packetin.go +++ b/pkg/agent/openflow/packetin.go @@ -19,6 +19,7 @@ import ( "fmt" "github.com/contiv/ofnet/ofctrl" + "golang.org/x/time/rate" "k8s.io/client-go/util/workqueue" "k8s.io/klog" ) @@ -55,13 +56,14 @@ type featureStartPacketIn struct { reason uint8 subscribeCh chan *ofctrl.PacketIn stopCh <-chan struct{} - packetInQueue *workqueue.Type + packetInQueue workqueue.RateLimitingInterface } func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn { featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh} featurePacketIn.subscribeCh = make(chan *ofctrl.PacketIn) - featurePacketIn.packetInQueue = workqueue.NewNamed(string(reason)) + featurePacketIn.packetInQueue = workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(100), 200)}, string(reason)) + return &featurePacketIn } @@ -71,7 +73,7 @@ func (f *featureStartPacketIn) ListenPacketIn() { case pktIn := <-f.subscribeCh: // Ensure that the queue doesn't grow too big. This is NOT to provide an exact guarantee. if f.packetInQueue.Len() < packetInQueueSize { - f.packetInQueue.Add(pktIn) + f.packetInQueue.AddRateLimited(pktIn) } else { klog.Warningf("Max packetInQueue size exceeded.") } @@ -108,13 +110,15 @@ func (c *client) subscribeFeaturePacketIn(featurePacketIn *featureStartPacketIn) return nil } -func (c *client) parsePacketIn(packetInQueue workqueue.Interface, packetHandlerReason uint8) { +func (c *client) parsePacketIn(packetInQueue workqueue.RateLimitingInterface, packetHandlerReason uint8) { for { obj, quit := packetInQueue.Get() if quit { break } packetInQueue.Done(obj) + // `Forget` here is a no-op for the BucketRateLimiter implementation. + packetInQueue.Forget(obj) pktIn, ok := obj.(*ofctrl.PacketIn) if !ok { klog.Errorf("Invalid packetin data in queue, skipping.")