Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UDP support to packetbeat's process monitor #7571

Merged
merged 2 commits into from
Jul 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- The process monitor now reports the command-line for all processes, under Linux and Windows. {pull}7135[7135]
- Updated the TLS protocol parser with new cipher suites added to TLS 1.3. {issue}7455[7455]
- Flows are enriched with process information using the process monitor. {pull}7507[7507]
- Added UDP support to process monitor. {pull}7571[7571]

*Winlogbeat*

Expand Down
21 changes: 13 additions & 8 deletions packetbeat/docs/packetbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1283,15 +1283,15 @@ process matching is disabled.
When Packetbeat starts, and then periodically afterwards, it scans the process table for
processes that match the configuration file. For each of these processes, it
monitors which file descriptors it has opened. When a new packet is captured,
it reads the list of active TCP connections and matches the corresponding one
it reads the list of active TCP and UDP connections and matches the corresponding one
with the list of file descriptors.

On a Linux system, all this information is available via the `/proc`
file system, so Packetbeat doesn't need a kernel module.

All this information is available via system interfaces: The `/proc` file system
in Linux and the IP Helper API (`iphlpapi.dll`) on Windows, so {beatname_uc}
doesn't need a kernel module.

NOTE: Process monitoring is currently only supported on
Linux systems. Packetbeat automatically disables
Linux and Windows systems. Packetbeat automatically disables
process monitoring when it detects other operating systems.

Example configuration:
Expand All @@ -1314,6 +1314,14 @@ packetbeat.procs:
cmdline_grep: gunicorn
------------------------------------------------------------------------------

When the process monitor is enabled, it will enrich all the events whose source
or destination is a local process. The `cmdline` and/or `client_cmdline` fields
will be added to an event, when the server side or client side of the connection
belong to a local process, respectively. Additionally, you can specify a pattern
using the `cmdline_grep` option, to also name those processes. This will cause
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This totally helps clarify how it works for me.

the `proc` and `client_proc` fields to be added to an event, with the name of
the matched process.

[float]
=== Configuration options

Expand Down Expand Up @@ -1350,6 +1358,3 @@ Example configuration:
-------------------------------------------------------------------------------------
packetbeat.shutdown_timeout: 5s
-------------------------------------------------------------------------------------



11 changes: 8 additions & 3 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/packetbeat/procs"
"github.com/elastic/beats/packetbeat/protos/applayer"
)

