Skip to content

Commit

Permalink
Add UDP support to packetbeat's process monitor (#7571)
Browse files Browse the repository at this point in the history
* Add UDP support to packetbeat's process monitor (#541)

This updates the process monitor with UDP support.

Until now, it would lookup all ports as TCP. This could result in the
wrong process being reported when two different processes used the same
port number, one for TCP and one for UDP.

The Linux and Windows implementations have been updated to fetch
information for UDP ports.

Closes #541

* Document the new process monitor features

- Windows support
- UDP support
- cmdline fields
  • Loading branch information
adriansr authored and andrewkroh committed Jul 12, 2018
1 parent 25df531 commit 9e768f3
Show file tree
Hide file tree
Showing 23 changed files with 233 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- The process monitor now reports the command-line for all processes, under Linux and Windows. {pull}7135[7135]
- Updated the TLS protocol parser with new cipher suites added to TLS 1.3. {issue}7455[7455]
- Flows are enriched with process information using the process monitor. {pull}7507[7507]
- Added UDP support to process monitor. {pull}7571[7571]

*Winlogbeat*

Expand Down
21 changes: 13 additions & 8 deletions packetbeat/docs/packetbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1283,15 +1283,15 @@ process matching is disabled.
When Packetbeat starts, and then periodically afterwards, it scans the process table for
processes that match the configuration file. For each of these processes, it
monitors which file descriptors it has opened. When a new packet is captured,
it reads the list of active TCP connections and matches the corresponding one
it reads the list of active TCP and UDP connections and matches the corresponding one
with the list of file descriptors.

On a Linux system, all this information is available via the `/proc`
file system, so Packetbeat doesn't need a kernel module.

All this information is available via system interfaces: The `/proc` file system
in Linux and the IP Helper API (`iphlpapi.dll`) on Windows, so {beatname_uc}
doesn't need a kernel module.

NOTE: Process monitoring is currently only supported on
Linux systems. Packetbeat automatically disables
Linux and Windows systems. Packetbeat automatically disables
process monitoring when it detects other operating systems.

Example configuration:
Expand All @@ -1314,6 +1314,14 @@ packetbeat.procs:
cmdline_grep: gunicorn
------------------------------------------------------------------------------

When the process monitor is enabled, it will enrich all the events whose source
or destination is a local process. The `cmdline` and/or `client_cmdline` fields
will be added to an event, when the server side or client side of the connection
belong to a local process, respectively. Additionally, you can specify a pattern
using the `cmdline_grep` option, to also name those processes. This will cause
the `proc` and `client_proc` fields to be added to an event, with the name of
the matched process.

[float]
=== Configuration options

Expand Down Expand Up @@ -1350,6 +1358,3 @@ Example configuration:
-------------------------------------------------------------------------------------
packetbeat.shutdown_timeout: 5s
-------------------------------------------------------------------------------------



11 changes: 8 additions & 3 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos/applayer"
)

