Skip to content

Commit

Permalink
Only use one buffered channel with rate limiting get
Browse files Browse the repository at this point in the history
  • Loading branch information
GraysonWu committed Apr 1, 2021
1 parent ccdd2f5 commit 96e4333
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 61 deletions.
12 changes: 6 additions & 6 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"net"

"github.com/contiv/libOpenflow/protocol"
"github.com/contiv/ofnet/ofctrl"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/config"
Expand Down Expand Up @@ -219,9 +218,10 @@ type Client interface {
// the old priority with the desired one, for each priority update on that table.
ReassignFlowPriorities(updates map[uint16]uint16, table binding.TableIDType) error

// SubscribePacketIn subscribes packet-in channel in bridge. This method requires a receiver to
// pop data from "ch" timely, otherwise it will block all inbound messages from OVS.
SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error
// SubscribePacketIn subscribes packet-in channel in bridge. This method requires a
// receiver to pop data from buffered channel under rate limit. If the buffered
// channel is full, the messages from OVS will be dropped.
SubscribePacketIn(reason uint8, pktInQueue *binding.PacketInQueue) error

// SendTraceflowPacket injects packet to specified OVS port for Openflow.
SendTraceflowPacket(
Expand Down Expand Up @@ -809,8 +809,8 @@ func (c *client) setupPolicyOnlyFlows() error {
return nil
}

func (c *client) SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error {
return c.bridge.SubscribePacketIn(reason, ch)
func (c *client) SubscribePacketIn(reason uint8, pktInQueue *binding.PacketInQueue) error {
return c.bridge.SubscribePacketIn(reason, pktInQueue)
}

func (c *client) SendTraceflowPacket(
Expand Down
56 changes: 13 additions & 43 deletions pkg/agent/openflow/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
package openflow

import (
"errors"
"fmt"
"golang.org/x/time/rate"

"github.com/contiv/ofnet/ofctrl"
"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
"github.com/vmware-tanzu/antrea/pkg/ovs/openflow"
"k8s.io/klog"
)

Expand All @@ -31,8 +30,6 @@ type PacketInHandler interface {
}

const (
// Max packetInQueue size.
packetInQueueSize int = 256
// PacketIn reasons
PacketInReasonTF ofpPacketInReason = 1
PacketInReasonNP ofpPacketInReason = 0
Expand All @@ -54,36 +51,17 @@ func (c *client) RegisterPacketInHandler(packetHandlerReason uint8, packetHandle
// featureStartPacketIn contains packetin resources specifically for each feature that uses packetin.
type featureStartPacketIn struct {
reason uint8
subscribeCh chan *ofctrl.PacketIn
stopCh <-chan struct{}
packetInQueue workqueue.RateLimitingInterface
packetInQueue *openflow.PacketInQueue
}

func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn {
featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh}
featurePacketIn.subscribeCh = make(chan *ofctrl.PacketIn)
featurePacketIn.packetInQueue = workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(100), 200)}, string(reason))
featurePacketIn.packetInQueue = openflow.NewPacketInQueue(200, rate.Limit(100))

return &featurePacketIn
}

func (f *featureStartPacketIn) ListenPacketIn() {
for {
select {
case pktIn := <-f.subscribeCh:
// Ensure that the queue doesn't grow too big. This is NOT to provide an exact guarantee.
if f.packetInQueue.Len() < packetInQueueSize {
f.packetInQueue.AddRateLimited(pktIn)
} else {
klog.Warningf("Max packetInQueue size exceeded.")
}
case <-f.stopCh:
f.packetInQueue.ShutDown()
return
}
}
}

// StartPacketInHandler is the starting point for processing feature packetin requests.
func (c *client) StartPacketInHandler(packetInStartedReason []uint8, stopCh <-chan struct{}) {
if len(c.packetInHandlers) == 0 || len(packetInStartedReason) == 0 {
Expand All @@ -101,31 +79,23 @@ func (c *client) StartPacketInHandler(packetInStartedReason []uint8, stopCh <-ch
}

func (c *client) subscribeFeaturePacketIn(featurePacketIn *featureStartPacketIn) error {
err := c.SubscribePacketIn(featurePacketIn.reason, featurePacketIn.subscribeCh)
err := c.SubscribePacketIn(featurePacketIn.reason, featurePacketIn.packetInQueue)
if err != nil {
return errors.New(fmt.Sprintf("Subscribe %d PacketIn failed %+v", featurePacketIn.reason, err))
return fmt.Errorf("subscribe %d PacketIn failed %+v", featurePacketIn.reason, err)
}
go c.parsePacketIn(featurePacketIn.packetInQueue, featurePacketIn.reason)
go featurePacketIn.ListenPacketIn()
go c.parsePacketIn(featurePacketIn)
return nil
}

func (c *client) parsePacketIn(packetInQueue workqueue.RateLimitingInterface, packetHandlerReason uint8) {
func (c *client) parsePacketIn(featurePacketIn *featureStartPacketIn) {
for {
obj, quit := packetInQueue.Get()
if quit {
break
}
packetInQueue.Done(obj)
// `Forget` here is a no-op for the BucketRateLimiter implementation.
packetInQueue.Forget(obj)
pktIn, ok := obj.(*ofctrl.PacketIn)
if !ok {
klog.Errorf("Invalid packetin data in queue, skipping.")
continue
pktIn := featurePacketIn.packetInQueue.GetRateLimited(featurePacketIn.stopCh)
if pktIn == nil {
klog.Infof("Received stop signal from stopCh.")
return
}
// Use corresponding handlers subscribed to the reason to handle PacketIn
for name, handler := range c.packetInHandlers[packetHandlerReason] {
for name, handler := range c.packetInHandlers[featurePacketIn.reason] {
err := handler.HandlePacketIn(pktIn)
if err != nil {
klog.Errorf("PacketIn handler %s failed to process packet: %+v", name, err)
Expand Down
3 changes: 1 addition & 2 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/ovs/openflow/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ type Bridge interface {
// SubscribePacketIn registers a consumer to listen to PacketIn messages matching the provided reason. When the
// Bridge receives a PacketIn message with the specified reason, it sends the message to the consumer using the
// provided channel.
SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error
SubscribePacketIn(reason uint8, pktInQueue *PacketInQueue) error
// AddTLVMap adds a TLV mapping with OVS field tun_metadataX. The value loaded in tun_metadataX is transported by
// Geneve header with the specified <optClass, optType, optLength>. The value of OptLength must be a multiple of 4.
// The value loaded into field tun_metadataX must fit within optLength bytes.
Expand Down
51 changes: 46 additions & 5 deletions pkg/ovs/openflow/ofctrl_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/contiv/libOpenflow/openflow13"
"github.com/contiv/ofnet/ofctrl"
"golang.org/x/time/rate"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/metrics"
Expand Down Expand Up @@ -208,10 +209,10 @@ func (b *OFBridge) DumpTableStatus() []TableStatus {
func (b *OFBridge) PacketRcvd(sw *ofctrl.OFSwitch, packet *ofctrl.PacketIn) {
klog.Infof("Received packet: %+v", packet)
reason := packet.Reason
ch, found := b.pktConsumers.Load(reason)
v, found := b.pktConsumers.Load(reason)
if found {
pktCh, _ := ch.(chan *ofctrl.PacketIn)
pktCh <- packet
pktInQueue, _ := v.(*PacketInQueue)
pktInQueue.AddOrDrop(packet)
}
}

Expand Down Expand Up @@ -519,12 +520,52 @@ func (b *OFBridge) AddOFEntriesInBundle(addEntries []OFEntry, modEntries []OFEnt
return nil
}

func (b *OFBridge) SubscribePacketIn(reason uint8, ch chan *ofctrl.PacketIn) error {
type PacketInQueue struct {
rateLimiter *rate.Limiter
packetsCh chan *ofctrl.PacketIn
}

func NewPacketInQueue(size int, r rate.Limit) *PacketInQueue {
return &PacketInQueue{rateLimiter: rate.NewLimiter(r, 1), packetsCh: make(chan *ofctrl.PacketIn, size)}
}

func (q *PacketInQueue) AddOrDrop(packet *ofctrl.PacketIn) bool {
select {
case q.packetsCh <- packet:
return true
default:
// Channel is full.
return false
}
}

func (q *PacketInQueue) GetRateLimited(stopCh <-chan struct{}) *ofctrl.PacketIn {
when := q.rateLimiter.Reserve().Delay()
t := time.NewTimer(when)
defer t.Stop()

select {
case <-stopCh:
return nil
case <-t.C:
break
}
for {
select {
case <-stopCh:
return nil
case packet := <-q.packetsCh:
return packet
}
}
}

func (b *OFBridge) SubscribePacketIn(reason uint8, pktInQueue *PacketInQueue) error {
_, exist := b.pktConsumers.Load(reason)
if exist {
return fmt.Errorf("packetIn reason %d already exists", reason)
}
b.pktConsumers.Store(reason, ch)
b.pktConsumers.Store(reason, pktInQueue)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ovs/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions test/integration/ovs/ofctrl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/contiv/ofnet/ofctrl"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

binding "github.com/vmware-tanzu/antrea/pkg/ovs/openflow"
"github.com/vmware-tanzu/antrea/pkg/ovs/ovsconfig"
Expand Down Expand Up @@ -570,8 +571,8 @@ func TestPacketOutIn(t *testing.T) {
defer bridge.Disconnect()

reason := uint8(1)
pktCh := make(chan *ofctrl.PacketIn)
err = bridge.SubscribePacketIn(reason, pktCh)
pktInQueue := binding.NewPacketInQueue(200, rate.Limit(100))
err = bridge.SubscribePacketIn(reason, pktInQueue)
require.Nil(t, err)

srcMAC, _ := net.ParseMAC("11:11:11:11:11:11")
Expand All @@ -588,7 +589,7 @@ func TestPacketOutIn(t *testing.T) {
stopCh := make(chan struct{})

go func() {
pktIn := <-pktCh
pktIn := pktInQueue.GetRateLimited(make(chan struct{}))
matchers := pktIn.GetMatches()

reg2Match := matchers.GetMatchByName("NXM_NX_REG2")
Expand Down

0 comments on commit 96e4333

Please sign in to comment.