Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make packetbeat process monitor aware of bound address #9443

Merged
merged 2 commits into from
Dec 11, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 57 additions & 23 deletions packetbeat/procs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,20 @@ import (
// a PID being recycled by the OS
const processCacheExpiration = time.Second * 30

var (
anyIPv4 = net.IPv4zero.String()
anyIPv6 = net.IPv6unspecified.String()
)

type endpoint struct {
address string
port uint16
}

type portProcMapping struct {
port uint16
pid int
proc *process
endpoint endpoint
pid int
proc *process
}

type process struct {
Expand All @@ -50,15 +60,15 @@ type process struct {
type processWatcherImpl interface {
// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error)
GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error)
// GetProcessCommandLine returns the command line for a given process.
GetProcessCommandLine(pid int) string
// GetLocalIPs returns the list of local addresses.
GetLocalIPs() ([]net.IP, error)
}

type ProcessesWatcher struct {
portProcMap map[applayer.Transport]map[uint16]portProcMapping
portProcMap map[applayer.Transport]map[endpoint]portProcMapping
localAddrs []net.IP
processCache map[int]*process

Expand All @@ -77,9 +87,9 @@ func (proc *ProcessesWatcher) Init(config ProcsConfig) error {

func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatcherImpl) error {
proc.impl = impl
proc.portProcMap = map[applayer.Transport]map[uint16]portProcMapping{
applayer.TransportUDP: make(map[uint16]portProcMapping),
applayer.TransportTCP: make(map[uint16]portProcMapping),
proc.portProcMap = map[applayer.Transport]map[endpoint]portProcMapping{
applayer.TransportUDP: make(map[endpoint]portProcMapping),
applayer.TransportTCP: make(map[endpoint]portProcMapping),
}

proc.processCache = make(map[int]*process)
Expand Down Expand Up @@ -126,47 +136,69 @@ func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple, tran
}

if proc.isLocalIP(tuple.SrcIP) {
if p := proc.findProc(tuple.SrcPort, transport); p != nil {
if p := proc.findProc(tuple.SrcIP, tuple.SrcPort, transport); p != nil {
procTuple.Src = []byte(p.name)
procTuple.SrcCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.SrcPort, transport)
if logp.IsDebug("procs") {
logp.Debug("procs", "Found process '%s' (%s) for %s:%d/%s", p.commandLine, p.name, tuple.SrcIP, tuple.SrcPort, transport)
}
}
}

if proc.isLocalIP(tuple.DstIP) {
if p := proc.findProc(tuple.DstPort, transport); p != nil {
if p := proc.findProc(tuple.DstIP, tuple.DstPort, transport); p != nil {
procTuple.Dst = []byte(p.name)
procTuple.DstCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.DstPort, transport)
if logp.IsDebug("procs") {
logp.Debug("procs", "Found process '%s' (%s) for %s:%d/%s", p.commandLine, p.name, tuple.DstIP, tuple.DstPort, transport)
}
}
}

return
}

func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport) *process {
func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport applayer.Transport) *process {
defer logp.Recover("FindProc exception")

procMap, ok := proc.portProcMap[transport]
if !ok {
return nil
}

p, exists := procMap[port]
p, exists := lookupMapping(address, port, procMap)
if exists {
return p.proc
}

proc.updateMap(transport)

p, exists = procMap[port]
p, exists = lookupMapping(address, port, procMap)
if exists {
return p.proc
}

return nil
}

func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (p portProcMapping, found bool) {
// Precedence when one socket is bound to a specific IP:port and another one
// to INADDR_ANY and same port is not clear. Seems that the last one to bind
// takes precedence, and we don't have a way to tell.
// This function takes the naive approach of giving precedence to the more
// specific address and then to INADDR_ANY.
if p, found = procMap[endpoint{address.String(), port}]; found {
return
}

nullAddr := anyIPv4
if asIPv4 := address.To4(); asIPv4 == nil {
nullAddr = anyIPv6
}
p, found = procMap[endpoint{nullAddr, port}]
return
}

func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
if logp.HasSelector("procsdetailed") {
start := time.Now()
Expand All @@ -175,20 +207,20 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
}()
}

ports, err := proc.impl.GetLocalPortToPIDMapping(transport)
endpoints, err := proc.impl.GetLocalPortToPIDMapping(transport)
if err != nil {
logp.Err("unable to list local ports: %v", err)
}

proc.expireProcessCache()

for port, pid := range ports {
proc.updateMappingEntry(transport, port, pid)
for e, pid := range endpoints {
proc.updateMappingEntry(transport, e, pid)
}
}

func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, port uint16, pid int) {
prev, ok := proc.portProcMap[transport][port]
func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e endpoint, pid int) {
prev, ok := proc.portProcMap[transport][e]
if ok && prev.pid == pid {
// This port->pid mapping already exists
return
Expand All @@ -203,10 +235,12 @@ func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, p
// We never expire entries from this map. Since there are 65k possible
// ports, the size of the dict can be max 1.5 MB, which we consider
// reasonable.
proc.portProcMap[transport][port] = portProcMapping{port: port, pid: pid, proc: p}
proc.portProcMap[transport][e] = portProcMapping{endpoint: e, pid: pid, proc: p}

logp.Debug("procsdetailed", "updateMappingEntry(): port=%d/%s pid=%d process='%s' name=%s",
port, transport, pid, p.commandLine, p.name)
if logp.IsDebug("procsdetailed") {
logp.Debug("procsdetailed", "updateMappingEntry(): local=%s:%d/%s pid=%d process='%s' name=%s",
e.address, e.port, transport, pid, p.commandLine, p.name)
}
}

func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool {
Expand Down
6 changes: 3 additions & 3 deletions packetbeat/procs/procs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var procFiles = map[applayer.Transport]struct {

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) {
sourceFiles, ok := procFiles[transport]
if !ok {
return nil, fmt.Errorf("unsupported transport protocol id: %d", transport)
Expand Down Expand Up @@ -81,7 +81,7 @@ func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transp
socksMap[s.inode] = s
}

ports = make(map[uint16]int)
ports = make(map[endpoint]int)
for _, pid := range pids.List {
inodes, err := findSocketsOfPid("", pid)
if err != nil {
Expand All @@ -91,7 +91,7 @@ func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transp

for _, inode := range inodes {
if sockInfo, exists := socksMap[inode]; exists {
ports[sockInfo.srcPort] = pid
ports[endpoint{address: sockInfo.srcIP.String(), port: sockInfo.srcPort}] = pid
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/procs/procs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ import "github.com/elastic/beats/packetbeat/protos/applayer"

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) {
return nil, nil
}
Loading