type flowsProcessor struct {
Expand Down Expand Up @@ -216,6 +217,7 @@ func createEvent(
source := common.MapStr{}
dest := common.MapStr{}
tuple := common.IPPortTuple{}
var proto applayer.Transport

// add ethernet layer meta data
if src, dst, ok := f.id.EthAddr(); ok {
Expand Down Expand Up @@ -282,9 +284,11 @@ func createEvent(

// udp layer meta data
if src, dst, ok := f.id.UDPAddr(); ok {
source["port"] = binary.LittleEndian.Uint16(src)
dest["port"] = binary.LittleEndian.Uint16(dst)
tuple.SrcPort = binary.LittleEndian.Uint16(src)
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
fields["transport"] = "udp"
proto = applayer.TransportUDP
}

// tcp layer meta data
Expand All @@ -293,6 +297,7 @@ func createEvent(
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
fields["transport"] = "tcp"
proto = applayer.TransportTCP
}

if id := f.id.ConnectionID(); id != nil {
Expand All @@ -311,7 +316,7 @@ func createEvent(

// Set process information if it's available
if tuple.IPLength != 0 && tuple.SrcPort != 0 {
if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple); cmdline != nil {
if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple, proto); cmdline != nil {
src, dst := common.MakeEndpointPair(tuple.BaseTuple, cmdline)

for key, value := range map[string]string{
Expand Down
64 changes: 44 additions & 20 deletions packetbeat/procs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/protos/applayer"
"github.com/elastic/gosigar"
)

Expand All @@ -49,15 +50,15 @@ type process struct {
type processWatcherImpl interface {
// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
GetLocalPortToPIDMapping() (ports map[uint16]int, err error)
GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error)
// GetProcessCommandLine returns the command line for a given process.
GetProcessCommandLine(pid int) string
// GetLocalIPs returns the list of local addresses.
GetLocalIPs() ([]net.IP, error)
}

type ProcessesWatcher struct {
portProcMap map[uint16]portProcMapping
portProcMap map[applayer.Transport]map[uint16]portProcMapping
localAddrs []net.IP
processCache map[int]*process

Expand All @@ -76,7 +77,11 @@ func (proc *ProcessesWatcher) Init(config ProcsConfig) error {

func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatcherImpl) error {
proc.impl = impl
proc.portProcMap = make(map[uint16]portProcMapping)
proc.portProcMap = map[applayer.Transport]map[uint16]portProcMapping{
applayer.TransportUDP: make(map[uint16]portProcMapping),
applayer.TransportTCP: make(map[uint16]portProcMapping),
}

proc.processCache = make(map[int]*process)

proc.enabled = config.Enabled
Expand All @@ -99,72 +104,91 @@ func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatch
return nil
}

func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) {
// FindProcessesTupleTCP looks up local process information for the source and
// destination addresses of TCP tuple
func (proc *ProcessesWatcher) FindProcessesTupleTCP(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) {
return proc.FindProcessesTuple(tuple, applayer.TransportTCP)
}

// FindProcessesTupleUDP looks up local process information for the source and
// destination addresses of UDP tuple
func (proc *ProcessesWatcher) FindProcessesTupleUDP(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) {
return proc.FindProcessesTuple(tuple, applayer.TransportUDP)
}

// FindProcessesTuple looks up local process information for the source and
// destination addresses of a tuple for the given transport protocol
func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple, transport applayer.Transport) (procTuple *common.CmdlineTuple) {
procTuple = &common.CmdlineTuple{}

if !proc.enabled {
return
}

if proc.isLocalIP(tuple.SrcIP) {
if p := proc.findProc(tuple.SrcPort); p != nil {
if p := proc.findProc(tuple.SrcPort, transport); p != nil {
procTuple.Src = []byte(p.name)
procTuple.SrcCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d", p.commandLine, p.name, tuple.SrcPort)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.SrcPort, transport)
}
}

if proc.isLocalIP(tuple.DstIP) {
if p := proc.findProc(tuple.DstPort); p != nil {
if p := proc.findProc(tuple.DstPort, transport); p != nil {
procTuple.Dst = []byte(p.name)
procTuple.DstCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d", p.commandLine, p.name, tuple.DstPort)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.DstPort, transport)
}
}

return
}

func (proc *ProcessesWatcher) findProc(port uint16) *process {
func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport) *process {
defer logp.Recover("FindProc exception")

p, exists := proc.portProcMap[port]
procMap, ok := proc.portProcMap[transport]
if !ok {
return nil
}

p, exists := procMap[port]
if exists {
return p.proc
}

proc.updateMap()
proc.updateMap(transport)

p, exists = proc.portProcMap[port]
p, exists = procMap[port]
if exists {
return p.proc
}

return nil
}

func (proc *ProcessesWatcher) updateMap() {
func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
if logp.HasSelector("procsdetailed") {
start := time.Now()
defer func() {
logp.Debug("procsdetailed", "updateMap() took %v", time.Now().Sub(start))
}()
}

ports, err := proc.impl.GetLocalPortToPIDMapping()
ports, err := proc.impl.GetLocalPortToPIDMapping(transport)
if err != nil {
logp.Err("unable to list local ports: %v", err)
}

proc.expireProcessCache()

for port, pid := range ports {
proc.updateMappingEntry(port, pid)
proc.updateMappingEntry(transport, port, pid)
}
}

func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int) {
prev, ok := proc.portProcMap[port]
func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, port uint16, pid int) {
prev, ok := proc.portProcMap[transport][port]
if ok && prev.pid == pid {
// This port->pid mapping already exists
return
Expand All @@ -179,10 +203,10 @@ func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int) {
// We never expire entries from this map. Since there are 65k possible
// ports, the size of the dict can be max 1.5 MB, which we consider
// reasonable.
proc.portProcMap[port] = portProcMapping{port: port, pid: pid, proc: p}
proc.portProcMap[transport][port] = portProcMapping{port: port, pid: pid, proc: p}

logp.Debug("procsdetailed", "updateMappingEntry(): port=%d pid=%d process='%s' name=%s",
port, pid, p.commandLine, p.name)
logp.Debug("procsdetailed", "updateMappingEntry(): port=%d/%s pid=%d process='%s' name=%s",
port, transport, pid, p.commandLine, p.name)
}

func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool {
Expand Down
30 changes: 21 additions & 9 deletions packetbeat/procs/procs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"strings"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/protos/applayer"
"github.com/elastic/gosigar"
)

Expand All @@ -43,22 +44,33 @@ type socketInfo struct {
inode uint64
}

var procFiles = map[applayer.Transport]struct {
ipv4, ipv6 string
}{
applayer.TransportUDP: {"/proc/net/udp", "/proc/net/udp6"},
applayer.TransportTCP: {"/proc/net/tcp", "/proc/net/tcp6"},
}

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
sourceFiles, ok := procFiles[transport]
if !ok {
return nil, fmt.Errorf("unsupported transport protocol id: %d", transport)
}
var pids gosigar.ProcList
if err = pids.Get(); err != nil {
return nil, err
}
logp.Debug("procs", "getLocalPortsToPIDs()")
ipv4socks, err := socketsFromProc("/proc/net/tcp", false)
ipv4socks, err := socketsFromProc(sourceFiles.ipv4, false)
if err != nil {
logp.Err("Parse_Proc_Net_Tcp: %s", err)
logp.Err("GetLocalPortToPIDMapping: parsing '%s': %s", sourceFiles.ipv4, err)
return nil, err
}
ipv6socks, err := socketsFromProc("/proc/net/tcp6", true)
ipv6socks, err := socketsFromProc(sourceFiles.ipv6, true)
if err != nil {
logp.Err("Parse_Proc_Net_Tcp ipv6: %s", err)
logp.Err("GetLocalPortToPIDMapping: parsing '%s': %s", sourceFiles.ipv6, err)
return nil, err
}
socksMap := map[uint64]*socketInfo{}
Expand Down Expand Up @@ -126,11 +138,11 @@ func socketsFromProc(filename string, ipv6 bool) ([]*socketInfo, error) {
return nil, err
}
defer file.Close()
return parseProcNetTCP(file, ipv6)
return parseProcNetProto(file, ipv6)
}

// Parses the /proc/net/tcp file
func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) {
// Parses the /proc/net/(tcp|udp)6? file
func parseProcNetProto(input io.Reader, ipv6 bool) ([]*socketInfo, error) {
buf := bufio.NewReader(input)

sockets := []*socketInfo{}
Expand All @@ -139,7 +151,7 @@ func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) {
for err != io.EOF {
line, err = buf.ReadBytes('\n')
if err != nil && err != io.EOF {
logp.Err("Error reading proc net tcp file: %s", err)
logp.Err("Error reading proc net file: %s", err)
return nil, err
}
words := bytes.Fields(line)
Expand Down
4 changes: 3 additions & 1 deletion packetbeat/procs/procs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package procs

import "github.com/elastic/beats/packetbeat/protos/applayer"

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
return nil, nil
}
Loading

0 comments on commit 9e768f3

Please sign in to comment.