-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[bugfix] Ring buffer block not automatically released without call to nextPacket() #34
[bugfix] Ring buffer block not automatically released without call to nextPacket() #34
Conversation
…ats without resetting counters
@@ -187,35 +187,18 @@ func (m *MockSource) CanAddPackets() bool { | |||
// the ring buffer / TPacketHeader block retirement setting for population of the ring buffer | |||
func (m *MockSource) Pipe(src capture.Source) chan error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've simplified the Pipe()
method significantly, dropping the whole "fake" block expiry simulation. Thing is: This was racy by definition, because it would create yet another goroutine that was accessing individual parts of the mock ring buffer (and not guarded by the memory barrier of the getBlockStatus()
and markBlock()
calls. Fixing that would have required an additional memory barrier, which would also have affected the normal, non-mock process (both in terms of complication and performance). Given that the expiry feature was nice to have but in no way important I think opting for simplicity and performance is the best course of action.
@@ -280,6 +263,14 @@ func (m *MockSource) Done() { | |||
close(m.mockBlocks) | |||
} | |||
|
|||
// ForceBlockRelease releases all blocks to the kernel (in order to "unblock" any potential mock capture | |||
// from the consuming routine without having to attempt a failed packet consumption) | |||
func (m *MockSource) ForceBlockRelease() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was needed to work around the conundrum that there is no way to release the current block to the kernel without attempting to call nextPacket()
again (because the data might still be worked on by the consumer) after the data has been used. That it turn leads to a deadlock in tests, where you quite often know (and want to check) exactly how many packets were sent through the pipeline. This way a test can basically signal that it's done consuming all data and release the block to the mock "kernel" (hence allowing the mock source to terminate cleanly and without races).
@@ -306,21 +297,37 @@ func (m *MockSource) run(errChan chan error) { | |||
} | |||
|
|||
func (m *MockSource) getBlockStatus(n int) (status uint32) { | |||
return *(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8])) | |||
return atomic.LoadUint32((*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8]))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OKOK, turns out some atomic
magic is needed (for the mock only, but since the overhead is small and tPacket
has no easy way of knowing if it's a mock or not I've just made the whole block status atomic).
atomic.StoreUint32((*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8])), status) | ||
} | ||
|
||
func (m *MockSource) hasUserlandBlock() bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used in conjunction with ForceBlockRelease()
to have the mock wait before really closing (a consumer might still be accessing data).
@@ -115,6 +115,12 @@ func (sd FileDescriptor) Close() error { | |||
return unix.Close(int(sd)) | |||
} | |||
|
|||
// IsOpen determines if the file descriptor is open / valid | |||
func (sd FileDescriptor) IsOpen() bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a much clearer (and atomic) way of checking if the file descriptor is open or not.
@@ -44,8 +48,21 @@ func (m *MockFileDescriptor) IncrementPacketCount(delta int) { | |||
m.nPacketsProcessed += delta | |||
} | |||
|
|||
// LastPoll return the timestamp of the last poll on the FileDescriptor | |||
func (m *MockFileDescriptor) LastPoll() int64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For tests it's helpful to know when the last poll occurred on the mock FD (e.g. to decide if processing is done, which was surprisingly difficult to do atomically 🙈). Of course doesn't affect the normal FD.
Provides several small fixes and some improvements for testing via mock source(s). See inline comments for some pointers.
Closes #33