-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[bugfix] Ring buffer block not automatically released without call to nextPacket() #34
Changes from all commits
5a2654c
d400158
a8467fd
2283517
4f7283e
489e612
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |
"io" | ||
"net" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
"unsafe" | ||
|
||
|
@@ -35,8 +36,7 @@ type MockSource struct { | |
mockBlocks chan int | ||
mockBlockCount int | ||
|
||
mockFd *socket.MockFileDescriptor | ||
isClosed bool | ||
MockFd *socket.MockFileDescriptor | ||
|
||
packetAddCallbackFn func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) | ||
} | ||
|
@@ -83,7 +83,7 @@ func NewMockSource(iface string, options ...Option) (*MockSource, error) { | |
return &MockSource{ | ||
Source: src, | ||
mockBlocks: make(chan int, src.nBlocks), | ||
mockFd: mockFd, | ||
MockFd: mockFd, | ||
}, nil | ||
} | ||
|
||
|
@@ -156,7 +156,7 @@ func (m *MockSource) addPacket(payload []byte, totalLen uint32, pktType, ipLayer | |
|
||
// Similar to the actual kernel ring buffer, we count packets as "seen" when they enter | ||
// the pipeline, not when they are consumed from the buffer | ||
m.mockFd.IncrementPacketCount(1) | ||
m.MockFd.IncrementPacketCount(1) | ||
|
||
// If a callback function was provided, execute it | ||
if m.packetAddCallbackFn != nil { | ||
|
@@ -187,35 +187,18 @@ func (m *MockSource) CanAddPackets() bool { | |
// the ring buffer / TPacketHeader block retirement setting for population of the ring buffer | ||
func (m *MockSource) Pipe(src capture.Source) chan error { | ||
errChan := make(chan error) | ||
go func(errs chan error) { | ||
pipe := make(chan error) | ||
|
||
// Run the next capture attempt in a goroutine to allow timing out the operation | ||
go func(errs chan error) { | ||
for { | ||
go func() { | ||
pipe <- m.AddPacketFromSource(src) | ||
}() | ||
|
||
retry: | ||
select { | ||
// Simulate TPacket block retirement | ||
case <-time.After(time.Duration(m.ringBuffer.tpReq.retire_blk_tov) * time.Millisecond): | ||
|
||
// To ensure the process cannot enter a deadlock, block finalization is forced (just as | ||
// it would be the case for the actual ring buffer) even if no packets were received | ||
m.FinalizeBlock(true) | ||
goto retry | ||
|
||
case err := <-pipe: | ||
if err != nil { | ||
if errors.Is(err, io.EOF) || errors.Is(err, capture.ErrCaptureStopped) { | ||
m.FinalizeBlock(false) | ||
m.Done() | ||
return | ||
} | ||
errs <- err | ||
if err := m.AddPacketFromSource(src); err != nil { | ||
if errors.Is(err, io.EOF) || errors.Is(err, capture.ErrCaptureStopped) { | ||
m.FinalizeBlock(false) | ||
m.Done() | ||
|
||
return | ||
} | ||
errs <- err | ||
return | ||
} | ||
} | ||
}(errChan) | ||
|
@@ -249,7 +232,7 @@ func (m *MockSource) RunNoDrain(releaseInterval time.Duration) chan error { | |
// Queue / trigger a single event equivalent to receiving a new block via the PPOLL syscall and | ||
// instruct the mock socket to not release the semaphore. That way data can be consumed immediately | ||
// at all times | ||
m.mockFd.SetNoRelease(true) | ||
m.MockFd.SetNoRelease(true) | ||
if err := event.ToMockHandler(m.eventHandler).SignalAvailableData(); err != nil { | ||
errs <- err | ||
return | ||
|
@@ -260,7 +243,7 @@ func (m *MockSource) RunNoDrain(releaseInterval time.Duration) chan error { | |
for i := 0; i < m.nBlocks; i++ { | ||
|
||
// If the ring buffer is empty it was apparently closed / free'd | ||
if m.isClosed || len(m.ringBuffer.ring) == 0 { | ||
if len(m.ringBuffer.ring) == 0 { | ||
errs <- nil | ||
return | ||
} | ||
|
@@ -280,6 +263,14 @@ func (m *MockSource) Done() { | |
close(m.mockBlocks) | ||
} | ||
|
||
// ForceBlockRelease releases all blocks to the kernel (in order to "unblock" any potential mock capture | ||
// from the consuming routine without having to attempt a failed packet consumption) | ||
func (m *MockSource) ForceBlockRelease() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was needed to work around the conundrum that there is no way to release the current block to the kernel without attempting to call |
||
for i := 0; i < m.nBlocks; i++ { | ||
m.markBlock(i, unix.TP_STATUS_KERNEL) | ||
} | ||
} | ||
|
||
////////////////////////////////////////////////////////////////////////////////////////////////////// | ||
|
||
func (m *MockSource) run(errChan chan error) { | ||
|
@@ -288,7 +279,7 @@ func (m *MockSource) run(errChan chan error) { | |
for block := range m.mockBlocks { | ||
|
||
// If the ring buffer is empty it was apparently closed / free'd | ||
if m.isClosed || len(m.ringBuffer.ring) == 0 { | ||
if len(m.ringBuffer.ring) == 0 { | ||
break | ||
} | ||
|
||
|
@@ -306,16 +297,30 @@ func (m *MockSource) run(errChan chan error) { | |
} | ||
|
||
func (m *MockSource) getBlockStatus(n int) (status uint32) { | ||
return *(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8])) | ||
return atomic.LoadUint32((*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8]))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OKOK, turns out some |
||
} | ||
|
||
func (m *MockSource) markBlock(n int, status uint32) { | ||
*(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8])) = status | ||
atomic.StoreUint32((*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8])), status) | ||
} | ||
|
||
func (m *MockSource) hasUserlandBlock() bool { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Used in conjunction with |
||
for i := 0; i < m.nBlocks; i++ { | ||
if m.getBlockStatus(i) != unix.TP_STATUS_KERNEL { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
// Close stops / closes the capture source | ||
func (m *MockSource) Close() error { | ||
m.isClosed = true | ||
|
||
// Wait until all blocks have been retuned to the mock kernel | ||
for m.hasUserlandBlock() { | ||
time.Sleep(10 * time.Millisecond) | ||
} | ||
|
||
return m.Source.Close() | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've simplified the
Pipe()
method significantly, dropping the whole "fake" block expiry simulation. Thing is: This was racy by definition, because it would create yet another goroutine that was accessing individual parts of the mock ring buffer (and not guarded by the memory barrier of thegetBlockStatus()
andmarkBlock()
calls. Fixing that would have required an additional memory barrier, which would also have affected the normal, non-mock process (both in terms of complication and performance). Given that the expiry feature was nice to have but in no way important I think opting for simplicity and performance is the best course of action.