Skip to content

Commit

Permalink
Merge pull request #74 from fako1024/71-improve-afring-throughput-by-…
Browse files Browse the repository at this point in the history
…minimizing-individual-unsafe-casts

[feature] Improve afring throughput using code optimization
  • Loading branch information
fako1024 authored Sep 26, 2023
2 parents 470fa79 + 8ad026e commit 92124d4
Show file tree
Hide file tree
Showing 20 changed files with 605 additions and 348 deletions.
296 changes: 150 additions & 146 deletions capture/afpacket/afring/afring.go

Large diffs are not rendered by default.

24 changes: 19 additions & 5 deletions capture/afpacket/afring/afring_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ const (
blockStatusPollInterval = 10 * time.Millisecond
)

type tPacketHeaderV3Mock struct {
snaplen uint32 // 12-16
pktLen uint32 // 16-20
_ uint32 // skip
pktMac uint16 // 24-26
pktNet uint16 // 26-28
_ [15]uint16 // skip
pktType byte // 58
}

// MockSource denotes a fully mocked ring buffer source, behaving just like one
// Since it wraps a regular Source, it can be used as a stand-in replacement without any further
// code modifications:
Expand Down Expand Up @@ -62,6 +72,7 @@ func NewMockSource(_ string, options ...Option) (*MockSource, error) {
},
eventHandler: mockHandler,
}
src.ipLayerOffsetNum = uint32(src.ipLayerOffset)

for _, opt := range options {
opt(src)
Expand Down Expand Up @@ -129,11 +140,14 @@ func (m *MockSource) addPacket(payload []byte, totalLen uint32, pktType, ipLayer

block := m.ringBuffer.ring[thisBlock*m.blockSize : thisBlock*m.blockSize+m.blockSize]

*(*uint32)(unsafe.Pointer(&block[m.curBlockPos+12])) = uint32(m.snapLen) // #nosec: G103 // snapLen
*(*uint32)(unsafe.Pointer(&block[m.curBlockPos+16])) = totalLen // #nosec: G103 // totalLen
*(*uint32)(unsafe.Pointer(&block[m.curBlockPos+24])) = uint32(mac) // #nosec: G103 // mac
block[m.curBlockPos+58] = pktType // pktType
copy(block[m.curBlockPos+mac:m.curBlockPos+mac+m.snapLen], payload) // payload
*(*tPacketHeaderV3Mock)(unsafe.Pointer(&block[m.curBlockPos+12])) = tPacketHeaderV3Mock{
snaplen: uint32(m.snapLen),
pktLen: totalLen,
pktMac: mac,
pktNet: mac + uint16(m.ipLayerOffset),
pktType: pktType,
} // #nosec: G103
copy(block[m.curBlockPos+mac:m.curBlockPos+mac+m.snapLen], payload) // payload

// Ensure that there is no "stray" nextOffset set from a previous perusal of this ring buffer block which
// might remain in case the block is finalized
Expand Down
3 changes: 3 additions & 0 deletions capture/afpacket/afring/afring_mock_nodrain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package afring

import (
"errors"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -68,6 +69,8 @@ func (m *MockSourceNoDrain) Run(releaseInterval time.Duration) (<-chan error, er
m.wgRunning.Add(1)
go func(errs chan error) {

// Minimize scheduler overhead by locking this goroutine to the current thread
runtime.LockOSThread()
defer func() {
close(errs)
m.wgRunning.Done()
Expand Down
211 changes: 112 additions & 99 deletions capture/afpacket/afring/afring_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,114 +385,127 @@ func BenchmarkCaptureMethods(b *testing.B) {
6, []byte{1, 2}, 0, 128)
require.Nil(b, err)

// Setup a mock source
mockSrc, err := NewMockSourceNoDrain("mock",
CaptureLength(link.CaptureLengthMinimalIPv4Transport),
Promiscuous(false),
)
require.Nil(b, err)

for mockSrc.CanAddPackets() {
require.Nil(b, mockSrc.AddPacket(testPacket))
}
_, err = mockSrc.Run(time.Microsecond)
require.Nil(b, err)

b.Run("NextPacket", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p, _ := mockSrc.NextPacket(nil)
_ = p
}
})

b.Run("NextPacketInPlace", func(b *testing.B) {
var p capture.Packet = mockSrc.NewPacket()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
p, _ = mockSrc.NextPacket(p)
_ = p
}
})

b.Run("NextPayload", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextPayload(nil)
_ = p
_ = pktType
_ = totalLen
for _, benchConfig := range []struct {
blockSize int
nBlocks int
blockExpiry time.Duration
}{
{10 * 1024 * 1024, 4, time.Microsecond},
{10 * 1024, 512, 10 * time.Nanosecond},
} {

// Setup a mock source
mockSrc, err := NewMockSourceNoDrain("mock",
CaptureLength(link.CaptureLengthMinimalIPv4Transport),
BufferSize(benchConfig.blockSize, benchConfig.nBlocks),
Promiscuous(false),
)
require.Nil(b, err)

for mockSrc.CanAddPackets() {
require.Nil(b, mockSrc.AddPacket(testPacket))
}
})
_, err = mockSrc.Run(benchConfig.blockExpiry)
require.Nil(b, err)

b.Run(fmt.Sprintf("NextPacket_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p, _ := mockSrc.NextPacket(nil)
_ = p
}
})

b.Run("NextPayloadInPlace", func(b *testing.B) {
pkt := mockSrc.NewPacket()
var p []byte = pkt.Payload()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextPayload(p)
_ = p
_ = pktType
_ = totalLen
}
})
b.Run(fmt.Sprintf("NextPacketInPlace_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) {
var p capture.Packet = mockSrc.NewPacket()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
p, _ = mockSrc.NextPacket(p)
_ = p
}
})

