Skip to content

Commit

Permalink
Make packetbeat process monitor aware of bound address (#9443) (#9490)
Browse files Browse the repository at this point in the history
The current implementation of Packetbeat's process monitor detects the
local process by looking at the local port number used. However,
different processes can be bound to the same port on different
interfaces.

This patch fixes the problem by looking up the process by using the
tuple (address, port).

There's still a case where Packetbeat can pick the wrong process:

When one process is bound to INADDR_ANY and another to a specific
local address. Testing suggests that the last socket to be bound takes
precedence over the other. However, I couldn't find this behavior
documented anywhere.

As Packetbeat can't tell which socket was bound first, there's no way
of telling which process is really receiving the traffic. This PR will give
precedence to a socket bound to a local IP-address over a socket
bound to INADDR_ANY.



Fixes #9151

(cherry picked from commit 824d443)
  • Loading branch information
adriansr authored Dec 12, 2018
1 parent 3283a9a commit 425c3ed
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 80 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ https://github.com/elastic/beats/compare/v6.5.0...6.x[Check the HEAD diff]

*Packetbeat*

- Fix issue with process monitor associating traffic to the wrong process. {issue}9151[9151] {pull}9443[9443]

*Winlogbeat*

*Functionbeat*
Expand Down
80 changes: 57 additions & 23 deletions packetbeat/procs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,20 @@ import (
// a PID being recycled by the OS
const processCacheExpiration = time.Second * 30

var (
anyIPv4 = net.IPv4zero.String()
anyIPv6 = net.IPv6unspecified.String()
)

type endpoint struct {
address string
port uint16
}

type portProcMapping struct {
port uint16
pid int
proc *process
endpoint endpoint
pid int
proc *process
}

type process struct {
Expand All @@ -50,15 +60,15 @@ type process struct {
type processWatcherImpl interface {
// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error)
GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error)
// GetProcessCommandLine returns the command line for a given process.
GetProcessCommandLine(pid int) string
// GetLocalIPs returns the list of local addresses.
GetLocalIPs() ([]net.IP, error)
}

type ProcessesWatcher struct {
portProcMap map[applayer.Transport]map[uint16]portProcMapping
portProcMap map[applayer.Transport]map[endpoint]portProcMapping
localAddrs []net.IP
processCache map[int]*process

Expand All @@ -77,9 +87,9 @@ func (proc *ProcessesWatcher) Init(config ProcsConfig) error {

func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatcherImpl) error {
proc.impl = impl
proc.portProcMap = map[applayer.Transport]map[uint16]portProcMapping{
applayer.TransportUDP: make(map[uint16]portProcMapping),
applayer.TransportTCP: make(map[uint16]portProcMapping),
proc.portProcMap = map[applayer.Transport]map[endpoint]portProcMapping{
applayer.TransportUDP: make(map[endpoint]portProcMapping),
applayer.TransportTCP: make(map[endpoint]portProcMapping),
}

proc.processCache = make(map[int]*process)
Expand Down Expand Up @@ -126,47 +136,69 @@ func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple, tran
}

if proc.isLocalIP(tuple.SrcIP) {
if p := proc.findProc(tuple.SrcPort, transport); p != nil {
if p := proc.findProc(tuple.SrcIP, tuple.SrcPort, transport); p != nil {
procTuple.Src = []byte(p.name)
procTuple.SrcCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.SrcPort, transport)
if logp.IsDebug("procs") {
logp.Debug("procs", "Found process '%s' (%s) for %s:%d/%s", p.commandLine, p.name, tuple.SrcIP, tuple.SrcPort, transport)
}
}
}

if proc.isLocalIP(tuple.DstIP) {
if p := proc.findProc(tuple.DstPort, transport); p != nil {
if p := proc.findProc(tuple.DstIP, tuple.DstPort, transport); p != nil {
procTuple.Dst = []byte(p.name)
procTuple.DstCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.DstPort, transport)
if logp.IsDebug("procs") {
logp.Debug("procs", "Found process '%s' (%s) for %s:%d/%s", p.commandLine, p.name, tuple.DstIP, tuple.DstPort, transport)
}
}
}

return
}

func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport) *process {
func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport applayer.Transport) *process {
defer logp.Recover("FindProc exception")

procMap, ok := proc.portProcMap[transport]
if !ok {
return nil
}

p, exists := procMap[port]
p, exists := lookupMapping(address, port, procMap)
if exists {
return p.proc
}

proc.updateMap(transport)

p, exists = procMap[port]
p, exists = lookupMapping(address, port, procMap)
if exists {
return p.proc
}

return nil
}

func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (p portProcMapping, found bool) {
// Precedence when one socket is bound to a specific IP:port and another one
// to INADDR_ANY and same port is not clear. Seems that the last one to bind
// takes precedence, and we don't have a way to tell.
// This function takes the naive approach of giving precedence to the more
// specific address and then to INADDR_ANY.
if p, found = procMap[endpoint{address.String(), port}]; found {
return
}

nullAddr := anyIPv4
if asIPv4 := address.To4(); asIPv4 == nil {
nullAddr = anyIPv6
}
p, found = procMap[endpoint{nullAddr, port}]
return
}

func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
if logp.HasSelector("procsdetailed") {
start := time.Now()
Expand All @@ -175,20 +207,20 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
}()
}

ports, err := proc.impl.GetLocalPortToPIDMapping(transport)
endpoints, err := proc.impl.GetLocalPortToPIDMapping(transport)
if err != nil {
logp.Err("unable to list local ports: %v", err)
}

proc.expireProcessCache()

for port, pid := range ports {
proc.updateMappingEntry(transport, port, pid)
for e, pid := range endpoints {
proc.updateMappingEntry(transport, e, pid)
}
}

func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, port uint16, pid int) {
prev, ok := proc.portProcMap[transport][port]
func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e endpoint, pid int) {
prev, ok := proc.portProcMap[transport][e]
if ok && prev.pid == pid {
// This port->pid mapping already exists
return
Expand All @@ -203,10 +235,12 @@ func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, p
// We never expire entries from this map. Since there are 65k possible
// ports, the size of the dict can be max 1.5 MB, which we consider
// reasonable.
proc.portProcMap[transport][port] = portProcMapping{port: port, pid: pid, proc: p}
proc.portProcMap[transport][e] = portProcMapping{endpoint: e, pid: pid, proc: p}

logp.Debug("procsdetailed", "updateMappingEntry(): port=%d/%s pid=%d process='%s' name=%s",
port, transport, pid, p.commandLine, p.name)
if logp.IsDebug("procsdetailed") {
logp.Debug("procsdetailed", "updateMappingEntry(): local=%s:%d/%s pid=%d process='%s' name=%s",
e.address, e.port, transport, pid, p.commandLine, p.name)
}
}

func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool {
Expand Down
6 changes: 3 additions & 3 deletions packetbeat/procs/procs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var procFiles = map[applayer.Transport]struct {

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) {
sourceFiles, ok := procFiles[transport]
if !ok {
return nil, fmt.Errorf("unsupported transport protocol id: %d", transport)
Expand Down Expand Up @@ -81,7 +81,7 @@ func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transp
socksMap[s.inode] = s
}

ports = make(map[uint16]int)
ports = make(map[endpoint]int)
for _, pid := range pids.List {
inodes, err := findSocketsOfPid("", pid)
if err != nil {
Expand All @@ -91,7 +91,7 @@ func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transp

for _, inode := range inodes {
if sockInfo, exists := socksMap[inode]; exists {
ports[sockInfo.srcPort] = pid
ports[endpoint{address: sockInfo.srcIP.String(), port: sockInfo.srcPort}] = pid
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/procs/procs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ import "github.com/elastic/beats/packetbeat/protos/applayer"

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) {
return nil, nil
}
Loading

0 comments on commit 425c3ed

Please sign in to comment.