Skip to content

Commit

Permalink
Merge pull request #25 from fako1024/24-incorrect-handling-of-mock-ri…
Browse files Browse the repository at this point in the history
…ng-buffer-wrap-around

[bugfix] Fix incorrect handling of mock ring buffer wrap around
  • Loading branch information
fako1024 authored May 4, 2023
2 parents 57f5e00 + 25749b8 commit 9ff071e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 45 deletions.
8 changes: 4 additions & 4 deletions capture/afpacket/afring/afring.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,12 +366,12 @@ fetch:
}

// Handle rare cases of runaway packets
if s.curTPacketHeader.getStatus()&tPacketStatusCopy != 0 {
if s.curTPacketHeader.getStatus()&unix.TP_STATUS_COPY != 0 {
if s.curTPacketHeader.nPktsLeft != 0 {
fmt.Println(s.link.Name, "WUT (after runaway packet)?", s.curTPacketHeader.nPktsLeft)
}
s.curTPacketHeader.setStatus(tPacketStatusKernel)
s.offset = (s.offset + 1) % int(s.tpReq.frameNr)
s.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL)
s.offset = (s.offset + 1) % int(s.tpReq.blockNr)
s.curTPacketHeader = s.nextTPacketHeader()

continue
Expand All @@ -390,7 +390,7 @@ fetch:
if nextPos != 0 {
fmt.Println(s.link.Name, "WUT (after resetting)?", s.curTPacketHeader.nPktsLeft, nextPos)
}
s.curTPacketHeader.setStatus(tPacketStatusKernel)
s.curTPacketHeader.setStatus(unix.TP_STATUS_KERNEL)
s.offset = (s.offset + 1) % int(s.tpReq.blockNr)
s.curTPacketHeader = nil
goto fetch
Expand Down
43 changes: 23 additions & 20 deletions capture/afpacket/afring/afring_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import (
"golang.org/x/sys/unix"
)

const mac = 82
const (
mac = 82
blockStatusPollInterval = 10 * time.Millisecond
)

// MockSource denotes a fully mocked ring buffer source, behaving just like one
// Since it wraps a regular Source, it can be used as a stand-in replacement without any further
Expand Down Expand Up @@ -51,7 +54,7 @@ func NewMockSource(iface string, options ...Option) (*MockSource, error) {
blockSize: tPacketDefaultBlockSize,
nBlocks: tPacketDefaultBlockNr,

ipLayerOffset: link.Type(link.TypeEthernet).IpHeaderOffset(),
ipLayerOffset: link.TypeEthernet.IpHeaderOffset(),
link: &link.Link{
Type: link.TypeEthernet,
Interface: &net.Interface{
Expand Down Expand Up @@ -95,8 +98,8 @@ func (m *MockSource) PacketAddCallbackFn(fn func(payload []byte, totalLen uint32
// This can happen prior to calling run or continuously while consuming data, mimicking the
// function of an actual ring buffer. Consequently, if the ring buffer is full and elements not
// yet consumed this function may block
func (m *MockSource) AddPacket(pkt capture.Packet) {
m.addPacket(pkt.Payload(), pkt.TotalLen(), pkt.Type(), 0)
func (m *MockSource) AddPacket(pkt capture.Packet) error {
return m.addPacket(pkt.Payload(), pkt.TotalLen(), pkt.Type(), 0)
}

// AddPacketFromSource consumes a single packet from the provided source and adds it to the source
Expand All @@ -116,12 +119,19 @@ func (m *MockSource) addPacket(payload []byte, totalLen uint32, pktType, ipLayer
if m.curBlockPos > 0 {
m.FinalizeBlock(false)
}
m.curBlockPos = tPacketHeaderLen
thisBlock = m.mockBlockCount % m.nBlocks

*(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[thisBlock*m.blockSize])) = 3 // version
*(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[thisBlock*m.blockSize+8])) = unix.TP_STATUS_KERNEL // status
*(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[thisBlock*m.blockSize+16])) = tPacketHeaderLen // offsetToFirstPkt
// Ensure that the packet has already been consumed to avoid race conditions (since there is no
// feedback from the receiver we can only poll until the packet status is not TP_STATUS_KERNEL)
for m.getBlockStatus(thisBlock) != unix.TP_STATUS_KERNEL {
time.Sleep(blockStatusPollInterval)
}

m.markBlock(thisBlock, unix.TP_STATUS_CSUMNOTREADY)
m.curBlockPos = tPacketHeaderLen
*(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[thisBlock*m.blockSize])) = 3 // version
*(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[thisBlock*m.blockSize+12])) = 0 // nPkts
*(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[thisBlock*m.blockSize+16])) = tPacketHeaderLen // offsetToFirstPkt
}

block := m.ringBuffer.ring[thisBlock*m.blockSize : thisBlock*m.blockSize+m.blockSize]
Expand All @@ -136,7 +146,7 @@ func (m *MockSource) addPacket(payload []byte, totalLen uint32, pktType, ipLayer
if m.curBlockPos > tPacketHeaderLen {
*(*uint32)(unsafe.Pointer(&block[m.curBlockPos-mac-m.snapLen])) = uint32(mac + m.snapLen) // nextOffset
}
*(*uint32)(unsafe.Pointer(&block[12])) = *(*uint32)(unsafe.Pointer(&block[12])) + 1 // nPkts // TPacket data
*(*uint32)(unsafe.Pointer(&block[12])) = *(*uint32)(unsafe.Pointer(&block[12])) + 1 // nPkts
m.curBlockPos += mac + m.snapLen

// Similar to the actual kernel ring buffer, we count packets as "seen" when they enter
Expand Down Expand Up @@ -277,17 +287,8 @@ func (m *MockSource) run(errChan chan error) {
break
}

thisBlock := block % m.nBlocks

// Ensure that the packet has already been consumed to avoid race conditions (since there is no
// feedback from the receiver we can only poll until the packet status is not TP_STATUS_USER)
for m.getBlockStatus(thisBlock) == unix.TP_STATUS_USER {
time.Sleep(100 * time.Millisecond)
}

// Store the next block in the ring buffer and mark it to be available to the reader
// copy(m.ringBuffer.ring[thisBlock*m.blockSize:thisBlock*m.blockSize+m.blockSize], block)
m.markBlock(thisBlock, unix.TP_STATUS_USER)
// Mark the next block in the ring buffer, making it available to the reader / userspace
m.markBlock(block%m.nBlocks, unix.TP_STATUS_USER)

// Queue / trigger an event equivalent to receiving a new block via the PPOLL syscall
if err := event.ToMockHandler(m.eventHandler).SignalAvailableData(); err != nil {
Expand All @@ -307,11 +308,13 @@ func (m *MockSource) markBlock(n int, status uint32) {
*(*uint32)(unsafe.Pointer(&m.ringBuffer.ring[n*m.blockSize+8])) = status
}

// Close stops / closes the capture source
func (m *MockSource) Close() error {
m.isClosed = true
return m.Source.Close()
}

// Free releases any pending resources from the capture source (must be called after Close())
func (m *MockSource) Free() error {
m.ringBuffer.ring = nil
return nil
Expand Down
33 changes: 16 additions & 17 deletions capture/afpacket/afring/afring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestBlockSizeAlignment(t *testing.T) {
require.Nil(t, err)
require.Equal(t, uint32(1<<i), req.blockSize)
require.Equal(t, uint32(blockNr), req.blockNr)
require.Equal(t, int(blockNr*(1<<i)), req.blockSizeNr())
require.Equal(t, blockNr*(1<<i), req.blockSizeNr())
}
for i := 0; i < 32; i++ {
_, err := newTPacketRequestForBuffer((1<<i)+1, blockNr, snapLen)
Expand Down Expand Up @@ -121,7 +121,7 @@ func TestFillRingBuffer(t *testing.T) {
6, []byte{byte(i), byte(j)}, byte(i+j)%5, int(i+j))
require.Nil(t, err)

mockSrc.AddPacket(p)
require.Nil(t, mockSrc.AddPacket(p))
i++
j++
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestCaptureMethods(t *testing.T) {
testCaptureMethods(t, func(t *testing.T, src *MockSource, i, j uint16) {
err := src.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) error {
require.Equal(t, src.link.Type.IpHeaderOffset(), ipLayerOffset)
require.Equal(t, uint32(i+j), totalLen)
require.Equal(t, int(i*1000+j), int(totalLen))
require.Equal(t, byte(i+j)%5, pktType)
require.Equal(t, fmt.Sprintf("1.2.3.%d:%d => 4.5.6.%d:%d (proto: %d)", i%254+1, i, j%254+1, j, 6), capture.IPLayer(payload[ipLayerOffset:]).String())
return nil
Expand All @@ -237,7 +237,7 @@ func TestPipe(t *testing.T) {
mockSrc, err := NewMockSource("mock",
CaptureLength(link.CaptureLengthMinimalIPv4Transport),
Promiscuous(false),
BufferSize(1024*1024, 5),
BufferSize(1024*16, 8),
)
require.Nil(t, err)

Expand All @@ -253,10 +253,10 @@ func TestPipe(t *testing.T) {
net.ParseIP(fmt.Sprintf("4.5.6.%d", j%254+1)),
i,
j,
6, []byte{byte(i), byte(j)}, byte(i+j)%5, int(i+j))
6, []byte{byte(i), byte(j)}, byte(i+j)%5, int(i*1000+j))
require.Nil(t, err)

mockSrc.AddPacket(p)
require.Nil(t, mockSrc.AddPacket(p))
}
}
mockSrc.FinalizeBlock(false)
Expand Down Expand Up @@ -311,7 +311,7 @@ func BenchmarkCaptureMethods(b *testing.B) {
require.Nil(b, err)

for mockSrc.CanAddPackets() {
mockSrc.AddPacket(testPacket)
require.Nil(b, mockSrc.AddPacket(testPacket))
}
mockSrc.RunNoDrain(time.Microsecond)

Expand Down Expand Up @@ -412,13 +412,13 @@ func BenchmarkCaptureMethods(b *testing.B) {
})
}

func testCaptureMethods(t *testing.T, fn func(t *testing.T, src *MockSource, i, j uint16)) {
func testCaptureMethods(t *testing.T, fn func(t *testing.T, _ *MockSource, _, _ uint16)) {

// Setup a mock source
mockSrc, err := NewMockSource("mock",
CaptureLength(link.CaptureLengthMinimalIPv4Transport),
Promiscuous(false),
BufferSize(1024*1024, 5),
BufferSize(1024*16, 8),
)
require.Nil(t, err)

Expand All @@ -434,15 +434,14 @@ func testCaptureMethods(t *testing.T, fn func(t *testing.T, src *MockSource, i,
net.ParseIP(fmt.Sprintf("4.5.6.%d", j%254+1)),
i,
j,
6, []byte{byte(i), byte(j)}, byte(i+j)%5, int(i+j))
6, []byte{byte(i), byte(j)}, byte(i+j)%5, int(i*1000+j))
require.Nil(t, err)

mockSrc.AddPacket(p)
require.Nil(t, mockSrc.AddPacket(p))
}
}
mockSrc.FinalizeBlock(false)
mockSrc.Done()

}()

// Consume data from the source via the respective method
Expand All @@ -469,15 +468,15 @@ func validatePacket(t *testing.T, p capture.Packet, i, j uint16) {
}

func validateIPPacket(t *testing.T, p capture.IPLayer, pktType capture.PacketType, totalLen uint32, i, j uint16) {
require.Equal(t, uint32(i+j), totalLen)
require.Equal(t, byte(i+j)%5, pktType)
require.Equal(t, fmt.Sprintf("1.2.3.%d:%d => 4.5.6.%d:%d (proto: %d)", i%254+1, i, j%254+1, j, 6), p.String())
require.Equalf(t, int(i*1000+j), int(totalLen), "i=%d, j=%d", i, j)
require.Equalf(t, byte(i+j)%5, pktType, "i=%d, j=%d", i, j)
require.Equalf(t, fmt.Sprintf("1.2.3.%d:%d => 4.5.6.%d:%d (proto: %d)", i%254+1, i, j%254+1, j, 6), p.String(), "i=%d, j=%d", i, j)
c, err := capture.BuildPacket(
net.ParseIP(fmt.Sprintf("1.2.3.%d", i%254+1)),
net.ParseIP(fmt.Sprintf("4.5.6.%d", j%254+1)),
i,
j,
6, []byte{byte(i), byte(j)}, byte(i+j)%5, int(i+j))
6, []byte{byte(i), byte(j)}, byte(i+j)%5, int(i*1000+j))
require.Nil(t, err)
require.Equalf(t, c.IPLayer(), p[:len(c.IPLayer())], "%v vs. %v", c.IPLayer(), p)
require.Equalf(t, c.IPLayer(), p[:len(c.IPLayer())], "%v vs. %v , i=%d, j=%d", c.IPLayer(), p, i, j)
}
4 changes: 0 additions & 4 deletions capture/afpacket/afring/tpacket.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ const (
)

const (
tPacketStatusKernel = 0
tPacketStatusUser = (1 << 0)
tPacketStatusCopy = (1 << 1)

tPacketDefaultBlockNr = 4 // sizeof(tpacket3_hdr)
tPacketDefaultBlockSize = (1 << 20) // 1 MiB
tPacketDefaultBlockTOV = 100 // ms
Expand Down

0 comments on commit 9ff071e

Please sign in to comment.