diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 5ab427f7247..44004e04687 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -62,6 +62,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha1...master[Check the HEAD d *Packetbeat* +- Fix issue with process monitor associating traffic to the wrong process. {issue}9151[9151] {pull}9443[9443] + *Winlogbeat* *Functionbeat* diff --git a/packetbeat/procs/procs.go b/packetbeat/procs/procs.go index 649155dfa79..0b33cae0e8f 100644 --- a/packetbeat/procs/procs.go +++ b/packetbeat/procs/procs.go @@ -33,10 +33,20 @@ import ( // a PID being recycled by the OS const processCacheExpiration = time.Second * 30 +var ( + anyIPv4 = net.IPv4zero.String() + anyIPv6 = net.IPv6unspecified.String() +) + +type endpoint struct { + address string + port uint16 +} + type portProcMapping struct { - port uint16 - pid int - proc *process + endpoint endpoint + pid int + proc *process } type process struct { @@ -50,7 +60,7 @@ type process struct { type processWatcherImpl interface { // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. - GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) + GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) // GetProcessCommandLine returns the command line for a given process. GetProcessCommandLine(pid int) string // GetLocalIPs returns the list of local addresses. @@ -58,7 +68,7 @@ type processWatcherImpl interface { } type ProcessesWatcher struct { - portProcMap map[applayer.Transport]map[uint16]portProcMapping + portProcMap map[applayer.Transport]map[endpoint]portProcMapping localAddrs []net.IP processCache map[int]*process @@ -77,9 +87,9 @@ func (proc *ProcessesWatcher) Init(config ProcsConfig) error { func (proc *ProcessesWatcher) initWithImpl(config ProcsConfig, impl processWatcherImpl) error { proc.impl = impl - proc.portProcMap = map[applayer.Transport]map[uint16]portProcMapping{ - applayer.TransportUDP: make(map[uint16]portProcMapping), - applayer.TransportTCP: make(map[uint16]portProcMapping), + proc.portProcMap = map[applayer.Transport]map[endpoint]portProcMapping{ + applayer.TransportUDP: make(map[endpoint]portProcMapping), + applayer.TransportTCP: make(map[endpoint]portProcMapping), } proc.processCache = make(map[int]*process) @@ -126,25 +136,29 @@ func (proc *ProcessesWatcher) FindProcessesTuple(tuple *common.IPPortTuple, tran } if proc.isLocalIP(tuple.SrcIP) { - if p := proc.findProc(tuple.SrcPort, transport); p != nil { + if p := proc.findProc(tuple.SrcIP, tuple.SrcPort, transport); p != nil { procTuple.Src = []byte(p.name) procTuple.SrcCommand = []byte(p.commandLine) - logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.SrcPort, transport) + if logp.IsDebug("procs") { + logp.Debug("procs", "Found process '%s' (%s) for %s:%d/%s", p.commandLine, p.name, tuple.SrcIP, tuple.SrcPort, transport) + } } } if proc.isLocalIP(tuple.DstIP) { - if p := proc.findProc(tuple.DstPort, transport); p != nil { + if p := proc.findProc(tuple.DstIP, tuple.DstPort, transport); p != nil { procTuple.Dst = []byte(p.name) procTuple.DstCommand = []byte(p.commandLine) - logp.Debug("procs", "Found process '%s' (%s) for port %d/%s", p.commandLine, p.name, tuple.DstPort, transport) + if logp.IsDebug("procs") { + logp.Debug("procs", "Found process '%s' (%s) for %s:%d/%s", p.commandLine, p.name, tuple.DstIP, tuple.DstPort, transport) + } } } return } -func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport) *process { +func (proc *ProcessesWatcher) findProc(address net.IP, port uint16, transport applayer.Transport) *process { defer logp.Recover("FindProc exception") procMap, ok := proc.portProcMap[transport] @@ -152,14 +166,14 @@ func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport return nil } - p, exists := procMap[port] + p, exists := lookupMapping(address, port, procMap) if exists { return p.proc } proc.updateMap(transport) - p, exists = procMap[port] + p, exists = lookupMapping(address, port, procMap) if exists { return p.proc } @@ -167,6 +181,24 @@ func (proc *ProcessesWatcher) findProc(port uint16, transport applayer.Transport return nil } +func lookupMapping(address net.IP, port uint16, procMap map[endpoint]portProcMapping) (p portProcMapping, found bool) { + // Precedence when one socket is bound to a specific IP:port and another one + // to INADDR_ANY and same port is not clear. Seems that the last one to bind + // takes precedence, and we don't have a way to tell. + // This function takes the naive approach of giving precedence to the more + // specific address and then to INADDR_ANY. + if p, found = procMap[endpoint{address.String(), port}]; found { + return + } + + nullAddr := anyIPv4 + if asIPv4 := address.To4(); asIPv4 == nil { + nullAddr = anyIPv6 + } + p, found = procMap[endpoint{nullAddr, port}] + return +} + func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) { if logp.HasSelector("procsdetailed") { start := time.Now() @@ -175,20 +207,20 @@ func (proc *ProcessesWatcher) updateMap(transport applayer.Transport) { }() } - ports, err := proc.impl.GetLocalPortToPIDMapping(transport) + endpoints, err := proc.impl.GetLocalPortToPIDMapping(transport) if err != nil { logp.Err("unable to list local ports: %v", err) } proc.expireProcessCache() - for port, pid := range ports { - proc.updateMappingEntry(transport, port, pid) + for e, pid := range endpoints { + proc.updateMappingEntry(transport, e, pid) } } -func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, port uint16, pid int) { - prev, ok := proc.portProcMap[transport][port] +func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, e endpoint, pid int) { + prev, ok := proc.portProcMap[transport][e] if ok && prev.pid == pid { // This port->pid mapping already exists return @@ -203,10 +235,12 @@ func (proc *ProcessesWatcher) updateMappingEntry(transport applayer.Transport, p // We never expire entries from this map. Since there are 65k possible // ports, the size of the dict can be max 1.5 MB, which we consider // reasonable. - proc.portProcMap[transport][port] = portProcMapping{port: port, pid: pid, proc: p} + proc.portProcMap[transport][e] = portProcMapping{endpoint: e, pid: pid, proc: p} - logp.Debug("procsdetailed", "updateMappingEntry(): port=%d/%s pid=%d process='%s' name=%s", - port, transport, pid, p.commandLine, p.name) + if logp.IsDebug("procsdetailed") { + logp.Debug("procsdetailed", "updateMappingEntry(): local=%s:%d/%s pid=%d process='%s' name=%s", + e.address, e.port, transport, pid, p.commandLine, p.name) + } } func (proc *ProcessesWatcher) isLocalIP(ip net.IP) bool { diff --git a/packetbeat/procs/procs_linux.go b/packetbeat/procs/procs_linux.go index 907e9f42bca..e9a48ccfdad 100644 --- a/packetbeat/procs/procs_linux.go +++ b/packetbeat/procs/procs_linux.go @@ -53,7 +53,7 @@ var procFiles = map[applayer.Transport]struct { // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. -func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { +func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) { sourceFiles, ok := procFiles[transport] if !ok { return nil, fmt.Errorf("unsupported transport protocol id: %d", transport) @@ -81,7 +81,7 @@ func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transp socksMap[s.inode] = s } - ports = make(map[uint16]int) + ports = make(map[endpoint]int) for _, pid := range pids.List { inodes, err := findSocketsOfPid("", pid) if err != nil { @@ -91,7 +91,7 @@ func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transp for _, inode := range inodes { if sockInfo, exists := socksMap[inode]; exists { - ports[sockInfo.srcPort] = pid + ports[endpoint{address: sockInfo.srcIP.String(), port: sockInfo.srcPort}] = pid } } } diff --git a/packetbeat/procs/procs_other.go b/packetbeat/procs/procs_other.go index b91a132adbe..50c7d1e100f 100644 --- a/packetbeat/procs/procs_other.go +++ b/packetbeat/procs/procs_other.go @@ -23,6 +23,6 @@ import "github.com/elastic/beats/packetbeat/protos/applayer" // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. -func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { +func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) { return nil, nil } diff --git a/packetbeat/procs/procs_test.go b/packetbeat/procs/procs_test.go index 5278730480e..9079843ec34 100644 --- a/packetbeat/procs/procs_test.go +++ b/packetbeat/procs/procs_test.go @@ -20,37 +20,35 @@ package procs import ( - "fmt" "net" "testing" "github.com/stretchr/testify/assert" - "github.com/elastic/beats/packetbeat/protos/applayer" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/packetbeat/protos/applayer" ) type testingImpl struct { localIPs []net.IP - portToPID map[applayer.Transport]map[uint16]int + portToPID map[applayer.Transport]map[endpoint]int pidToCmdline map[int]string } type runningProcess struct { cmdline string pid int - ports []uint16 + ports []endpoint proto applayer.Transport } func newTestingImpl(localIPs []net.IP, processes []runningProcess) *testingImpl { impl := &testingImpl{ localIPs: localIPs, - portToPID: map[applayer.Transport]map[uint16]int{ - applayer.TransportTCP: make(map[uint16]int), - applayer.TransportUDP: make(map[uint16]int), + portToPID: map[applayer.Transport]map[endpoint]int{ + applayer.TransportTCP: make(map[endpoint]int), + applayer.TransportUDP: make(map[endpoint]int), }, pidToCmdline: make(map[int]string), } @@ -63,7 +61,7 @@ func newTestingImpl(localIPs []net.IP, processes []runningProcess) *testingImpl return impl } -func (impl *testingImpl) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { +func (impl *testingImpl) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) { return impl.portToPID[transport], nil } @@ -90,40 +88,76 @@ func TestFindProcessTuple(t *testing.T) { } impl := newTestingImpl( []net.IP{ + net.ParseIP("127.0.0.1"), net.ParseIP("192.168.1.1"), net.ParseIP("7777::33"), }, []runningProcess{ + { + cmdline: "/usr/bin/mylocal_service", + pid: 9997, + ports: []endpoint{ + {address: "127.0.0.1", port: 38842}, + }, + proto: applayer.TransportTCP, + }, + { + cmdline: "/usr/local/bin/myexternal_service", + pid: 9998, + ports: []endpoint{ + {address: "192.168.1.1", port: 38842}, + }, + proto: applayer.TransportTCP, + }, + { + cmdline: "/opt/someapp/ipv6_only_app", + pid: 9999, + ports: []endpoint{ + {address: anyIPv6, port: 38842}, + }, + proto: applayer.TransportTCP, + }, { cmdline: "curl -o /dev/null http://example.net/", pid: 101, - ports: []uint16{65535}, - proto: applayer.TransportTCP, + ports: []endpoint{ + {address: anyIPv4, port: 65535}, + }, + proto: applayer.TransportTCP, }, { cmdline: "/usr/X11/bin/webbrowser", pid: 102, - ports: []uint16{3201, 3202, 3203}, - proto: applayer.TransportTCP, + ports: []endpoint{ + {anyIPv4, 3201}, + {anyIPv6, 3201}, + {anyIPv4, 3202}, + {anyIPv4, 3203}, + }, + proto: applayer.TransportTCP, }, { cmdline: "nc -v -l -p 80", pid: 105, - ports: []uint16{80}, - proto: applayer.TransportTCP, + ports: []endpoint{ + {anyIPv4, 80}, + }, + proto: applayer.TransportTCP, }, { cmdline: "bind", pid: 333, - ports: []uint16{53}, - proto: applayer.TransportUDP, + ports: []endpoint{ + {anyIPv6, 53}, + }, + proto: applayer.TransportUDP, }, }) procs := ProcessesWatcher{} err := procs.initWithImpl(config, impl) assert.NoError(t, err) - for idx, testCase := range []struct { + for _, testCase := range []struct { name string srcIP, dstIP, src, dst, srcCmd, dstCmd string srcPort, dstPort int @@ -183,7 +217,7 @@ func TestFindProcessTuple(t *testing.T) { preAction: func() { // add a new running process impl.pidToCmdline[555] = "/usr/bin/nmap -sT -P443 10.0.0.0/8" - impl.portToPID[applayer.TransportTCP][55555] = 555 + impl.portToPID[applayer.TransportTCP][endpoint{anyIPv6, 55555}] = 555 }, proto: applayer.TransportTCP, srcIP: "7777::33", srcPort: 55555, @@ -199,28 +233,52 @@ func TestFindProcessTuple(t *testing.T) { src: "", srcCmd: "", dst: "", dstCmd: "bind", }, + { + name: "Local bound port", + proto: applayer.TransportTCP, + srcIP: "127.0.0.1", srcPort: 38841, + dstIP: "127.0.0.1", dstPort: 38842, + src: "", srcCmd: "", + dst: "", dstCmd: "/usr/bin/mylocal_service", + }, + { + name: "Network bound port", + proto: applayer.TransportTCP, + srcIP: "192.168.255.37", srcPort: 65535, + dstIP: "192.168.1.1", dstPort: 38842, + src: "", srcCmd: "", + dst: "", dstCmd: "/usr/local/bin/myexternal_service", + }, + { + name: "IPv6 bound port", + proto: applayer.TransportTCP, + srcIP: "7fff::11", srcPort: 38842, + dstIP: "7777::33", dstPort: 38842, + src: "", srcCmd: "", + dst: "", dstCmd: "/opt/someapp/ipv6_only_app", + }, } { - msg := fmt.Sprintf("test case #%d: %s", idx+1, testCase.name) - - if testCase.preAction != nil { - testCase.preAction() - } - input := common.IPPortTuple{ - BaseTuple: common.BaseTuple{ - SrcIP: net.ParseIP(testCase.srcIP), - SrcPort: uint16(testCase.srcPort), - DstIP: net.ParseIP(testCase.dstIP), - DstPort: uint16(testCase.dstPort), - }, - } - result := procs.FindProcessesTuple(&input, testCase.proto) - // nil result is not valid - assert.NotNil(t, result, msg) + t.Run(testCase.name, func(t *testing.T) { + if testCase.preAction != nil { + testCase.preAction() + } + input := common.IPPortTuple{ + BaseTuple: common.BaseTuple{ + SrcIP: net.ParseIP(testCase.srcIP), + SrcPort: uint16(testCase.srcPort), + DstIP: net.ParseIP(testCase.dstIP), + DstPort: uint16(testCase.dstPort), + }, + } + result := procs.FindProcessesTuple(&input, testCase.proto) + // nil result is not valid + assert.NotNil(t, result) - assert.Equal(t, testCase.src, string(result.Src), msg) - assert.Equal(t, testCase.dst, string(result.Dst), msg) - assert.Equal(t, testCase.srcCmd, string(result.SrcCommand), msg) - assert.Equal(t, testCase.dstCmd, string(result.DstCommand), msg) + assert.Equal(t, testCase.src, string(result.Src)) + assert.Equal(t, testCase.dst, string(result.Dst)) + assert.Equal(t, testCase.srcCmd, string(result.SrcCommand)) + assert.Equal(t, testCase.dstCmd, string(result.DstCommand)) + }) } } diff --git a/packetbeat/procs/procs_windows.go b/packetbeat/procs/procs_windows.go index 0970b52f51c..87919d18af9 100644 --- a/packetbeat/procs/procs_windows.go +++ b/packetbeat/procs/procs_windows.go @@ -23,6 +23,7 @@ import ( "encoding/binary" "errors" "fmt" + "net" "syscall" "unsafe" @@ -31,6 +32,8 @@ import ( "github.com/elastic/beats/packetbeat/protos/applayer" ) +var machineEndiannes = getMachineEndiannes() + type extractor interface { // Extract extracts useful information from the pointed-to structure Extract(unsafe.Pointer) @@ -38,7 +41,7 @@ type extractor interface { Size() int } -type callbackFn func(uint16, int) +type callbackFn func(net.IP, uint16, int) type extractorFactory func(fn callbackFn) extractor type tcpRowOwnerPIDExtractor callbackFn @@ -64,17 +67,17 @@ var tablesByTransport = map[applayer.Transport][]struct { // GetLocalPortToPIDMapping returns the list of local port numbers and the PID // that owns them. -func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[uint16]int, err error) { +func (proc *ProcessesWatcher) GetLocalPortToPIDMapping(transport applayer.Transport) (ports map[endpoint]int, err error) { tables, ok := tablesByTransport[transport] if !ok { return nil, fmt.Errorf("unsupported transport protocol id: %d", transport) } - storeResults := func(localPort uint16, pid int) { - ports[localPort] = pid + storeResults := func(localIP net.IP, localPort uint16, pid int) { + ports[endpoint{address: localIP.String(), port: localPort}] = pid } - ports = make(map[uint16]int) + ports = make(map[endpoint]int) for _, table := range tables { data, err := getNetTable(table.function, false, table.family, table.class) if err != nil { @@ -147,7 +150,7 @@ func extractUDP6RowOwnerPID(fn callbackFn) extractor { // Extract will parse a row of Size() bytes pointed to by ptr func (e tcpRowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { row := (*TCPRowOwnerPID)(ptr) - e(uint32FieldToPort(row.localPort), int(row.owningPID)) + e(addressIPv4(row.localAddr), uint32FieldToPort(row.localPort), int(row.owningPID)) } // Size returns the size of a table row @@ -158,7 +161,7 @@ func (tcpRowOwnerPIDExtractor) Size() int { // Extract will parse a row of Size() bytes pointed to by ptr func (e tcp6RowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { row := (*TCP6RowOwnerPID)(ptr) - e(uint32FieldToPort(row.localPort), int(row.owningPID)) + e(addressIPv6(row.localAddr), uint32FieldToPort(row.localPort), int(row.owningPID)) } // Size returns the size of a table row @@ -169,7 +172,7 @@ func (tcp6RowOwnerPIDExtractor) Size() int { // Extract will parse a row of Size() bytes pointed to by ptr func (e udpRowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { row := (*UDPRowOwnerPID)(ptr) - e(uint32FieldToPort(row.localPort), int(row.owningPID)) + e(addressIPv4(row.localAddr), uint32FieldToPort(row.localPort), int(row.owningPID)) } // Size returns the size of a table row @@ -180,10 +183,29 @@ func (udpRowOwnerPIDExtractor) Size() int { // Extract will parse a row of Size() bytes pointed to by ptr func (e udp6RowOwnerPIDExtractor) Extract(ptr unsafe.Pointer) { row := (*UDP6RowOwnerPID)(ptr) - e(uint32FieldToPort(row.localPort), int(row.owningPID)) + e(addressIPv6(row.localAddr), uint32FieldToPort(row.localPort), int(row.owningPID)) } // Size returns the size of a table row func (udp6RowOwnerPIDExtractor) Size() int { return int(unsafe.Sizeof(UDP6RowOwnerPID{})) } + +func addressIPv4(value uint32) net.IP { + address := make([]byte, 4) + machineEndiannes.PutUint32(address, value) + return net.IP(address) +} + +func addressIPv6(s [16]byte) net.IP { + return net.IP(s[:]) +} + +func getMachineEndiannes() binary.ByteOrder { + var buf [2]byte + *(*uint16)(unsafe.Pointer(&buf[0])) = 1 + if buf[0] == 1 { + return binary.LittleEndian + } + return binary.BigEndian +} diff --git a/packetbeat/procs/procs_windows_test.go b/packetbeat/procs/procs_windows_test.go index e05220c8062..532d430dd04 100644 --- a/packetbeat/procs/procs_windows_test.go +++ b/packetbeat/procs/procs_windows_test.go @@ -22,6 +22,7 @@ package procs import ( "encoding/hex" "fmt" + "net" "testing" "unsafe" @@ -51,7 +52,7 @@ func TestParseTableRaw(t *testing.T) { "01000000" + "77777777AAAAAAAA12340000BBBBBBBBFFFF0000CCCCCCCC", []portProcMapping{ - {port: 0x1234, pid: 0xCCCCCCCC}, + {endpoint: endpoint{address: "170.170.170.170", port: 0x1234}, pid: 0xCCCCCCCC}, }, false}, {"Two entries (IPv6)", IPv6, "02000000" + @@ -71,16 +72,16 @@ func TestParseTableRaw(t *testing.T) { "FFFF0000" + // pid "", []portProcMapping{ - {port: 0xABCD, pid: 1}, - {port: 0, pid: 0xffff}, + {endpoint: endpoint{address: "1111:2222:3333:4444:5555:6666:7777:8888", port: 0xABCD}, pid: 1}, + {endpoint: endpoint{address: "aaaa:aaaa:aaaa:aaaa:aaaa:aaaa:aaaa:aaaa", port: 0}, pid: 0xffff}, }, false}, } { msg := fmt.Sprintf("Test case #%d: %s", idx+1, testCase.name) table, err := hex.DecodeString(testCase.raw) assert.NoError(t, err, msg) var result []portProcMapping - callback := func(port uint16, pid int) { - result = append(result, portProcMapping{port: port, pid: pid}) + callback := func(ip net.IP, port uint16, pid int) { + result = append(result, portProcMapping{endpoint: endpoint{ip.String(), port}, pid: pid}) } err = parseTable(table, testCase.factory(callback)) if testCase.mustErr {