Skip to content

Commit

Permalink
Improve handling of global and local locks to mitigate capture deadlocks
Browse files Browse the repository at this point in the history
  • Loading branch information
fako1024 committed Jul 11, 2024
1 parent fc4b41a commit b6956fb
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 58 deletions.
65 changes: 14 additions & 51 deletions pkg/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,57 +44,6 @@ var (
// providing the ability to override the default behavior, e.g. in mock tests
type sourceInitFn func(*Capture) (Source, error)

// Captures denotes a named set of Capture instances, wrapping a map and the
// required synchronization of all its actions
type captures struct {
Map map[string]*Capture
sync.RWMutex
}

// newCaptures instantiates a new, empty set of Captures
func newCaptures() *captures {
return &captures{
Map: make(map[string]*Capture),
RWMutex: sync.RWMutex{},
}
}

// Ifaces return the list of names of all interfaces in the set
func (c *captures) Ifaces(ifaces ...string) []string {
if len(ifaces) == 0 {
c.RLock()
ifaces = make([]string, 0, len(c.Map))
for iface := range c.Map {
ifaces = append(ifaces, iface)
}
c.RUnlock()
}

return ifaces
}

// Get safely returns a Capture by name (and an indicator if it exists)
func (c *captures) Get(iface string) (capture *Capture, exists bool) {
c.RLock()
capture, exists = c.Map[iface]
c.RUnlock()
return
}

// Set safely adds / overwrites a Capture by name
func (c *captures) Set(iface string, capture *Capture) {
c.Lock()
c.Map[iface] = capture
c.Unlock()
}

// Delete safely removes a Capture from the set by name
func (c *captures) Delete(iface string) {
c.Lock()
delete(c.Map, iface)
c.Unlock()
}

// Capture captures and logs flow data for all traffic on a
// given network interface. For each Capture, a goroutine is
// spawned at creation time. To avoid leaking this goroutine,
Expand Down Expand Up @@ -131,6 +80,11 @@ type Capture struct {

// startedAt tracks when the capture was started
startedAt time.Time

// Mutex to allow concurrent access to capture components
// This is _unrelated_ to the three-point capture lock to
// interrupt the capture for purposes of e.g. rotation
sync.Mutex
}

// newCapture creates a new Capture associated with the given iface.
Expand Down Expand Up @@ -177,6 +131,15 @@ func (c *Capture) run(memPool *LocalBufferPool) (err error) {
}

func (c *Capture) close() error {

// Lock the whole capture to pretect against duplicate close() calls
c.Lock()
defer c.Unlock()

// in case the captureHandle is already closed, return without action
if c.captureHandle == nil {
return nil
}
if err := c.captureHandle.Close(); err != nil {
return err
}
Expand Down
26 changes: 19 additions & 7 deletions pkg/capture/capture_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,12 @@ func (cm *Manager) Status(ctx context.Context, ifaces ...string) (statusmap capt
return
}

// Lock the captures as a whole to protect against conflicting interactions
cm.captures.Lock()
defer cm.captures.Unlock()

for _, iface := range ifaces {
mc, exists := cm.captures.Get(iface)
mc, exists := cm.captures.GetNoLock(iface)
if !exists {
continue
}
Expand All @@ -251,7 +255,7 @@ func (cm *Manager) Status(ctx context.Context, ifaces ...string) (statusmap capt
if err := mc.close(); err != nil {
logger.Errorf("failed to close capture after failed three-point lock: %s", err)
}
cm.captures.Delete(mc.iface)
cm.captures.DeleteNoLock(mc.iface)

return
}
Expand All @@ -265,7 +269,7 @@ func (cm *Manager) Status(ctx context.Context, ifaces ...string) (statusmap capt
if err := mc.close(); err != nil {
logger.Errorf("failed to close capture after failed three-point lock: %s", err)
}
cm.captures.Delete(mc.iface)
cm.captures.DeleteNoLock(mc.iface)
}

if err != nil {
Expand Down Expand Up @@ -515,6 +519,10 @@ func (cm *Manager) rotate(ctx context.Context, writeoutChan chan<- capturetypes.
// Lock the running capture in order to safely perform rotation tasks
if err := mc.capLock.Lock(); err != nil {
logger.Errorf("failed to establish rotation three-point lock: %s", err)
if err := mc.close(); err != nil {
logger.Errorf("failed to close capture after failed three-point lock: %s", err)
}
cm.captures.Delete(mc.iface)
continue
}

Expand All @@ -527,6 +535,10 @@ func (cm *Manager) rotate(ctx context.Context, writeoutChan chan<- capturetypes.
stats := <-statsRes
if err := mc.capLock.Unlock(); err != nil {
logger.Errorf("failed to release rotation three-point lock: %s", err)
if err := mc.close(); err != nil {
logger.Errorf("failed to close capture after failed three-point lock: %s", err)
}
cm.captures.Delete(mc.iface)
}
logger.With("elapsed", time.Since(lockStart).Round(time.Microsecond).String()).Debug("interface lock-cycle complete")

Expand Down Expand Up @@ -560,17 +572,17 @@ func (cm *Manager) logErrors(ctx context.Context, iface string, errsChan <-chan

// Ensure there is no conflict with calls to update() that might already be
// taking down this interface
cm.Lock()
defer cm.Unlock()
cm.captures.Lock()
defer cm.captures.Unlock()

// If the error channel was closed prematurely, we have to assume there was
// a critical processing error and tear down the interface
if mc, exists := cm.captures.Get(iface); exists {
if mc, exists := cm.captures.GetNoLock(iface); exists {
logger.Info("closing capture / stopping packet processing")
if err := mc.close(); err != nil {
logger.Warnf("failed to close capture in logging routine (might be expected): %s", err)
}
cm.captures.Delete(mc.iface)
cm.captures.DeleteNoLock(mc.iface)
}
return
}
Expand Down
70 changes: 70 additions & 0 deletions pkg/capture/captures.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package capture

import "sync"

// Captures denotes a named set of Capture instances, wrapping a map and the
// required synchronization of all its actions
type captures struct {
Map map[string]*Capture
sync.RWMutex
}

// newCaptures instantiates a new, empty set of Captures
func newCaptures() *captures {
return &captures{
Map: make(map[string]*Capture),
RWMutex: sync.RWMutex{},
}
}

// Ifaces return the list of names of all interfaces in the set
func (c *captures) Ifaces(ifaces ...string) []string {
if len(ifaces) == 0 {
c.RLock()
ifaces = make([]string, 0, len(c.Map))
for iface := range c.Map {
ifaces = append(ifaces, iface)
}
c.RUnlock()
}

return ifaces
}

// Get safely returns a Capture by name (and an indicator if it exists)
func (c *captures) Get(iface string) (capture *Capture, exists bool) {
c.RLock()
capture, exists = c.GetNoLock(iface)
c.RUnlock()
return
}

// GetNoLock returns a Capture by name (and an indicator if it exists) without locking
func (c *captures) GetNoLock(iface string) (capture *Capture, exists bool) {
capture, exists = c.Map[iface]
return
}

// Set safely adds / overwrites a Capture by name
func (c *captures) Set(iface string, capture *Capture) {
c.Lock()
c.SetNoLock(iface, capture)
c.Unlock()
}

// SetNoLock adds / overwrites a Capture by name without locking
func (c *captures) SetNoLock(iface string, capture *Capture) {
c.Map[iface] = capture
}

// Delete safely removes a Capture from the set by name
func (c *captures) Delete(iface string) {
c.Lock()
c.DeleteNoLock(iface)
c.Unlock()
}

// Delete removes a Capture from the set by name without locking
func (c *captures) DeleteNoLock(iface string) {
delete(c.Map, iface)
}

0 comments on commit b6956fb

Please sign in to comment.