Skip to content

Commit

Permalink
Add process monitor capabilities to flows (elastic#541)
Browse files Browse the repository at this point in the history
Flows are updated to include information about the local processes that
generate them.

When one or both ends of a flow is identified as a local process, this
feature will add the following fields:

"proc": Process name for the server-side of the flow, as set up in packetbeat.procs.monitored.
"cmdline": Command-line for the local process on the server-side of the flow.
"client_proc": Process name for the client-side of the flow, as set up in packetbeat.procs.monitored.
"client_cmdline": Command-line for the local process on the client-side of the flow.
  • Loading branch information
adriansr committed Jul 4, 2018
1 parent 41fbad0 commit c3cfb67
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- HTTP publishes an Error event for unmatched requests or responses. {pull}6794[6794]
- The process monitor now reports the command-line for all processes, under Linux and Windows. {pull}7135[7135]
- Updated the TLS protocol parser with new cipher suites added to TLS 1.3. {issue}7455[7455]
- Flows are enriched with process information using the process monitor. {pull}7507[7507]

*Winlogbeat*

Expand Down
8 changes: 8 additions & 0 deletions packetbeat/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,18 @@
description: >
The name of the process that served the transaction.
- name: cmdline
description: >
The command-line of the process that served the transaction.
- name: client_proc
description: >
The name of the process that initiated the transaction.
- name: client_cmdline
description: >
The command-line of the process that initiated the transaction.
- name: release
description: >
The software release of the service serving the transaction.
Expand Down
59 changes: 51 additions & 8 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/packetbeat/procs"
)

type flowsProcessor struct {
Expand Down Expand Up @@ -214,6 +215,7 @@ func createEvent(

source := common.MapStr{}
dest := common.MapStr{}
tuple := common.IPPortTuple{}

// add ethernet layer meta data
if src, dst, ok := f.id.EthAddr(); ok {
Expand All @@ -238,22 +240,44 @@ func createEvent(

// ipv4 layer meta data
if src, dst, ok := f.id.OutterIPv4Addr(); ok {
source["outer_ip"] = net.IP(src).String()
dest["outer_ip"] = net.IP(dst).String()
srcIP, dstIP := net.IP(src), net.IP(dst)
source["outer_ip"] = srcIP.String()
dest["outer_ip"] = dstIP.String()
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 4
}
if src, dst, ok := f.id.IPv4Addr(); ok {
source["ip"] = net.IP(src).String()
dest["ip"] = net.IP(dst).String()
srcIP, dstIP := net.IP(src), net.IP(dst)
source["ip"] = srcIP.String()
dest["ip"] = dstIP.String()
// Save IPs for process matching if an outer layer was not present
if tuple.IPLength == 0 {
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 4
}
}

// ipv6 layer meta data
if src, dst, ok := f.id.OutterIPv6Addr(); ok {
source["outer_ipv6"] = net.IP(src).String()
dest["outer_ipv6"] = net.IP(dst).String()
srcIP, dstIP := net.IP(src), net.IP(dst)
source["outer_ipv6"] = srcIP.String()
dest["outer_ipv6"] = dstIP.String()
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 6
}
if src, dst, ok := f.id.IPv6Addr(); ok {
srcIP, dstIP := net.IP(src), net.IP(dst)
source["ipv6"] = net.IP(src).String()
dest["ipv6"] = net.IP(dst).String()
// Save IPs for process matching if an outer layer was not present
if tuple.IPLength == 0 {
tuple.SrcIP = srcIP
tuple.DstIP = dstIP
tuple.IPLength = 6
}
}

// udp layer meta data
Expand All @@ -265,8 +289,9 @@ func createEvent(

// tcp layer meta data
if src, dst, ok := f.id.TCPAddr(); ok {
source["port"] = binary.LittleEndian.Uint16(src)
dest["port"] = binary.LittleEndian.Uint16(dst)
tuple.SrcPort = binary.LittleEndian.Uint16(src)
tuple.DstPort = binary.LittleEndian.Uint16(dst)
source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort
fields["transport"] = "tcp"
}

Expand All @@ -284,6 +309,24 @@ func createEvent(
fields["source"] = source
fields["dest"] = dest

// Set process information if it's available
if tuple.IPLength != 0 && tuple.SrcPort != 0 {
if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple); cmdline != nil {
src, dst := common.MakeEndpointPair(tuple.BaseTuple, cmdline)

for key, value := range map[string]string{
"client_proc": src.Name,
"client_cmdline": src.Cmdline,
"proc": dst.Name,
"cmdline": dst.Cmdline,
} {
if len(value) != 0 {
fields[key] = value
}
}
}
}

return beat.Event{
Timestamp: timestamp,
Fields: fields,
Expand Down

0 comments on commit c3cfb67

Please sign in to comment.