From 96e43335a8a56cd74e160fb7a0ddc23a164b48ed Mon Sep 17 00:00:00 2001 From: wgrayson Date: Wed, 31 Mar 2021 17:06:04 -0700 Subject: [PATCH] Only use one buffered channel with rate limiting get --- pkg/agent/openflow/client.go | 12 ++--- pkg/agent/openflow/packetin.go | 56 +++++---------------- pkg/agent/openflow/testing/mock_openflow.go | 3 +- pkg/ovs/openflow/interfaces.go | 2 +- pkg/ovs/openflow/ofctrl_bridge.go | 51 +++++++++++++++++-- pkg/ovs/openflow/testing/mock_openflow.go | 2 +- test/integration/ovs/ofctrl_test.go | 7 +-- 7 files changed, 72 insertions(+), 61 deletions(-) diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 82f0e19a84e..d952f7e9b69 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -21,7 +21,6 @@ import ( "net" "github.com/contiv/libOpenflow/protocol" - "github.com/contiv/ofnet/ofctrl" "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/config" @@ -219,9 +218,10 @@ type Client interface { // the old priority with the desired one, for each priority update on that table. ReassignFlowPriorities(updates map[uint16]uint16, table binding.TableIDType) error - // SubscribePacketIn subscribes packet-in channel in bridge. This method requires a receiver to - // pop data from "ch" timely, otherwise it will block all inbound messages from OVS. - SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error + // SubscribePacketIn subscribes packet-in channel in bridge. This method requires a + // receiver to pop data from buffered channel under rate limit. If the buffered + // channel is full, the messages from OVS will be dropped. + SubscribePacketIn(reason uint8, pktInQueue *binding.PacketInQueue) error // SendTraceflowPacket injects packet to specified OVS port for Openflow. SendTraceflowPacket( @@ -809,8 +809,8 @@ func (c *client) setupPolicyOnlyFlows() error { return nil } -func (c *client) SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error { - return c.bridge.SubscribePacketIn(reason, ch) +func (c *client) SubscribePacketIn(reason uint8, pktInQueue *binding.PacketInQueue) error { + return c.bridge.SubscribePacketIn(reason, pktInQueue) } func (c *client) SendTraceflowPacket( diff --git a/pkg/agent/openflow/packetin.go b/pkg/agent/openflow/packetin.go index 1a1c09bd2f8..45844aec1d5 100644 --- a/pkg/agent/openflow/packetin.go +++ b/pkg/agent/openflow/packetin.go @@ -15,12 +15,11 @@ package openflow import ( - "errors" "fmt" + "golang.org/x/time/rate" "github.com/contiv/ofnet/ofctrl" - "golang.org/x/time/rate" - "k8s.io/client-go/util/workqueue" + "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" "k8s.io/klog" ) @@ -31,8 +30,6 @@ type PacketInHandler interface { } const ( - // Max packetInQueue size. - packetInQueueSize int = 256 // PacketIn reasons PacketInReasonTF ofpPacketInReason = 1 PacketInReasonNP ofpPacketInReason = 0 @@ -54,36 +51,17 @@ func (c *client) RegisterPacketInHandler(packetHandlerReason uint8, packetHandle // featureStartPacketIn contains packetin resources specifically for each feature that uses packetin. type featureStartPacketIn struct { reason uint8 - subscribeCh chan *ofctrl.PacketIn stopCh <-chan struct{} - packetInQueue workqueue.RateLimitingInterface + packetInQueue *openflow.PacketInQueue } func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn { featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh} - featurePacketIn.subscribeCh = make(chan *ofctrl.PacketIn) - featurePacketIn.packetInQueue = workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(100), 200)}, string(reason)) + featurePacketIn.packetInQueue = openflow.NewPacketInQueue(200, rate.Limit(100)) return &featurePacketIn } -func (f *featureStartPacketIn) ListenPacketIn() { - for { - select { - case pktIn := <-f.subscribeCh: - // Ensure that the queue doesn't grow too big. This is NOT to provide an exact guarantee. - if f.packetInQueue.Len() < packetInQueueSize { - f.packetInQueue.AddRateLimited(pktIn) - } else { - klog.Warningf("Max packetInQueue size exceeded.") - } - case <-f.stopCh: - f.packetInQueue.ShutDown() - return - } - } -} - // StartPacketInHandler is the starting point for processing feature packetin requests. func (c *client) StartPacketInHandler(packetInStartedReason []uint8, stopCh <-chan struct{}) { if len(c.packetInHandlers) == 0 || len(packetInStartedReason) == 0 { @@ -101,31 +79,23 @@ func (c *client) StartPacketInHandler(packetInStartedReason []uint8, stopCh <-ch } func (c *client) subscribeFeaturePacketIn(featurePacketIn *featureStartPacketIn) error { - err := c.SubscribePacketIn(featurePacketIn.reason, featurePacketIn.subscribeCh) + err := c.SubscribePacketIn(featurePacketIn.reason, featurePacketIn.packetInQueue) if err != nil { - return errors.New(fmt.Sprintf("Subscribe %d PacketIn failed %+v", featurePacketIn.reason, err)) + return fmt.Errorf("subscribe %d PacketIn failed %+v", featurePacketIn.reason, err) } - go c.parsePacketIn(featurePacketIn.packetInQueue, featurePacketIn.reason) - go featurePacketIn.ListenPacketIn() + go c.parsePacketIn(featurePacketIn) return nil } -func (c *client) parsePacketIn(packetInQueue workqueue.RateLimitingInterface, packetHandlerReason uint8) { +func (c *client) parsePacketIn(featurePacketIn *featureStartPacketIn) { for { - obj, quit := packetInQueue.Get() - if quit { - break - } - packetInQueue.Done(obj) - // `Forget` here is a no-op for the BucketRateLimiter implementation. - packetInQueue.Forget(obj) - pktIn, ok := obj.(*ofctrl.PacketIn) - if !ok { - klog.Errorf("Invalid packetin data in queue, skipping.") - continue + pktIn := featurePacketIn.packetInQueue.GetRateLimited(featurePacketIn.stopCh) + if pktIn == nil { + klog.Infof("Received stop signal from stopCh.") + return } // Use corresponding handlers subscribed to the reason to handle PacketIn - for name, handler := range c.packetInHandlers[packetHandlerReason] { + for name, handler := range c.packetInHandlers[featurePacketIn.reason] { err := handler.HandlePacketIn(pktIn) if err != nil { klog.Errorf("PacketIn handler %s failed to process packet: %+v", name, err) diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index d9ba62db7a9..a5e88d00d37 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -20,7 +20,6 @@ package testing import ( - ofctrl "github.com/contiv/ofnet/ofctrl" gomock "github.com/golang/mock/gomock" config "github.com/vmware-tanzu/antrea/pkg/agent/config" types "github.com/vmware-tanzu/antrea/pkg/agent/types" @@ -610,7 +609,7 @@ func (mr *MockClientMockRecorder) StartPacketInHandler(arg0, arg1 interface{}) * } // SubscribePacketIn mocks base method -func (m *MockClient) SubscribePacketIn(arg0 byte, arg1 chan *ofctrl.PacketIn) error { +func (m *MockClient) SubscribePacketIn(arg0 byte, arg1 *openflow.PacketInQueue) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SubscribePacketIn", arg0, arg1) ret0, _ := ret[0].(error) diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index 9f7425cfb81..a6e98245f12 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -107,7 +107,7 @@ type Bridge interface { // SubscribePacketIn registers a consumer to listen to PacketIn messages matching the provided reason. When the // Bridge receives a PacketIn message with the specified reason, it sends the message to the consumer using the // provided channel. - SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error + SubscribePacketIn(reason uint8, pktInQueue *PacketInQueue) error // AddTLVMap adds a TLV mapping with OVS field tun_metadataX. The value loaded in tun_metadataX is transported by // Geneve header with the specified . The value of OptLength must be a multiple of 4. // The value loaded into field tun_metadataX must fit within optLength bytes. diff --git a/pkg/ovs/openflow/ofctrl_bridge.go b/pkg/ovs/openflow/ofctrl_bridge.go index ba71d914ec5..c69fcee782f 100644 --- a/pkg/ovs/openflow/ofctrl_bridge.go +++ b/pkg/ovs/openflow/ofctrl_bridge.go @@ -9,6 +9,7 @@ import ( "github.com/contiv/libOpenflow/openflow13" "github.com/contiv/ofnet/ofctrl" + "golang.org/x/time/rate" "k8s.io/klog" "github.com/vmware-tanzu/antrea/pkg/agent/metrics" @@ -208,10 +209,10 @@ func (b *OFBridge) DumpTableStatus() []TableStatus { func (b *OFBridge) PacketRcvd(sw *ofctrl.OFSwitch, packet *ofctrl.PacketIn) { klog.Infof("Received packet: %+v", packet) reason := packet.Reason - ch, found := b.pktConsumers.Load(reason) + v, found := b.pktConsumers.Load(reason) if found { - pktCh, _ := ch.(chan *ofctrl.PacketIn) - pktCh <- packet + pktInQueue, _ := v.(*PacketInQueue) + pktInQueue.AddOrDrop(packet) } } @@ -519,12 +520,52 @@ func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEnt return nil } -func (b *OFBridge) SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error { +type PacketInQueue struct { + rateLimiter *rate.Limiter + packetsCh chan *ofctrl.PacketIn +} + +func NewPacketInQueue(size int, r rate.Limit) *PacketInQueue { + return &PacketInQueue{rateLimiter: rate.NewLimiter(r, 1), packetsCh: make(chan *ofctrl.PacketIn, size)} +} + +func (q *PacketInQueue) AddOrDrop(packet *ofctrl.PacketIn) bool { + select { + case q.packetsCh <- packet: + return true + default: + // Channel is full. + return false + } +} + +func (q *PacketInQueue) GetRateLimited(stopCh <-chan struct{}) *ofctrl.PacketIn { + when := q.rateLimiter.Reserve().Delay() + t := time.NewTimer(when) + defer t.Stop() + + select { + case <-stopCh: + return nil + case <-t.C: + break + } + for { + select { + case <-stopCh: + return nil + case packet := <-q.packetsCh: + return packet + } + } +} + +func (b *OFBridge) SubscribePacketIn(reason uint8, pktInQueue *PacketInQueue) error { _, exist := b.pktConsumers.Load(reason) if exist { return fmt.Errorf("packetIn reason %d already exists", reason) } - b.pktConsumers.Store(reason, ch) + b.pktConsumers.Store(reason, pktInQueue) return nil } diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index bbc8bc39e95..2eeb7eb2b4a 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -262,7 +262,7 @@ func (mr *MockBridgeMockRecorder) SendPacketOut(arg0 interface{}) *gomock.Call { } // SubscribePacketIn mocks base method -func (m *MockBridge) SubscribePacketIn(arg0 byte, arg1 chan *ofctrl.PacketIn) error { +func (m *MockBridge) SubscribePacketIn(arg0 byte, arg1 *openflow.PacketInQueue) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SubscribePacketIn", arg0, arg1) ret0, _ := ret[0].(error) diff --git a/test/integration/ovs/ofctrl_test.go b/test/integration/ovs/ofctrl_test.go index 4ab743d0b3d..cbcbe2a831c 100644 --- a/test/integration/ovs/ofctrl_test.go +++ b/test/integration/ovs/ofctrl_test.go @@ -27,6 +27,7 @@ import ( "github.com/contiv/ofnet/ofctrl" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow" "github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig" @@ -570,8 +571,8 @@ func TestPacketOutIn(t *testing.T) { defer bridge.Disconnect() reason := uint8(1) - pktCh := make(chan *ofctrl.PacketIn) - err = bridge.SubscribePacketIn(reason, pktCh) + pktInQueue := binding.NewPacketInQueue(200, rate.Limit(100)) + err = bridge.SubscribePacketIn(reason, pktInQueue) require.Nil(t, err) srcMAC, _ := net.ParseMAC("11:11:11:11:11:11") @@ -588,7 +589,7 @@ func TestPacketOutIn(t *testing.T) { stopCh := make(chan struct{}) go func() { - pktIn := <-pktCh + pktIn := pktInQueue.GetRateLimited(make(chan struct{})) matchers := pktIn.GetMatches() reg2Match := matchers.GetMatchByName("NXM_NX_REG2")