type flowsProcessor struct {
Expand Down Expand Up @@ -216,6 +217,7 @@ func createEvent(
source := common.MapStr{}
dest := common.MapStr{}
tuple := common.IPPortTuple{}
var proto applayer.Transport

// add ethernet layer meta data
if src, dst, ok := f.id.EthAddr(); ok {
Expand Down Expand Up @@ -282,9 +284,11 @@ func createEvent(

// udp layer meta data
if src, dst, ok := f.id.UDPAddr(); ok {
source["port"] = binary.LittleEndian.Uint16(src)
dest["port"] = binary.LittleEndian.Uint16(dst)
tuple.SrcPort = binary.LittleEndian.Uint16(src)
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
fields["transport"] = "udp"
proto = applayer.TransportUDP
}

// tcp layer meta data
Expand All @@ -293,6 +297,7 @@ func createEvent(
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
fields["transport"] = "tcp"
proto = applayer.TransportTCP
}

if id := f.id.ConnectionID(); id != nil {
Expand All @@ -311,7 +316,7 @@ func createEvent(

// Set process information if it's available
if tuple.IPLength != 0 && tuple.SrcPort != 0 {
if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple); cmdline != nil {
if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple, proto); cmdline != nil {
src, dst := common.MakeEndpointPair(tuple.BaseTuple, cmdline)

for key, value := range map[string]string{
Expand Down
64 changes: 44 additions & 20 deletions packetbeat/procs/procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/protos/applayer"
"github.com/elastic/gosigar"
)

Expand All @@ -49,15 +50,15 @@ type process struct {
type processWatcherImpl interface {
// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
GetLocalPortToPIDMapping() (ports map[uint16]int, err error)
GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error)
// GetProcessCommandLine returns the command line for a given process.
GetProcessCommandLine(pid int) string
// GetLocalIPs returns the list of local addresses.
GetLocalIPs() ([]net.IP, error)
}

type ProcessesWatcher struct {
portProcMap map[uint16]portProcMapping
portProcMap map[applayer.Transport]map[uint16]portProcMapping
localAddrs []net.IP
processCache map[int]*process

Expand All @@ -76,7 +77,11 @@ func (proc *ProcessesWatcher) Init(config ProcsConfig) error {

func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatcherImpl) error {
proc.impl = impl
proc.portProcMap = make(map[uint16]portProcMapping)
proc.portProcMap = map[applayer.Transport]map[uint16]portProcMapping{
applayer.TransportUDP: make(map[uint16]portProcMapping),
applayer.TransportTCP: make(map[uint16]portProcMapping),
}

proc.processCache = make(map[int]*process)

proc.enabled = config.Enabled
Expand All @@ -99,72 +104,91 @@ func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatch
return nil
}

func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) {
// FindProcessesTupleTCP looks up local process information for the source and
// destination addresses of TCP tuple
func (proc *ProcessesWatcher) FindProcessesTupleTCP(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) {
return proc.FindProcessesTuple(tuple, applayer.TransportTCP)
}

// FindProcessesTupleUDP looks up local process information for the source and
// destination addresses of UDP tuple
func (proc *ProcessesWatcher) FindProcessesTupleUDP(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) {
return proc.FindProcessesTuple(tuple, applayer.TransportUDP)
}

// FindProcessesTuple looks up local process information for the source and
// destination addresses of a tuple for the given transport protocol
func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple, transport applayer.Transport) (procTuple *common.CmdlineTuple) {
procTuple = &common.CmdlineTuple{}

if !proc.enabled {
return
}

if proc.isLocalIP(tuple.SrcIP) {
if p := proc.findProc(tuple.SrcPort); p != nil {
if p := proc.findProc(tuple.SrcPort, transport); p != nil {
procTuple.Src = []byte(p.name)
procTuple.SrcCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d", p.commandLine, p.name, tuple.SrcPort)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.SrcPort, transport)
}
}

if proc.isLocalIP(tuple.DstIP) {
if p := proc.findProc(tuple.DstPort); p != nil {
if p := proc.findProc(tuple.DstPort, transport); p != nil {
procTuple.Dst = []byte(p.name)
procTuple.DstCommand = []byte(p.commandLine)
logp.Debug("procs", "Found process '%s' (%s) for port %d", p.commandLine, p.name, tuple.DstPort)
logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.DstPort, transport)
}
}

return
}

func (proc *ProcessesWatcher) findProc(port uint16) *process {
func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport) *process {
defer logp.Recover("FindProc exception")

p, exists := proc.portProcMap[port]
procMap, ok := proc.portProcMap[transport]
if !ok {
return nil
}

p, exists := procMap[port]
if exists {
return p.proc
}

proc.updateMap()
proc.updateMap(transport)

p, exists = proc.portProcMap[port]
p, exists = procMap[port]
if exists {
return p.proc
}

return nil
}

func (proc *ProcessesWatcher) updateMap() {
func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) {
if logp.HasSelector("procsdetailed") {
start := time.Now()
defer func() {
logp.Debug("procsdetailed", "updateMap() took %v", time.Now().Sub(start))
}()
}

ports, err := proc.impl.GetLocalPortToPIDMapping()
ports, err := proc.impl.GetLocalPortToPIDMapping(transport)
if err != nil {
logp.Err("unable to list local ports: %v", err)
}

proc.expireProcessCache()

for port, pid := range ports {
proc.updateMappingEntry(port, pid)
proc.updateMappingEntry(transport, port, pid)
}
}

func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int) {
prev, ok := proc.portProcMap[port]
func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, port uint16, pid int) {
prev, ok := proc.portProcMap[transport][port]
if ok && prev.pid == pid {
// This port->pid mapping already exists
return
Expand All @@ -179,10 +203,10 @@ func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int) {
// We never expire entries from this map. Since there are 65k possible
// ports, the size of the dict can be max 1.5 MB, which we consider
// reasonable.
proc.portProcMap[port] = portProcMapping{port: port, pid: pid, proc: p}
proc.portProcMap[transport][port] = portProcMapping{port: port, pid: pid, proc: p}

logp.Debug("procsdetailed", "updateMappingEntry(): port=%d pid=%d process='%s' name=%s",
port, pid, p.commandLine, p.name)
logp.Debug("procsdetailed", "updateMappingEntry(): port=%d/%s pid=%d process='%s' name=%s",
port, transport, pid, p.commandLine, p.name)
}

func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool {
Expand Down
30 changes: 21 additions & 9 deletions packetbeat/procs/procs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"strings"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/protos/applayer"
"github.com/elastic/gosigar"
)

Expand All @@ -43,22 +44,33 @@ type socketInfo struct {
inode uint64
}

var procFiles = map[applayer.Transport]struct {
ipv4, ipv6 string
}{
applayer.TransportUDP: {"/proc/net/udp", "/proc/net/udp6"},
applayer.TransportTCP: {"/proc/net/tcp", "/proc/net/tcp6"},
}

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
sourceFiles, ok := procFiles[transport]
if !ok {
return nil, fmt.Errorf("unsupported transport protocol id: %d", transport)
}
var pids gosigar.ProcList
if err = pids.Get(); err != nil {
return nil, err
}
logp.Debug("procs", "getLocalPortsToPIDs()")
ipv4socks, err := socketsFromProc("/proc/net/tcp", false)
ipv4socks, err := socketsFromProc(sourceFiles.ipv4, false)
if err != nil {
logp.Err("Parse_Proc_Net_Tcp: %s", err)
logp.Err("GetLocalPortToPIDMapping: parsing '%s': %s", sourceFiles.ipv4, err)
return nil, err
}
ipv6socks, err := socketsFromProc("/proc/net/tcp6", true)
ipv6socks, err := socketsFromProc(sourceFiles.ipv6, true)
if err != nil {
logp.Err("Parse_Proc_Net_Tcp ipv6: %s", err)
logp.Err("GetLocalPortToPIDMapping: parsing '%s': %s", sourceFiles.ipv6, err)
return nil, err
}
socksMap := map[uint64]*socketInfo{}
Expand Down Expand Up @@ -126,11 +138,11 @@ func socketsFromProc(filename string, ipv6 bool) ([]*socketInfo, error) {
return nil, err
}
defer file.Close()
return parseProcNetTCP(file, ipv6)
return parseProcNetProto(file, ipv6)
}

// Parses the /proc/net/tcp file
func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) {
// Parses the /proc/net/(tcp|udp)6? file
func parseProcNetProto(input io.Reader, ipv6 bool) ([]*socketInfo, error) {
buf := bufio.NewReader(input)

sockets := []*socketInfo{}
Expand All @@ -139,7 +151,7 @@ func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) {
for err != io.EOF {
line, err = buf.ReadBytes('\n')
if err != nil && err != io.EOF {
logp.Err("Error reading proc net tcp file: %s", err)
logp.Err("Error reading proc net file: %s", err)
return nil, err
}
words := bytes.Fields(line)
Expand Down
4 changes: 3 additions & 1 deletion packetbeat/procs/procs_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package procs

import "github.com/elastic/beats/packetbeat/protos/applayer"

// GetLocalPortToPIDMapping returns the list of local port numbers and the PID
// that owns them.
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) {
func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) {
return nil, nil
}
Loading