Skip to content

Commit

Permalink
Make packetbeat process monitor local address aware
Browse files Browse the repository at this point in the history
The current implementation of Packetbeat's process monitor detects the
local process by looking at the local port number used. However,
differnet processes can be bound to the same port on different
interfaces.

This patch fixes the problem by looking up the process by using the
tuple (address, port).

Fixes elastic#9151
  • Loading branch information
adriansr committed Dec 7, 2018
1 parent 0248d78 commit 0341a3d
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 80 deletions.
80 changes: 57 additions & 23 deletions packetbeat/procs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,20 @@ import (
// a PID being recycled by the OS
const processCacheExpiration = time.Second * 30

var (
anyIPv4 = net.IPv4zero.String()
anyIPv6 = net.IPv6unspecified.String()
)

type endpoint struct {
address string
port uint16
}

type portProcMapping struct {
port uint16
pid int
proc *process
endpoint endpoint
pid int
proc *process
}

type process struct {
Expand All @@ -50,15 +60,15 @@ type process struct {
type processWatcherImpl interface {
// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error)
GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error)
// GetProcessCommandLine returns the command line for a given process.
GetProcessCommandLine(pid int) string
// GetLocalIPs returns the list of local addresses.
GetLocalIPs() ([]net.IP, error)
}

type ProcessesWatcher struct {
portProcMap map[applayer.Transport]map[uint16]portProcMapping
portProcMap map[applayer.Transport]map[endpoint]portProcMapping
localAddrs []net.IP
processCache map[int]*process

Expand All @@ -77,9 +87,9 @@ func (proc *ProcessesWatcher) Init(config ProcsConfig) error {

func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatcherImpl) error {
proc.impl = impl
proc.portProcMap = map[applayer.Transport]map[uint16]portProcMapping{
applayer.TransportUDP: make(map[uint16]portProcMapping),
applayer.TransportTCP: make(map[uint16]portProcMapping),
proc.portProcMap = map[applayer.Transport]map[endpoint]portProcMapping{
applayer.TransportUDP: make(map[endpoint]portProcMapping),
applayer.TransportTCP: make(map[endpoint]portProcMapping),
}

proc.processCache = make(map[int]*process)
Expand Down Expand Up @@ -126,47 +136,69 @@ func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple, tran
}

if proc.isLocalIP(tuple.SrcIP) {
if p := proc.findProc(tuple.SrcPort, transport); p != nil {
if p := proc.findProc(tuple.SrcIP, tuple.SrcPort, transport); p != nil {
procTuple.Src = []byte(p.name)
procTuple.SrcCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.SrcPort, transport)
if logp.IsDebug("procs") {
logp.Debug("procs", "Found process '%s' (%s) for %s:%d/%s", p.commandLine, p.name, tuple.SrcIP, tuple.SrcPort, transport)
}
}
}

if proc.isLocalIP(tuple.DstIP) {
if p := proc.findProc(tuple.DstPort, transport); p != nil {
if p := proc.findProc(tuple.DstIP, tuple.DstPort, transport); p != nil {
procTuple.Dst = []byte(p.name)
procTuple.DstCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.DstPort, transport)
if logp.IsDebug("procs") {
logp.Debug("procs", "Found process '%s' (%s) for %s:%d/%s", p.commandLine, p.name, tuple.DstIP, tuple.DstPort, transport)
}
}
}

return
}

func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport) *process {
func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport applayer.Transport) *process {
defer logp.Recover("FindProc exception")

procMap, ok := proc.portProcMap[transport]
if !ok {
return nil
}

p, exists := procMap[port]
p, exists := lookupMapping(address, port, procMap)
if exists {
return p.proc
}

proc.updateMap(transport)

p, exists = procMap[port]
p, exists = lookupMapping(address, port, procMap)
if exists {
return p.proc
}

return nil
}

func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (p portProcMapping, found bool) {
// Precedence when one socket is bound to a specific IP:port and another one
// to INADDR_ANY and same port is not clear. Seems that the last one to bind
// takes precedence, and we don't have a way to tell.
// This function takes the naive approach of giving precedence to the more
// specific address and then to INADDR_ANY.
if p, found = procMap[endpoint{address.String(), port}]; found {
return
}

nullAddr := anyIPv4
if asIPv4 := address.To4(); asIPv4 == nil {
nullAddr = anyIPv6
}
p, found = procMap[endpoint{nullAddr, port}]
return
}

func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
if logp.HasSelector("procsdetailed") {
start := time.Now()
Expand All @@ -175,20 +207,20 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
}()
}

ports, err := proc.impl.GetLocalPortToPIDMapping(transport)
endpoints, err := proc.impl.GetLocalPortToPIDMapping(transport)
if err != nil {
logp.Err("unable to list local ports: %v", err)
}

proc.expireProcessCache()

for port, pid := range ports {
proc.updateMappingEntry(transport, port, pid)
for e, pid := range endpoints {
proc.updateMappingEntry(transport, e, pid)
}
}

func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, port uint16, pid int) {
prev, ok := proc.portProcMap[transport][port]
func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e endpoint, pid int) {
prev, ok := proc.portProcMap[transport][e]
if ok && prev.pid == pid {
// This port->pid mapping already exists
return
Expand All @@ -203,10 +235,12 @@ func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, p
// We never expire entries from this map. Since there are 65k possible
// ports, the size of the dict can be max 1.5 MB, which we consider
// reasonable.
proc.portProcMap[transport][port] = portProcMapping{port: port, pid: pid, proc: p}
proc.portProcMap[transport][e] = portProcMapping{endpoint: e, pid: pid, proc: p}

logp.Debug("procsdetailed", "updateMappingEntry(): port=%d/%s pid=%d process='%s' name=%s",
port, transport, pid, p.commandLine, p.name)
if logp.IsDebug("procsdetailed") {
logp.Debug("procsdetailed", "updateMappingEntry(): local=%s:%d/%s pid=%d process='%s' name=%s",
e.address, e.port, transport, pid, p.commandLine, p.name)
}
}

func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool {
Expand Down
6 changes: 3 additions & 3 deletions packetbeat/procs/procs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var procFiles = map[applayer.Transport]struct {

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) {
sourceFiles, ok := procFiles[transport]
if !ok {
return nil, fmt.Errorf("unsupported transport protocol id: %d", transport)
Expand Down Expand Up @@ -81,7 +81,7 @@ func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transp
socksMap[s.inode] = s
}

ports = make(map[uint16]int)
ports = make(map[endpoint]int)
for _, pid := range pids.List {
inodes, err := findSocketsOfPid("", pid)
if err != nil {
Expand All @@ -91,7 +91,7 @@ func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transp

for _, inode := range inodes {
if sockInfo, exists := socksMap[inode]; exists {
ports[sockInfo.srcPort] = pid
ports[endpoint{address: sockInfo.srcIP.String(), port: sockInfo.srcPort}] = pid
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/procs/procs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ import "github.com/elastic/beats/packetbeat/protos/applayer"

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) {
return nil, nil
}
Loading

0 comments on commit 0341a3d

Please sign in to comment.