From 9e768f3489cc831e0231b8dbafae8fc439b58c62 Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Thu, 12 Jul 2018 19:34:15 +0200 Subject: [PATCH] Add UDP support to packetbeat's process monitor (#7571) * Add UDP support to packetbeat's process monitor (#541) This updates the process monitor with UDP support. Until now, it would lookup all ports as TCP. This could result in the wrong process being reported when two different processes used the same port number, one for TCP and one for UDP. The Linux and Windows implementations have been updated to fetch information for UDP ports. Closes #541 * Document the new process monitor features - Windows support - UDP support - cmdline fields --- CHANGELOG.asciidoc | 1 + packetbeat/docs/packetbeat-options.asciidoc | 21 ++++--- packetbeat/flows/worker.go | 11 +++- packetbeat/procs/procs.go | 64 ++++++++++++++------- packetbeat/procs/procs_linux.go | 30 +++++++--- packetbeat/procs/procs_other.go | 4 +- packetbeat/procs/procs_test.go | 54 ++++++++++++++--- packetbeat/procs/procs_windows.go | 53 +++++++++++++++-- packetbeat/procs/syscall_windows.go | 15 +++++ packetbeat/procs/zsyscall_windows.go | 20 +++++++ packetbeat/protos/amqp/amqp_parser.go | 2 +- packetbeat/protos/cassandra/trans.go | 2 +- packetbeat/protos/dns/dns_tcp.go | 2 +- packetbeat/protos/dns/dns_udp.go | 2 +- packetbeat/protos/http/http.go | 2 +- packetbeat/protos/memcache/plugin_tcp.go | 2 +- packetbeat/protos/memcache/plugin_udp.go | 2 +- packetbeat/protos/mongodb/mongodb.go | 2 +- packetbeat/protos/mysql/mysql.go | 2 +- packetbeat/protos/pgsql/pgsql.go | 2 +- packetbeat/protos/redis/redis.go | 2 +- packetbeat/protos/thrift/thrift.go | 2 +- packetbeat/protos/tls/tls.go | 2 +- 23 files changed, 233 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 8eefb2a421b..e4f03e9e1f8 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -312,6 +312,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - The process monitor now reports the command-line for all processes, under Linux and Windows. {pull}7135[7135] - Updated the TLS protocol parser with new cipher suites added to TLS 1.3. {issue}7455[7455] - Flows are enriched with process information using the process monitor. {pull}7507[7507] +- Added UDP support to process monitor. {pull}7571[7571] *Winlogbeat* diff --git a/packetbeat/docs/packetbeat-options.asciidoc b/packetbeat/docs/packetbeat-options.asciidoc index 5c2f5b46066..9fa6060fb8c 100644 --- a/packetbeat/docs/packetbeat-options.asciidoc +++ b/packetbeat/docs/packetbeat-options.asciidoc @@ -1283,15 +1283,15 @@ process matching is disabled. When Packetbeat starts, and then periodically afterwards, it scans the process table for processes that match the configuration file. For each of these processes, it monitors which file descriptors it has opened. When a new packet is captured, -it reads the list of active TCP connections and matches the corresponding one +it reads the list of active TCP and UDP connections and matches the corresponding one with the list of file descriptors. -On a Linux system, all this information is available via the `/proc` -file system, so Packetbeat doesn't need a kernel module. - +All this information is available via system interfaces: The `/proc` file system +in Linux and the IP Helper API (`iphlpapi.dll`) on Windows, so {beatname_uc} +doesn't need a kernel module. NOTE: Process monitoring is currently only supported on - Linux systems. Packetbeat automatically disables + Linux and Windows systems. Packetbeat automatically disables process monitoring when it detects other operating systems. Example configuration: @@ -1314,6 +1314,14 @@ packetbeat.procs: cmdline_grep: gunicorn ------------------------------------------------------------------------------ +When the process monitor is enabled, it will enrich all the events whose source +or destination is a local process. The `cmdline` and/or `client_cmdline` fields +will be added to an event, when the server side or client side of the connection +belong to a local process, respectively. Additionally, you can specify a pattern +using the `cmdline_grep` option, to also name those processes. This will cause +the `proc` and `client_proc` fields to be added to an event, with the name of +the matched process. + [float] === Configuration options @@ -1350,6 +1358,3 @@ Example configuration: ------------------------------------------------------------------------------------- packetbeat.shutdown_timeout: 5s ------------------------------------------------------------------------------------- - - - diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index e6bb5df050f..8ce9bdbc901 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/packetbeat/procs" + "github.com/elastic/beats/packetbeat/protos/applayer" ) type flowsProcessor struct { @@ -216,6 +217,7 @@ func createEvent( source := common.MapStr{} dest := common.MapStr{} tuple := common.IPPortTuple{} + var proto applayer.Transport // add ethernet layer meta data if src, dst, ok := f.id.EthAddr(); ok { @@ -282,9 +284,11 @@ func createEvent( // udp layer meta data if src, dst, ok := f.id.UDPAddr(); ok { - source["port"] = binary.LittleEndian.Uint16(src) - dest["port"] = binary.LittleEndian.Uint16(dst) + tuple.SrcPort = binary.LittleEndian.Uint16(src) + tuple.DstPort = binary.LittleEndian.Uint16(dst) + source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort fields["transport"] = "udp" + proto = applayer.TransportUDP } // tcp layer meta data @@ -293,6 +297,7 @@ func createEvent( tuple.DstPort = binary.LittleEndian.Uint16(dst) source["port"], dest["port"] = tuple.SrcPort, tuple.DstPort fields["transport"] = "tcp" + proto = applayer.TransportTCP } if id := f.id.ConnectionID(); id != nil { @@ -311,7 +316,7 @@ func createEvent( // Set process information if it's available if tuple.IPLength != 0 && tuple.SrcPort != 0 { - if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple); cmdline != nil { + if cmdline := procs.ProcWatcher.FindProcessesTuple(&tuple, proto); cmdline != nil { src, dst := common.MakeEndpointPair(tuple.BaseTuple, cmdline) for key, value := range map[string]string{ diff --git a/packetbeat/procs/procs.go b/packetbeat/procs/procs.go index 651e49967a6..649155dfa79 100644 --- a/packetbeat/procs/procs.go +++ b/packetbeat/procs/procs.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/packetbeat/protos/applayer" "github.com/elastic/gosigar" ) @@ -49,7 +50,7 @@ type process struct { type processWatcherImpl interface { // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. - GetLocalPortToPIDMapping() (ports map[uint16]int, err error) + GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) // GetProcessCommandLine returns the command line for a given process. GetProcessCommandLine(pid int) string // GetLocalIPs returns the list of local addresses. @@ -57,7 +58,7 @@ type processWatcherImpl interface { } type ProcessesWatcher struct { - portProcMap map[uint16]portProcMapping + portProcMap map[applayer.Transport]map[uint16]portProcMapping localAddrs []net.IP processCache map[int]*process @@ -76,7 +77,11 @@ func (proc *ProcessesWatcher) Init(config ProcsConfig) error { func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatcherImpl) error { proc.impl = impl - proc.portProcMap = make(map[uint16]portProcMapping) + proc.portProcMap = map[applayer.Transport]map[uint16]portProcMapping{ + applayer.TransportUDP: make(map[uint16]portProcMapping), + applayer.TransportTCP: make(map[uint16]portProcMapping), + } + proc.processCache = make(map[int]*process) proc.enabled = config.Enabled @@ -99,7 +104,21 @@ func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatch return nil } -func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) { +// FindProcessesTupleTCP looks up local process information for the source and +// destination addresses of TCP tuple +func (proc *ProcessesWatcher) FindProcessesTupleTCP(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) { + return proc.FindProcessesTuple(tuple, applayer.TransportTCP) +} + +// FindProcessesTupleUDP looks up local process information for the source and +// destination addresses of UDP tuple +func (proc *ProcessesWatcher) FindProcessesTupleUDP(tuple *common.IPPortTuple) (procTuple *common.CmdlineTuple) { + return proc.FindProcessesTuple(tuple, applayer.TransportUDP) +} + +// FindProcessesTuple looks up local process information for the source and +// destination addresses of a tuple for the given transport protocol +func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple, transport applayer.Transport) (procTuple *common.CmdlineTuple) { procTuple = &common.CmdlineTuple{} if !proc.enabled { @@ -107,35 +126,40 @@ func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple) (pro } if proc.isLocalIP(tuple.SrcIP) { - if p := proc.findProc(tuple.SrcPort); p != nil { + if p := proc.findProc(tuple.SrcPort, transport); p != nil { procTuple.Src = []byte(p.name) procTuple.SrcCommand = []byte(p.commandLine) - logp.Debug("procs", "Found process '%s' (%s) for port %d", p.commandLine, p.name, tuple.SrcPort) + logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.SrcPort, transport) } } if proc.isLocalIP(tuple.DstIP) { - if p := proc.findProc(tuple.DstPort); p != nil { + if p := proc.findProc(tuple.DstPort, transport); p != nil { procTuple.Dst = []byte(p.name) procTuple.DstCommand = []byte(p.commandLine) - logp.Debug("procs", "Found process '%s' (%s) for port %d", p.commandLine, p.name, tuple.DstPort) + logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.DstPort, transport) } } return } -func (proc *ProcessesWatcher) findProc(port uint16) *process { +func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport) *process { defer logp.Recover("FindProc exception") - p, exists := proc.portProcMap[port] + procMap, ok := proc.portProcMap[transport] + if !ok { + return nil + } + + p, exists := procMap[port] if exists { return p.proc } - proc.updateMap() + proc.updateMap(transport) - p, exists = proc.portProcMap[port] + p, exists = procMap[port] if exists { return p.proc } @@ -143,7 +167,7 @@ func (proc *ProcessesWatcher) findProc(port uint16) *process { return nil } -func (proc *ProcessesWatcher) updateMap() { +func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) { if logp.HasSelector("procsdetailed") { start := time.Now() defer func() { @@ -151,7 +175,7 @@ func (proc *ProcessesWatcher) updateMap() { }() } - ports, err := proc.impl.GetLocalPortToPIDMapping() + ports, err := proc.impl.GetLocalPortToPIDMapping(transport) if err != nil { logp.Err("unable to list local ports: %v", err) } @@ -159,12 +183,12 @@ func (proc *ProcessesWatcher) updateMap() { proc.expireProcessCache() for port, pid := range ports { - proc.updateMappingEntry(port, pid) + proc.updateMappingEntry(transport, port, pid) } } -func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int) { - prev, ok := proc.portProcMap[port] +func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, port uint16, pid int) { + prev, ok := proc.portProcMap[transport][port] if ok && prev.pid == pid { // This port->pid mapping already exists return @@ -179,10 +203,10 @@ func (proc *ProcessesWatcher) updateMappingEntry(port uint16, pid int) { // We never expire entries from this map. Since there are 65k possible // ports, the size of the dict can be max 1.5 MB, which we consider // reasonable. - proc.portProcMap[port] = portProcMapping{port: port, pid: pid, proc: p} + proc.portProcMap[transport][port] = portProcMapping{port: port, pid: pid, proc: p} - logp.Debug("procsdetailed", "updateMappingEntry(): port=%d pid=%d process='%s' name=%s", - port, pid, p.commandLine, p.name) + logp.Debug("procsdetailed", "updateMappingEntry(): port=%d/%s pid=%d process='%s' name=%s", + port, transport, pid, p.commandLine, p.name) } func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool { diff --git a/packetbeat/procs/procs_linux.go b/packetbeat/procs/procs_linux.go index eff29718d24..907e9f42bca 100644 --- a/packetbeat/procs/procs_linux.go +++ b/packetbeat/procs/procs_linux.go @@ -32,6 +32,7 @@ import ( "strings" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/packetbeat/protos/applayer" "github.com/elastic/gosigar" ) @@ -43,22 +44,33 @@ type socketInfo struct { inode uint64 } +var procFiles = map[applayer.Transport]struct { + ipv4, ipv6 string +}{ + applayer.TransportUDP: {"/proc/net/udp", "/proc/net/udp6"}, + applayer.TransportTCP: {"/proc/net/tcp", "/proc/net/tcp6"}, +} + // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. -func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) { +func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { + sourceFiles, ok := procFiles[transport] + if !ok { + return nil, fmt.Errorf("unsupported transport protocol id: %d", transport) + } var pids gosigar.ProcList if err = pids.Get(); err != nil { return nil, err } logp.Debug("procs", "getLocalPortsToPIDs()") - ipv4socks, err := socketsFromProc("/proc/net/tcp", false) + ipv4socks, err := socketsFromProc(sourceFiles.ipv4, false) if err != nil { - logp.Err("Parse_Proc_Net_Tcp: %s", err) + logp.Err("GetLocalPortToPIDMapping: parsing '%s': %s", sourceFiles.ipv4, err) return nil, err } - ipv6socks, err := socketsFromProc("/proc/net/tcp6", true) + ipv6socks, err := socketsFromProc(sourceFiles.ipv6, true) if err != nil { - logp.Err("Parse_Proc_Net_Tcp ipv6: %s", err) + logp.Err("GetLocalPortToPIDMapping: parsing '%s': %s", sourceFiles.ipv6, err) return nil, err } socksMap := map[uint64]*socketInfo{} @@ -126,11 +138,11 @@ func socketsFromProc(filename string, ipv6 bool) ([]*socketInfo, error) { return nil, err } defer file.Close() - return parseProcNetTCP(file, ipv6) + return parseProcNetProto(file, ipv6) } -// Parses the /proc/net/tcp file -func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) { +// Parses the /proc/net/(tcp|udp)6? file +func parseProcNetProto(input io.Reader, ipv6 bool) ([]*socketInfo, error) { buf := bufio.NewReader(input) sockets := []*socketInfo{} @@ -139,7 +151,7 @@ func parseProcNetTCP(input io.Reader, ipv6 bool) ([]*socketInfo, error) { for err != io.EOF { line, err = buf.ReadBytes('\n') if err != nil && err != io.EOF { - logp.Err("Error reading proc net tcp file: %s", err) + logp.Err("Error reading proc net file: %s", err) return nil, err } words := bytes.Fields(line) diff --git a/packetbeat/procs/procs_other.go b/packetbeat/procs/procs_other.go index 98698fe46f3..b91a132adbe 100644 --- a/packetbeat/procs/procs_other.go +++ b/packetbeat/procs/procs_other.go @@ -19,8 +19,10 @@ package procs +import "github.com/elastic/beats/packetbeat/protos/applayer" + // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. -func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) { +func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { return nil, nil } diff --git a/packetbeat/procs/procs_test.go b/packetbeat/procs/procs_test.go index d8fa7e3b304..5278730480e 100644 --- a/packetbeat/procs/procs_test.go +++ b/packetbeat/procs/procs_test.go @@ -26,13 +26,15 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/packetbeat/protos/applayer" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) type testingImpl struct { localIPs []net.IP - portToPID map[uint16]int + portToPID map[applayer.Transport]map[uint16]int pidToCmdline map[int]string } @@ -40,25 +42,29 @@ type runningProcess struct { cmdline string pid int ports []uint16 + proto applayer.Transport } func newTestingImpl(localIPs []net.IP, processes []runningProcess) *testingImpl { impl := &testingImpl{ - localIPs: localIPs, - portToPID: make(map[uint16]int), + localIPs: localIPs, + portToPID: map[applayer.Transport]map[uint16]int{ + applayer.TransportTCP: make(map[uint16]int), + applayer.TransportUDP: make(map[uint16]int), + }, pidToCmdline: make(map[int]string), } for _, proc := range processes { for _, port := range proc.ports { - impl.portToPID[port] = proc.pid + impl.portToPID[proc.proto][port] = proc.pid } impl.pidToCmdline[proc.pid] = proc.cmdline } return impl } -func (impl *testingImpl) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) { - return impl.portToPID, nil +func (impl *testingImpl) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { + return impl.portToPID[transport], nil } func (impl *testingImpl) GetProcessCommandLine(pid int) string { @@ -92,16 +98,25 @@ func TestFindProcessTuple(t *testing.T) { cmdline: "curl -o /dev/null http://example.net/", pid: 101, ports: []uint16{65535}, + proto: applayer.TransportTCP, }, { cmdline: "/usr/X11/bin/webbrowser", pid: 102, ports: []uint16{3201, 3202, 3203}, + proto: applayer.TransportTCP, }, { cmdline: "nc -v -l -p 80", pid: 105, ports: []uint16{80}, + proto: applayer.TransportTCP, + }, + { + cmdline: "bind", + pid: 333, + ports: []uint16{53}, + proto: applayer.TransportUDP, }, }) procs := ProcessesWatcher{} @@ -112,10 +127,12 @@ func TestFindProcessTuple(t *testing.T) { name string srcIP, dstIP, src, dst, srcCmd, dstCmd string srcPort, dstPort int + proto applayer.Transport preAction func() }{ { name: "Unrelated local HTTP client", + proto: applayer.TransportTCP, srcIP: "127.0.0.1", srcPort: 12345, dstIP: "1.2.3.4", dstPort: 80, src: "", srcCmd: "", @@ -123,6 +140,7 @@ func TestFindProcessTuple(t *testing.T) { }, { name: "Web browser (IPv6)", + proto: applayer.TransportTCP, srcIP: "7777::0:33", srcPort: 3201, dstIP: "1234:1234::AAAA", dstPort: 443, src: "", srcCmd: "/usr/X11/bin/webbrowser", @@ -130,13 +148,23 @@ func TestFindProcessTuple(t *testing.T) { }, { name: "Curl request", + proto: applayer.TransportTCP, srcIP: "192.168.1.1", srcPort: 65535, dstIP: "1.1.1.1", dstPort: 80, src: "Curl", srcCmd: "curl -o /dev/null http://example.net/", dst: "", dstCmd: "", }, + { + name: "Unrelated UDP using same port as TCP", + proto: applayer.TransportUDP, + srcIP: "192.168.1.1", srcPort: 65535, + dstIP: "1.1.1.1", dstPort: 80, + src: "", srcCmd: "", + dst: "", dstCmd: "", + }, { name: "Local web browser to netcat server", + proto: applayer.TransportTCP, srcIP: "127.0.0.1", srcPort: 3202, dstIP: "127.0.0.1", dstPort: 80, src: "", srcCmd: "/usr/X11/bin/webbrowser", @@ -144,6 +172,7 @@ func TestFindProcessTuple(t *testing.T) { }, { name: "External to netcat server", + proto: applayer.TransportTCP, srcIP: "192.168.1.2", srcPort: 3203, dstIP: "192.168.1.1", dstPort: 80, src: "", srcCmd: "", @@ -154,13 +183,22 @@ func TestFindProcessTuple(t *testing.T) { preAction: func() { // add a new running process impl.pidToCmdline[555] = "/usr/bin/nmap -sT -P443 10.0.0.0/8" - impl.portToPID[55555] = 555 + impl.portToPID[applayer.TransportTCP][55555] = 555 }, + proto: applayer.TransportTCP, srcIP: "7777::33", srcPort: 55555, dstIP: "10.1.2.3", dstPort: 443, src: "NMap", srcCmd: "/usr/bin/nmap -sT -P443 10.0.0.0/8", dst: "", dstCmd: "", }, + { + name: "DNS request (UDP)", + proto: applayer.TransportUDP, + srcIP: "1234:5678::55", srcPort: 533, + dstIP: "7777::33", dstPort: 53, + src: "", srcCmd: "", + dst: "", dstCmd: "bind", + }, } { msg := fmt.Sprintf("test case #%d: %s", idx+1, testCase.name) @@ -175,7 +213,7 @@ func TestFindProcessTuple(t *testing.T) { DstPort: uint16(testCase.dstPort), }, } - result := procs.FindProcessesTuple(&input) + result := procs.FindProcessesTuple(&input, testCase.proto) // nil result is not valid assert.NotNil(t, result, msg) diff --git a/packetbeat/procs/procs_windows.go b/packetbeat/procs/procs_windows.go index 16e6257aaeb..0970b52f51c 100644 --- a/packetbeat/procs/procs_windows.go +++ b/packetbeat/procs/procs_windows.go @@ -27,6 +27,8 @@ import ( "unsafe" "golang.org/x/sys/windows" + + "github.com/elastic/beats/packetbeat/protos/applayer" ) type extractor interface { @@ -41,20 +43,33 @@ type extractorFactory func(fn callbackFn) extractor type tcpRowOwnerPIDExtractor callbackFn type tcp6RowOwnerPIDExtractor callbackFn +type udpRowOwnerPIDExtractor callbackFn +type udp6RowOwnerPIDExtractor callbackFn -var tables = []struct { +var tablesByTransport = map[applayer.Transport][]struct { family uint32 function GetExtendedTableFn class uint32 extractor extractorFactory }{ - {windows.AF_INET, _GetExtendedTcpTable, TCP_TABLE_OWNER_PID_ALL, extractTCPRowOwnerPID}, - {windows.AF_INET6, _GetExtendedTcpTable, TCP_TABLE_OWNER_PID_ALL, extractTCP6RowOwnerPID}, + applayer.TransportTCP: { + {windows.AF_INET, _GetExtendedTcpTable, TCP_TABLE_OWNER_PID_ALL, extractTCPRowOwnerPID}, + {windows.AF_INET6, _GetExtendedTcpTable, TCP_TABLE_OWNER_PID_ALL, extractTCP6RowOwnerPID}, + }, + applayer.TransportUDP: { + {windows.AF_INET, _GetExtendedUdpTable, UDP_TABLE_OWNER_PID, extractUDPRowOwnerPID}, + {windows.AF_INET6, _GetExtendedUdpTable, UDP_TABLE_OWNER_PID, extractUDP6RowOwnerPID}, + }, } // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. -func (proc *ProcessesWatcher) GetLocalPortToPIDMapping() (ports map[uint16]int, err error) { +func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { + tables, ok := tablesByTransport[transport] + if !ok { + return nil, fmt.Errorf("unsupported transport protocol id: %d", transport) + } + storeResults := func(localPort uint16, pid int) { ports[localPort] = pid } @@ -121,6 +136,14 @@ func extractTCP6RowOwnerPID(fn callbackFn) extractor { return tcp6RowOwnerPIDExtractor(fn) } +func extractUDPRowOwnerPID(fn callbackFn) extractor { + return udpRowOwnerPIDExtractor(fn) +} + +func extractUDP6RowOwnerPID(fn callbackFn) extractor { + return udp6RowOwnerPIDExtractor(fn) +} + // Extract will parse a row of Size() bytes pointed to by ptr func (e tcpRowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { row := (*TCPRowOwnerPID)(ptr) @@ -142,3 +165,25 @@ func (e tcp6RowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { func (tcp6RowOwnerPIDExtractor) Size() int { return int(unsafe.Sizeof(TCP6RowOwnerPID{})) } + +// Extract will parse a row of Size() bytes pointed to by ptr +func (e udpRowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { + row := (*UDPRowOwnerPID)(ptr) + e(uint32FieldToPort(row.localPort), int(row.owningPID)) +} + +// Size returns the size of a table row +func (udpRowOwnerPIDExtractor) Size() int { + return int(unsafe.Sizeof(UDPRowOwnerPID{})) +} + +// Extract will parse a row of Size() bytes pointed to by ptr +func (e udp6RowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { + row := (*UDP6RowOwnerPID)(ptr) + e(uint32FieldToPort(row.localPort), int(row.owningPID)) +} + +// Size returns the size of a table row +func (udp6RowOwnerPIDExtractor) Size() int { + return int(unsafe.Sizeof(UDP6RowOwnerPID{})) +} diff --git a/packetbeat/procs/syscall_windows.go b/packetbeat/procs/syscall_windows.go index 16cc4ee165a..f3f1aa00baf 100644 --- a/packetbeat/procs/syscall_windows.go +++ b/packetbeat/procs/syscall_windows.go @@ -24,6 +24,7 @@ import ( ) const ( + UDP_TABLE_OWNER_PID = 1 TCP_TABLE_OWNER_PID_ALL = 5 sizeOfDWORD = 4 @@ -51,6 +52,19 @@ type TCP6RowOwnerPID struct { owningPID uint32 } +type UDPRowOwnerPID struct { + localAddr uint32 + localPort uint32 + owningPID uint32 +} + +type UDP6RowOwnerPID struct { + localAddr [16]byte + localScopeID uint32 + localPort uint32 + owningPID uint32 +} + // GetExtendedTableFn is the prototype for GetExtendedTcpTable and GetExtendedUdpTable type GetExtendedTableFn func(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf uint32, tableClass uint32, reserved uint32) (code syscall.Errno, err error) @@ -59,3 +73,4 @@ type GetExtendedTableFn func(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ul // Windows API calls //sys _GetExtendedTcpTable(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf uint32, tableClass uint32, reserved uint32) (code syscall.Errno, err error) = iphlpapi.GetExtendedTcpTable +//sys _GetExtendedUdpTable(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf uint32, tableClass uint32, reserved uint32) (code syscall.Errno, err error) = iphlpapi.GetExtendedUdpTable diff --git a/packetbeat/procs/zsyscall_windows.go b/packetbeat/procs/zsyscall_windows.go index 94e1a299f02..b313698211e 100644 --- a/packetbeat/procs/zsyscall_windows.go +++ b/packetbeat/procs/zsyscall_windows.go @@ -57,6 +57,7 @@ var ( modiphlpapi = windows.NewLazySystemDLL("iphlpapi.dll") procGetExtendedTcpTable = modiphlpapi.NewProc("GetExtendedTcpTable") + procGetExtendedUdpTable = modiphlpapi.NewProc("GetExtendedUdpTable") ) func _GetExtendedTcpTable(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf uint32, tableClass uint32, reserved uint32) (code syscall.Errno, err error) { @@ -77,3 +78,22 @@ func _GetExtendedTcpTable(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf } return } + +func _GetExtendedUdpTable(pTcpTable uintptr, pdwSize *uint32, bOrder bool, ulAf uint32, tableClass uint32, reserved uint32) (code syscall.Errno, err error) { + var _p0 uint32 + if bOrder { + _p0 = 1 + } else { + _p0 = 0 + } + r0, _, e1 := syscall.Syscall6(procGetExtendedUdpTable.Addr(), 6, uintptr(pTcpTable), uintptr(unsafe.Pointer(pdwSize)), uintptr(_p0), uintptr(ulAf), uintptr(tableClass), uintptr(reserved)) + code = syscall.Errno(r0) + if code == 0 { + if e1 != 0 { + err = errnoErr(e1) + } else { + err = syscall.EINVAL + } + } + return +} diff --git a/packetbeat/protos/amqp/amqp_parser.go b/packetbeat/protos/amqp/amqp_parser.go index 8c2d78e110e..2e59641bd90 100644 --- a/packetbeat/protos/amqp/amqp_parser.go +++ b/packetbeat/protos/amqp/amqp_parser.go @@ -336,7 +336,7 @@ func (amqp *amqpPlugin) handleAmqp(m *amqpMessage, tcptuple *common.TCPTuple, di debugf("A message is ready to be handled") m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.method == "basic.publish" { amqp.handlePublishing(m) diff --git a/packetbeat/protos/cassandra/trans.go b/packetbeat/protos/cassandra/trans.go index a1ed3c2aa90..49eb7be7caf 100644 --- a/packetbeat/protos/cassandra/trans.go +++ b/packetbeat/protos/cassandra/trans.go @@ -59,7 +59,7 @@ func (trans *transactions) onMessage( var err error msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(&msg.Tuple) + msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(&msg.Tuple) if msg.IsRequest { if isDebug { diff --git a/packetbeat/protos/dns/dns_tcp.go b/packetbeat/protos/dns/dns_tcp.go index 445dc8091ca..bce95d6c766 100644 --- a/packetbeat/protos/dns/dns_tcp.go +++ b/packetbeat/protos/dns/dns_tcp.go @@ -150,7 +150,7 @@ func (dns *dnsPlugin) handleDNS(conn *dnsConnectionData, tcpTuple *common.TCPTup message := conn.data[dir].message dnsTuple := dnsTupleFromIPPort(&message.tuple, transportTCP, decodedData.Id) - message.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcpTuple.IPPort()) + message.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcpTuple.IPPort()) message.data = decodedData message.length += decodeOffset diff --git a/packetbeat/protos/dns/dns_udp.go b/packetbeat/protos/dns/dns_udp.go index f89087ae560..d563a97837b 100644 --- a/packetbeat/protos/dns/dns_udp.go +++ b/packetbeat/protos/dns/dns_udp.go @@ -47,7 +47,7 @@ func (dns *dnsPlugin) ParseUDP(pkt *protos.Packet) { dnsMsg := &dnsMessage{ ts: pkt.Ts, tuple: pkt.Tuple, - cmdlineTuple: procs.ProcWatcher.FindProcessesTuple(&pkt.Tuple), + cmdlineTuple: procs.ProcWatcher.FindProcessesTupleUDP(&pkt.Tuple), data: dnsPkt, length: packetSize, } diff --git a/packetbeat/protos/http/http.go b/packetbeat/protos/http/http.go index caea7e1229e..7415fea3988 100644 --- a/packetbeat/protos/http/http.go +++ b/packetbeat/protos/http/http.go @@ -415,7 +415,7 @@ func (http *httpPlugin) handleHTTP( m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) http.hideHeaders(m) if m.isRequest { diff --git a/packetbeat/protos/memcache/plugin_tcp.go b/packetbeat/protos/memcache/plugin_tcp.go index 4b9e2226ef0..ff9f15f266b 100644 --- a/packetbeat/protos/memcache/plugin_tcp.go +++ b/packetbeat/protos/memcache/plugin_tcp.go @@ -191,7 +191,7 @@ func (mc *memcache) onTCPMessage( ) error { msg.Tuple = *tuple msg.Transport = applayer.TransportTCP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tuple) + msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tuple) if msg.IsRequest { return mc.onTCPRequest(conn, tuple, dir, msg) diff --git a/packetbeat/protos/memcache/plugin_udp.go b/packetbeat/protos/memcache/plugin_udp.go index 2287f317491..e0041ccf83f 100644 --- a/packetbeat/protos/memcache/plugin_udp.go +++ b/packetbeat/protos/memcache/plugin_udp.go @@ -184,7 +184,7 @@ func (mc *memcache) onUDPMessage( } msg.Tuple = *tuple msg.Transport = applayer.TransportUDP - msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tuple) + msg.CmdlineTuple = procs.ProcWatcher.FindProcessesTupleUDP(tuple) done := false var err error diff --git a/packetbeat/protos/mongodb/mongodb.go b/packetbeat/protos/mongodb/mongodb.go index 549feeb2016..5b7eb9d3dea 100644 --- a/packetbeat/protos/mongodb/mongodb.go +++ b/packetbeat/protos/mongodb/mongodb.go @@ -217,7 +217,7 @@ func (mongodb *mongodbPlugin) handleMongodb( m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isResponse { debugf("MongoDB response message") diff --git a/packetbeat/protos/mysql/mysql.go b/packetbeat/protos/mysql/mysql.go index 096254aac5a..dacbdb5c769 100644 --- a/packetbeat/protos/mysql/mysql.go +++ b/packetbeat/protos/mysql/mysql.go @@ -582,7 +582,7 @@ func handleMysql(mysql *mysqlPlugin, m *mysqlMessage, tcptuple *common.TCPTuple, m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) m.raw = rawMsg if m.isRequest { diff --git a/packetbeat/protos/pgsql/pgsql.go b/packetbeat/protos/pgsql/pgsql.go index c9dfa5d5809..3159bea89ca 100644 --- a/packetbeat/protos/pgsql/pgsql.go +++ b/packetbeat/protos/pgsql/pgsql.go @@ -361,7 +361,7 @@ var handlePgsql = func(pgsql *pgsqlPlugin, m *pgsqlMessage, tcptuple *common.TCP m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { pgsql.receivedPgsqlRequest(m) diff --git a/packetbeat/protos/redis/redis.go b/packetbeat/protos/redis/redis.go index 5de7c39f752..4e51dbb2e52 100644 --- a/packetbeat/protos/redis/redis.go +++ b/packetbeat/protos/redis/redis.go @@ -241,7 +241,7 @@ func (redis *redisPlugin) handleRedis( ) { m.tcpTuple = *tcptuple m.direction = dir - m.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + m.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if m.isRequest { conn.requests.append(m) // wait for response diff --git a/packetbeat/protos/thrift/thrift.go b/packetbeat/protos/thrift/thrift.go index cb4e7bca7cd..25088cc87ef 100644 --- a/packetbeat/protos/thrift/thrift.go +++ b/packetbeat/protos/thrift/thrift.go @@ -895,7 +895,7 @@ func (thrift *thriftPlugin) messageComplete(tcptuple *common.TCPTuple, dir uint8 // all ok, go to next level stream.message.tcpTuple = *tcptuple stream.message.direction = dir - stream.message.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + stream.message.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) if stream.message.frameSize == 0 { stream.message.frameSize = uint32(stream.parseOffset - stream.message.start) } diff --git a/packetbeat/protos/tls/tls.go b/packetbeat/protos/tls/tls.go index 33f0494e355..5e0ea77452f 100644 --- a/packetbeat/protos/tls/tls.go +++ b/packetbeat/protos/tls/tls.go @@ -161,7 +161,7 @@ func (plugin *tlsPlugin) doParse( st := conn.streams[dir] if st == nil { st = newStream(tcptuple) - st.cmdlineTuple = procs.ProcWatcher.FindProcessesTuple(tcptuple.IPPort()) + st.cmdlineTuple = procs.ProcWatcher.FindProcessesTupleTCP(tcptuple.IPPort()) conn.streams[dir] = st }