diff --git a/pkg/capture/capture.go b/pkg/capture/capture.go index c67a6f11..a6124aef 100644 --- a/pkg/capture/capture.go +++ b/pkg/capture/capture.go @@ -44,57 +44,6 @@ var ( // providing the ability to override the default behavior, e.g. in mock tests type sourceInitFn func(*Capture) (Source, error) -// Captures denotes a named set of Capture instances, wrapping a map and the -// required synchronization of all its actions -type captures struct { - Map map[string]*Capture - sync.RWMutex -} - -// newCaptures instantiates a new, empty set of Captures -func newCaptures() *captures { - return &captures{ - Map: make(map[string]*Capture), - RWMutex: sync.RWMutex{}, - } -} - -// Ifaces return the list of names of all interfaces in the set -func (c *captures) Ifaces(ifaces ...string) []string { - if len(ifaces) == 0 { - c.RLock() - ifaces = make([]string, 0, len(c.Map)) - for iface := range c.Map { - ifaces = append(ifaces, iface) - } - c.RUnlock() - } - - return ifaces -} - -// Get safely returns a Capture by name (and an indicator if it exists) -func (c *captures) Get(iface string) (capture *Capture, exists bool) { - c.RLock() - capture, exists = c.Map[iface] - c.RUnlock() - return -} - -// Set safely adds / overwrites a Capture by name -func (c *captures) Set(iface string, capture *Capture) { - c.Lock() - c.Map[iface] = capture - c.Unlock() -} - -// Delete safely removes a Capture from the set by name -func (c *captures) Delete(iface string) { - c.Lock() - delete(c.Map, iface) - c.Unlock() -} - // Capture captures and logs flow data for all traffic on a // given network interface. For each Capture, a goroutine is // spawned at creation time. To avoid leaking this goroutine, @@ -131,6 +80,11 @@ type Capture struct { // startedAt tracks when the capture was started startedAt time.Time + + // Mutex to allow concurrent access to capture components + // This is _unrelated_ to the three-point capture lock to + // interrupt the capture for purposes of e.g. rotation + sync.Mutex } // newCapture creates a new Capture associated with the given iface. @@ -177,6 +131,15 @@ func (c *Capture) run(memPool *LocalBufferPool) (err error) { } func (c *Capture) close() error { + + // Lock the whole capture to pretect against duplicate close() calls + c.Lock() + defer c.Unlock() + + // in case the captureHandle is already closed, return without action + if c.captureHandle == nil { + return nil + } if err := c.captureHandle.Close(); err != nil { return err } diff --git a/pkg/capture/capture_manager.go b/pkg/capture/capture_manager.go index bcbddb71..fd8a7ee7 100644 --- a/pkg/capture/capture_manager.go +++ b/pkg/capture/capture_manager.go @@ -236,8 +236,12 @@ func (cm *Manager) Status(ctx context.Context, ifaces ...string) (statusmap capt return } + // Lock the captures as a whole to protect against conflicting interactions + cm.captures.Lock() + defer cm.captures.Unlock() + for _, iface := range ifaces { - mc, exists := cm.captures.Get(iface) + mc, exists := cm.captures.GetNoLock(iface) if !exists { continue } @@ -251,7 +255,7 @@ func (cm *Manager) Status(ctx context.Context, ifaces ...string) (statusmap capt if err := mc.close(); err != nil { logger.Errorf("failed to close capture after failed three-point lock: %s", err) } - cm.captures.Delete(mc.iface) + cm.captures.DeleteNoLock(mc.iface) return } @@ -265,7 +269,7 @@ func (cm *Manager) Status(ctx context.Context, ifaces ...string) (statusmap capt if err := mc.close(); err != nil { logger.Errorf("failed to close capture after failed three-point lock: %s", err) } - cm.captures.Delete(mc.iface) + cm.captures.DeleteNoLock(mc.iface) } if err != nil { @@ -515,6 +519,10 @@ func (cm *Manager) rotate(ctx context.Context, writeoutChan chan<- capturetypes. // Lock the running capture in order to safely perform rotation tasks if err := mc.capLock.Lock(); err != nil { logger.Errorf("failed to establish rotation three-point lock: %s", err) + if err := mc.close(); err != nil { + logger.Errorf("failed to close capture after failed three-point lock: %s", err) + } + cm.captures.Delete(mc.iface) continue } @@ -527,6 +535,10 @@ func (cm *Manager) rotate(ctx context.Context, writeoutChan chan<- capturetypes. stats := <-statsRes if err := mc.capLock.Unlock(); err != nil { logger.Errorf("failed to release rotation three-point lock: %s", err) + if err := mc.close(); err != nil { + logger.Errorf("failed to close capture after failed three-point lock: %s", err) + } + cm.captures.Delete(mc.iface) } logger.With("elapsed", time.Since(lockStart).Round(time.Microsecond).String()).Debug("interface lock-cycle complete") @@ -560,17 +572,17 @@ func (cm *Manager) logErrors(ctx context.Context, iface string, errsChan <-chan // Ensure there is no conflict with calls to update() that might already be // taking down this interface - cm.Lock() - defer cm.Unlock() + cm.captures.Lock() + defer cm.captures.Unlock() // If the error channel was closed prematurely, we have to assume there was // a critical processing error and tear down the interface - if mc, exists := cm.captures.Get(iface); exists { + if mc, exists := cm.captures.GetNoLock(iface); exists { logger.Info("closing capture / stopping packet processing") if err := mc.close(); err != nil { logger.Warnf("failed to close capture in logging routine (might be expected): %s", err) } - cm.captures.Delete(mc.iface) + cm.captures.DeleteNoLock(mc.iface) } return } diff --git a/pkg/capture/captures.go b/pkg/capture/captures.go new file mode 100644 index 00000000..7f623eb3 --- /dev/null +++ b/pkg/capture/captures.go @@ -0,0 +1,70 @@ +package capture + +import "sync" + +// Captures denotes a named set of Capture instances, wrapping a map and the +// required synchronization of all its actions +type captures struct { + Map map[string]*Capture + sync.RWMutex +} + +// newCaptures instantiates a new, empty set of Captures +func newCaptures() *captures { + return &captures{ + Map: make(map[string]*Capture), + RWMutex: sync.RWMutex{}, + } +} + +// Ifaces return the list of names of all interfaces in the set +func (c *captures) Ifaces(ifaces ...string) []string { + if len(ifaces) == 0 { + c.RLock() + ifaces = make([]string, 0, len(c.Map)) + for iface := range c.Map { + ifaces = append(ifaces, iface) + } + c.RUnlock() + } + + return ifaces +} + +// Get safely returns a Capture by name (and an indicator if it exists) +func (c *captures) Get(iface string) (capture *Capture, exists bool) { + c.RLock() + capture, exists = c.GetNoLock(iface) + c.RUnlock() + return +} + +// GetNoLock returns a Capture by name (and an indicator if it exists) without locking +func (c *captures) GetNoLock(iface string) (capture *Capture, exists bool) { + capture, exists = c.Map[iface] + return +} + +// Set safely adds / overwrites a Capture by name +func (c *captures) Set(iface string, capture *Capture) { + c.Lock() + c.SetNoLock(iface, capture) + c.Unlock() +} + +// SetNoLock adds / overwrites a Capture by name without locking +func (c *captures) SetNoLock(iface string, capture *Capture) { + c.Map[iface] = capture +} + +// Delete safely removes a Capture from the set by name +func (c *captures) Delete(iface string) { + c.Lock() + c.DeleteNoLock(iface) + c.Unlock() +} + +// Delete removes a Capture from the set by name without locking +func (c *captures) DeleteNoLock(iface string) { + delete(c.Map, iface) +}