Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] Refactor local buffer / memory pool handling for CaptureManager #321

Merged
merged 3 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 30 additions & 31 deletions pkg/capture/buffer.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package capture

import (
"fmt"
"unsafe"

"github.com/els0r/goProbe/cmd/goProbe/config"
"github.com/els0r/goProbe/pkg/capture/capturetypes"
"github.com/fako1024/gotools/concurrency"
"golang.org/x/sys/unix"
Expand All @@ -21,18 +19,39 @@ var (

// Initial size of a buffer
initialBufferSize = unix.Getpagesize()

// Global (limited) memory pool used to minimize allocations
memPool = concurrency.NewMemPoolLimitUnique(config.DefaultLocalBufferNumBuffers, initialBufferSize)
maxBufferSize = config.DefaultLocalBufferSizeLimit
)

// LocalBufferPool provides a wrapper around a MemPoolLimitUnique with a maximum size
type LocalBufferPool struct {
NBuffers int
MaxBufferSize int
*concurrency.MemPoolLimitUnique
}

// NewLocalBufferPool initializes a new local buffer pool, backing it with a limited
// unique memory pool of nBuffers buffers (each starting at the initial buffer size)
func NewLocalBufferPool(nBuffers, maxBufferSize int) *LocalBufferPool {
	pool := &LocalBufferPool{
		NBuffers:           nBuffers,
		MaxBufferSize:      maxBufferSize,
		MemPoolLimitUnique: concurrency.NewMemPoolLimitUnique(nBuffers, initialBufferSize),
	}
	return pool
}

// LocalBuffer denotes a local packet buffer used to temporarily capture packets
// from the source (e.g. during rotation) to avoid a ring / kernel buffer overflow
type LocalBuffer struct {
	data        []byte // continuous buffer slice
	writeBufPos int    // current write position in buffer slice
	readBufPos  int    // current read position in buffer slice

	memPool *LocalBufferPool // pool from which the data slice is obtained / resized
}

// NewLocalBuffer initializes a new local buffer drawing on the provided buffer pool
// (which also dictates the maximum size the buffer may grow to)
func NewLocalBuffer(memPool *LocalBufferPool) *LocalBuffer {
	return &LocalBuffer{
		memPool: memPool,
	}
}

// Assign sets the actual underlying data slice (obtained from a memory pool) of this buffer
Expand All @@ -41,7 +60,7 @@ func (l *LocalBuffer) Assign(data []byte) {

// Ascertain the current size of the underlying data slice and grow if required
if len(l.data) < initialBufferSize {
l.data = memPool.Resize(l.data, initialBufferSize)
l.data = l.memPool.Resize(l.data, initialBufferSize)
}
}

Expand All @@ -56,7 +75,7 @@ func (l *LocalBuffer) Reset() {
// Usage returns the relative usage of the buffer with respect to its maximum size
func (l *LocalBuffer) Usage() float64 {

	// Note: MaxBufferSize is guarded against zero in setLocalBuffers(), so this
	// cannot cause a division by zero
	return float64(l.writeBufPos) / float64(l.memPool.MaxBufferSize)
}

// Add adds an element to the buffer, returning ok = true if successful
Expand All @@ -67,11 +86,11 @@ func (l *LocalBuffer) Add(epHash []byte, pktType byte, pktSize uint32, isIPv4 bo
if l.writeBufPos+len(epHash)+bufElementAddSize >= len(l.data) {

// If the buffer size is already at its limit, reject the new element
if len(l.data) >= maxBufferSize {
if len(l.data) >= l.memPool.MaxBufferSize {
return false
}

l.grow(min(maxBufferSize, 2*len(l.data)))
l.grow(min(l.memPool.MaxBufferSize, 2*len(l.data)))
}

// Transfer data to the buffer
Expand Down Expand Up @@ -133,26 +152,6 @@ func (l *LocalBuffer) Next() ([]byte, byte, uint32, bool, byte, capturetypes.Par
true
}

///////////////////////////////////////////////////////////////////////////////////

// setLocalBuffers sets the number of (and hence the maximum concurrency for Status() calls) and
// maximum size of the local memory buffers (globally, not per interface)
func setLocalBuffers(nBuffers, sizeLimit int) error {

// Guard against invalid (i.e. zero) buffer size / limits
if nBuffers == 0 || sizeLimit == 0 {
return fmt.Errorf("invalid number of local buffers (%d) / size limit (%d) specified", nBuffers, sizeLimit)
}

if memPool != nil {
memPool.Clear()
}
memPool = concurrency.NewMemPoolLimitUnique(nBuffers, initialBufferSize)
maxBufferSize = sizeLimit

return nil
}

// grow resizes the underlying data slice to newSize via the assigned memory pool
func (l *LocalBuffer) grow(newSize int) {
	l.data = l.memPool.Resize(l.data, newSize)
}
5 changes: 4 additions & 1 deletion pkg/capture/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import (
"net"
"testing"

"github.com/els0r/goProbe/cmd/goProbe/config"
"github.com/els0r/goProbe/pkg/capture/capturetypes"
"github.com/fako1024/slimcap/capture"
"github.com/stretchr/testify/require"
)

var testLocalBufferPool = NewLocalBufferPool(1, config.DefaultLocalBufferSizeLimit)

func TestBuffer(t *testing.T) {

localBuf := new(LocalBuffer)
localBuf := NewLocalBuffer(testLocalBufferPool)
data := make([]byte, 128*1024*1024)
localBuf.Assign(data)

Expand Down
11 changes: 6 additions & 5 deletions pkg/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ type Capture struct {
captureHandle Source
sourceInitFn sourceInitFn

// Error tracking (type / errno specific)
// parsingErrors ParsingErrTracker
// Memory buffer pool
memPool *LocalBufferPool

// WaitGroup tracking active processing
wgProc sync.WaitGroup
Expand Down Expand Up @@ -154,16 +154,17 @@ func (c *Capture) Iface() string {
return c.iface
}

func (c *Capture) run() (err error) {
func (c *Capture) run(memPool *LocalBufferPool) (err error) {

// Set up the packet source and capturing
c.captureHandle, err = c.sourceInitFn(c)
if err != nil {
return fmt.Errorf("failed to initialize capture: %w", err)
}

c.memPool = memPool
c.capLock = concurrency.NewThreePointLock(
concurrency.WithMemPool(memPool),
concurrency.WithMemPool(memPool.MemPoolLimitUnique),
concurrency.WithTimeout(captureLockTimeout),
concurrency.WithLockRequestFn(c.captureHandle.Unblock),
concurrency.WithUnlockRequestFn(c.captureHandle.Unblock),
Expand Down Expand Up @@ -255,7 +256,7 @@ func (c *Capture) process() <-chan error {
// Initialize a new local buffer for this interface - this is kept local to avoid
// any possibility of escaping to the heap and / or accidental misuse of the underlying
// memory area
localBuf := new(LocalBuffer)
localBuf := NewLocalBuffer(c.memPool)

// Main packet capture loop which an interface should be in most of the time
for {
Expand Down
44 changes: 36 additions & 8 deletions pkg/capture/capture_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/els0r/goProbe/pkg/goprobe/writeout"
"github.com/els0r/goProbe/pkg/types/hashmap"
"github.com/els0r/telemetry/logging"
"github.com/fako1024/gotools/concurrency"
)

const allowedWriteoutDurationFraction = 0.1
Expand All @@ -33,6 +34,8 @@ type Manager struct {
startedAt time.Time

skipWriteoutSchedule bool

localBufferPool *LocalBufferPool
}

// InitManager initializes a CaptureManager and the underlying writeout logic
Expand All @@ -49,13 +52,6 @@ func InitManager(ctx context.Context, config *config.Config, opts ...ManagerOpti
dbPermissions = config.DB.Permissions
}

// If a local buffer config exists, set the values accordingly (before initializing the manager)
if config.LocalBuffers != nil {
if err := setLocalBuffers(config.LocalBuffers.NumBuffers, config.LocalBuffers.SizeLimit); err != nil {
return nil, fmt.Errorf("failed to set local buffers: %w", err)
}
}

// Initialize the DB writeout handler
writeoutHandler := writeout.NewGoDBHandler(config.DB.Path, encoderType).
WithSyslogWriting(config.SyslogFlows).
Expand All @@ -64,6 +60,11 @@ func InitManager(ctx context.Context, config *config.Config, opts ...ManagerOpti
// Initialize the CaptureManager
captureManager := NewManager(writeoutHandler, opts...)

// Initialize local buffer
if err := captureManager.setLocalBuffers(); err != nil {
return nil, fmt.Errorf("failed to set local buffer(s): %w", err)
}

// Update (i.e. start) all capture routines (implicitly by reloading all configurations) and schedule
// DB writeouts
_, _, _, err = captureManager.Update(ctx, config.Interfaces)
Expand All @@ -87,10 +88,14 @@ func NewManager(writeoutHandler writeout.Handler, opts ...ManagerOption) *Manage
captures: newCaptures(),
writeoutHandler: writeoutHandler,
sourceInitFn: defaultSourceInitFn,

// This is explicit here to ensure that each manager by default has its own memory pool (unless injected)
localBufferPool: NewLocalBufferPool(1, config.DefaultLocalBufferSizeLimit),
}
for _, opt := range opts {
opt(captureManager)
}

return captureManager
}

Expand Down Expand Up @@ -186,6 +191,14 @@ func WithSkipWriteoutSchedule(skip bool) ManagerOption {
}
}

// WithLocalBuffers sets one or multiple local buffers for the capture manager
func WithLocalBuffers(nBuffers, sizeLimit int) ManagerOption {
return func(cm *Manager) {
cm.localBufferPool.NBuffers = nBuffers
cm.localBufferPool.MaxBufferSize = sizeLimit
}
}

// Config returns the runtime config of the capture manager for all (or a set of) interfaces
func (cm *Manager) Config(ifaces ...string) (ifaceConfigs config.Ifaces) {
cm.RLock()
Expand Down Expand Up @@ -378,7 +391,7 @@ func (cm *Manager) update(ctx context.Context, ifaces config.Ifaces, enable, dis
logger.Info("initializing capture / running packet processing")

newCap := newCapture(iface.Name, ifaces[iface.Name]).SetSourceInitFn(cm.sourceInitFn)
if err := newCap.run(); err != nil {
if err := newCap.run(cm.localBufferPool); err != nil {
logger.Errorf("failed to start capture: %s", err)
return
}
Expand Down Expand Up @@ -576,3 +589,18 @@ func (cm *Manager) performWriteout(ctx context.Context, timestamp time.Time, ifa
cm.lastRotation = timestamp
cm.Unlock()
}

// setLocalBuffers (re)initializes the underlying memory pool of the manager's local
// buffer pool from its configured parameters, releasing any previously held resources
func (cm *Manager) setLocalBuffers() error {

	// Guard against invalid (i.e. zero) buffer size / limits
	if cm.localBufferPool.NBuffers == 0 || cm.localBufferPool.MaxBufferSize == 0 {
		return fmt.Errorf("invalid number of local buffers (%d) / size limit (%d) specified", cm.localBufferPool.NBuffers, cm.localBufferPool.MaxBufferSize)
	}

	// Release all resources of the existing pool before swapping in a fresh one
	// (no nil check required: the guard above has already dereferenced the pool,
	// so a nil pool would have panicked before reaching this point)
	cm.localBufferPool.Clear()
	cm.localBufferPool.MemPoolLimitUnique = concurrency.NewMemPoolLimitUnique(cm.localBufferPool.NBuffers, initialBufferSize)

	return nil
}
14 changes: 10 additions & 4 deletions pkg/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ func testConcurrentMethodAccess(t *testing.T, nIfaces, nIterations int) {

captureManager, ifaceConfigs, testMockSrcs := setupInterfaces(t, defaultMockIfaceConfig, nIfaces)

time.Sleep(time.Second)

wg := sync.WaitGroup{}
wg.Add(3)

Expand Down Expand Up @@ -208,7 +206,14 @@ func setupInterfaces(t *testing.T, cfg config.CaptureConfig, nIfaces int) (*Mana

return src.src, nil
}),
WithLocalBuffers(1, config.DefaultLocalBufferSizeLimit),
)

// If a local buffer config exists, set the values accordingly (before initializing the manager)
if captureManager.localBufferPool.NBuffers > 0 {
require.Nil(t, captureManager.setLocalBuffers())
}

_, _, _, err = captureManager.Update(context.Background(), ifaceConfigs)
require.Nil(t, err)

Expand Down Expand Up @@ -565,11 +570,12 @@ func testDeadlockHighTraffic(t *testing.T) {

func newMockCapture(src capture.SourceZeroCopy) *Capture {
return &Capture{
iface: src.Link().Name,
iface: src.Link().Name,
memPool: testLocalBufferPool,
capLock: concurrency.NewThreePointLock(
concurrency.WithLockRequestFn(src.Unblock),
concurrency.WithUnlockRequestFn(src.Unblock),
concurrency.WithMemPool(memPool),
concurrency.WithMemPool(testLocalBufferPool.MemPoolLimitUnique),
concurrency.WithTimeout(time.Second),
),
flowLog: NewFlowLog(),
Expand Down
Loading