Add rate limiting queue for packet-in #2015
Conversation
Force-pushed from 56c488a to 7c1319d
@GraysonWu please update the rate as suggested and provide CPU usage data with & without your change; otherwise it's hard to evaluate the value of the PR and whether it actually addresses the issue. Also provide the parameters you used to run hping3 and generate traffic.
pkg/agent/openflow/packetin.go
Outdated
if reason == uint8(PacketInReasonTF) {
	featurePacketIn.packetInQueue = workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Inf, 10)}, string(reason))
} else {
	featurePacketIn.packetInQueue = workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(5), 10)}, string(reason))
}
I think a rate limit of 5 is a bit too conservative; how about 100, with a burst size of 200?
I feel like we can also use this for Traceflow
pkg/agent/openflow/packetin.go
Outdated
@@ -108,7 +114,7 @@ func (c *client) subscribeFeaturePacketIn(featurePacketIn *featureStartPacketIn)
 	return nil
 }

-func (c *client) parsePacketIn(packetInQueue workqueue.Interface, packetHandlerReason uint8) {
+func (c *client) parsePacketIn(packetInQueue workqueue.RateLimitingInterface, packetHandlerReason uint8) {
Can we call packetInQueue.Forget(obj) after packetInQueue.Done(obj), even though it is a no-op (and add a comment to mention it is a no-op for the BucketRateLimiter implementation)?
pkg/agent/openflow/packetin.go
Outdated
@@ -71,7 +77,7 @@ func (f *featureStartPacketIn) ListenPacketIn() {
 	case pktIn := <-f.subscribeCh:
 		// Ensure that the queue doesn't grow too big. This is NOT to provide an exact guarantee.
 		if f.packetInQueue.Len() < packetInQueueSize {
-			f.packetInQueue.Add(pktIn)
+			f.packetInQueue.AddRateLimited(pktIn)
As far as I can tell from the code, AddRateLimited adds a waitFor item to a channel with 1000 slots when no rate limiter token is available, so it blocks once that channel is full, and it never drops items even when the rate limit is exceeded.
The length check against the queue may avoid appending more items once the queue exceeds its length limit. However, the length of the rate-limiting queue counts only the items that are ready to be processed (whose rate-limiting wait has expired), not the items still sitting in the waitingForAddCh channel. If the consumers process ready items very quickly, we could end up with 1000 waiting items in waitingForAddCh and 0 ready items in the queue: a waiting item is popped from the channel and pushed to the queue every 10ms (with a rate limit of 100), so each incoming packet-in event would block for 10ms in AddRateLimited, and an item would only be handled after up to 10s (1000 * 10ms), by which time the Traceflow request or connection has already expired.
We need to drop the packets here. Otherwise the backpressure will propagate to ofnet, and we will essentially block the loop that receives all messages from the switch (including bundle acknowledgements?): https://github.com/wenyingd/ofnet/blob/171b6795a2da8d488d38ae318ca3ce043481fc59/ofctrl/ofSwitch.go#L306
I am not a big fan of the current architecture: unless I am missing something, we have one channel / queue too many (subscribeCh and packetInQueue), and everything could be done with a single channel.
My preference would be to change and simplify the code as follows:
- remove packetInQueue altogether
- make subscribeCh a buffered channel
- change PacketRcvd (ofctrl_bridge.go) to drop the packet if the subscribe channel is full (avoids backpressure to ofnet)
- when calling SubscribePacketIn, provide a rate limiter to control how fast packets can be dequeued by a consumer (this ensures low CPU usage by the consumer)
maybe we can define a simple queue like this one:
type PacketInQueue struct {
	rateLimiter *rate.Limiter
	packetsCh   chan *ofctrl.PacketIn
}

func NewPacketInQueue(size int, r rate.Limit) *PacketInQueue {
	return &PacketInQueue{rateLimiter: rate.NewLimiter(r, 1), packetsCh: make(chan *ofctrl.PacketIn, size)}
}

func (q *PacketInQueue) AddOrDrop(packet *ofctrl.PacketIn) bool {
	select {
	case q.packetsCh <- packet:
		return true
	default:
		// Channel is full: drop the packet rather than block the caller.
		return false
	}
}

func (q *PacketInQueue) GetRateLimited(stopCh <-chan struct{}) *ofctrl.PacketIn {
	when := q.rateLimiter.Reserve().Delay()
	t := time.NewTimer(when)
	defer t.Stop()
	// Block until the rate limiter allows the next packet, or until we
	// are asked to stop.
	select {
	case <-stopCh:
		return nil
	case <-t.C:
	}
	select {
	case <-stopCh:
		return nil
	case packet := <-q.packetsCh:
		return packet
	}
}

// receiver side:
go func() {
	for {
		packet := q.GetRateLimited(stopCh)
		if packet == nil {
			return
		}
		// call all registered handlers
	}
}()
Of course we should continue to have one instance of PacketInQueue per packet type so that Traceflow packets get their own queue.
After my test (hping3 --flood 10.10.1.114 -p 80 -2), there is no significant difference in CPU usage whether or not we use this rate-limiting queue. I guess continuously calling AddRateLimited at a high rate also costs a lot of CPU? I think @antoninbas's simplified code looks good, and we can drop over-rate packets this way. To make sure Traceflow won't be choked or dropped by other packet-in messages, maybe giving Traceflow its own queue would be better?
As I wrote at the end, yes Traceflow should have its own queue
My bad, I missed the statement at the end.
Then let me try whether this approach saves CPU usage.
Before we add a rate limiting mechanism in OVS, use a RateLimitingQueue for packet-in events in the Antrea agent.
Force-pushed from 7c1319d to ccdd2f5
Pushed with what @antoninbas suggested here #2015 (comment). Tested using
With channel buffer size 200, rate.Limit(rate.Inf, 1), CPU usage:
Thanks @GraysonWu. At least we are now making things a bit better in terms of both CPU & memory :)
Force-pushed from bfafe85 to 96e4333
It's NetworkPolicy with a reject action. Let me try more hping runs to see the result.
@GraysonWu could you try logging as well?
Sure.
Force-pushed from 4911d6a to 9222dd5
Codecov Report
@@ Coverage Diff @@
## main #2015 +/- ##
===========================================
- Coverage 65.12% 41.99% -23.14%
===========================================
Files 197 256 +59
Lines 17407 18773 +1366
===========================================
- Hits 11336 7883 -3453
- Misses 4883 9794 +4911
+ Partials 1188 1096 -92
Flags with carried forward coverage won't be shown.
Code looks good to me; let's see if @tnqn has any objection to the change.
Force-pushed from 9222dd5 to 584f524
I found an issue. It will affect the measurements you did previously, so I believe these need to be taken again.
Ignore my earlier comment, it was incorrect. I just checked the Go documentation :)
LGTM
Force-pushed from 4ab770c to daa7379
Force-pushed from daa7379 to e5066af
/test-all
Before we add a rate limiting mechanism in OVS, we use a RateLimitingQueue for packet-in events in the Antrea agent.
I have tested it using hping3 to simulate a packet flood, and the agent can handle packet-in events under the rate limit.