b.Run("NextPayloadZeroCopy", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextPayloadZeroCopy()
_ = p
_ = pktType
_ = totalLen
}
})
b.Run(fmt.Sprintf("NextPayload_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextPayload(nil)
_ = p
_ = pktType
_ = totalLen
}
})

b.Run("NextIPPacket", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextIPPacket(nil)
_ = p
_ = pktType
_ = totalLen
}
})
b.Run(fmt.Sprintf("NextPayloadInPlace_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) {
pkt := mockSrc.NewPacket()
var p []byte = pkt.Payload()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextPayload(p)
_ = p
_ = pktType
_ = totalLen
}
})

b.Run("NextIPPacketInPlace", func(b *testing.B) {
pkt := mockSrc.NewPacket()
var p capture.IPLayer = pkt.IPLayer()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextIPPacket(p)
_ = p
_ = pktType
_ = totalLen
}
})
b.Run(fmt.Sprintf("NextPayloadZeroCopy_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextPayloadZeroCopy()
_ = p
_ = pktType
_ = totalLen
}
})

b.Run("NextIPPacketZeroCopy", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextIPPacketZeroCopy()
_ = p
_ = pktType
_ = totalLen
}
})
b.Run(fmt.Sprintf("NextIPPacket_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextIPPacket(nil)
_ = p
_ = pktType
_ = totalLen
}
})

b.Run("NextPacketFn", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_ = mockSrc.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) error {
_ = payload
b.Run(fmt.Sprintf("NextIPPacketInPlace_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) {
pkt := mockSrc.NewPacket()
var p capture.IPLayer = pkt.IPLayer()
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextIPPacket(p)
_ = p
_ = pktType
_ = totalLen
}
})

b.Run(fmt.Sprintf("NextIPPacketZeroCopy_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
p, pktType, totalLen, _ := mockSrc.NextIPPacketZeroCopy()
_ = p
_ = pktType
return nil
})
}
})
_ = totalLen
}
})

b.Run(fmt.Sprintf("NextPacketFn_%dkiBx%d", benchConfig.blockSize/1000, benchConfig.nBlocks), func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_ = mockSrc.NextPacketFn(func(payload []byte, totalLen uint32, pktType, ipLayerOffset byte) error {
_ = payload
_ = totalLen
_ = pktType
return nil
})
}
})

require.Nil(b, mockSrc.Close())
}
}

func testCaptureMethods(t *testing.T, fn func(t *testing.T, _ *MockSource, _, _ uint16)) {
Expand Down
Loading

0 comments on commit 92124d4

Please sign in to comment.