From 12e846bf37917b721719d0fa63340966be5b2366 Mon Sep 17 00:00:00 2001 From: "Alex K." <8418476+fearful-symmetry@users.noreply.github.com> Date: Fri, 4 Oct 2024 13:17:40 -0700 Subject: [PATCH] Fix resource leak bug in auditbeat/socket (#41080) * fix resource leak bug in auditbeat/socket * linter.. * linter... --- .../auditbeat/module/system/socket/events.go | 24 ++---- .../auditbeat/module/system/socket/state.go | 19 ++++- .../module/system/socket/state_test.go | 73 +++++++++++++++++++ 3 files changed, 96 insertions(+), 20 deletions(-) diff --git a/x-pack/auditbeat/module/system/socket/events.go b/x-pack/auditbeat/module/system/socket/events.go index beb0a988a7c..fe390c0ea92 100644 --- a/x-pack/auditbeat/module/system/socket/events.go +++ b/x-pack/auditbeat/module/system/socket/events.go @@ -157,22 +157,6 @@ func (e *tcpConnectResult) Update(s *state) error { return fmt.Errorf("stored thread event has unexpected type %T", ev) } -var tcpStates = []string{ - "(zero)", - "TCP_ESTABLISHED", - "TCP_SYN_SENT", - "TCP_SYN_RECV", - "TCP_FIN_WAIT1", - "TCP_FIN_WAIT2", - "TCP_TIME_WAIT", - "TCP_CLOSE", - "TCP_CLOSE_WAIT", - "TCP_LAST_ACK", - "TCP_LISTEN", - "TCP_CLOSING", - "TCP_NEW_SYN_RECV", -} - type tcpAcceptResult struct { Meta tracing.Metadata `kprobe:"metadata"` Sock uintptr `kprobe:"sock"` @@ -894,7 +878,7 @@ func (e *execveCall) getProcess() *process { var err error p.path, err = filepath.EvalSymlinks(fmt.Sprintf("/proc/%d/exe", e.Meta.PID)) if err != nil { - if pe, ok := err.(*os.PathError); ok && strings.Contains(pe.Path, "(deleted)") { + if pe, ok := err.(*os.PathError); ok && strings.Contains(pe.Path, "(deleted)") { //nolint:errorlint // we're fetching the string body // Keep the deleted path from the PathError. p.path = pe.Path // Keep the basename in case we can't get the process name. @@ -1048,9 +1032,13 @@ func (e *doExit) String() string { // Update the state with the contents of this event. func (e *doExit) Update(s *state) (err error) { - // Only report exits of the main thread, a.k.a process exit + // Report exit of the main thread, + // or a TID that was originally reported by doFork. if e.Meta.PID == e.Meta.TID { err = s.TerminateProcess(e.Meta.PID) + + } else if e.Meta.PID != e.Meta.TID && s.processExists(e.Meta.TID) { + err = s.TerminateProcess(e.Meta.TID) } // Cleanup any saved thread state s.ThreadLeave(e.Meta.TID) diff --git a/x-pack/auditbeat/module/system/socket/state.go b/x-pack/auditbeat/module/system/socket/state.go index 19bb729a844..347c5385921 100644 --- a/x-pack/auditbeat/module/system/socket/state.go +++ b/x-pack/auditbeat/module/system/socket/state.go @@ -308,8 +308,12 @@ func (dt *dnsTracker) AddTransaction(tr dns.Transaction) { } } var list []dns.Transaction + var ok bool if prev := dt.transactionByClient.Get(clientAddr); prev != nil { - list = prev.([]dns.Transaction) + list, ok = prev.([]dns.Transaction) + if !ok { + return + } } list = append(list, tr) dt.transactionByClient.Put(clientAddr, list) @@ -332,7 +336,11 @@ func (dt *dnsTracker) RegisterEndpoint(addr net.UDPAddr, proc *process) { key := addr.String() dt.processByClient.Put(key, proc) if listIf := dt.transactionByClient.Get(key); listIf != nil { - list := listIf.([]dns.Transaction) + list, ok := listIf.([]dns.Transaction) + if !ok { + return + } + for _, tr := range list { proc.addTransaction(tr) } @@ -577,6 +585,13 @@ func (s *state) TerminateProcess(pid uint32) error { return nil } +func (s *state) processExists(pid uint32) bool { + s.Lock() + defer s.Unlock() + _, ok := s.processes[pid] + return ok +} + func (s *state) getProcess(pid uint32) *process { if pid == 0 { return &kernelProcess diff --git a/x-pack/auditbeat/module/system/socket/state_test.go b/x-pack/auditbeat/module/system/socket/state_test.go index 73c52c80af7..bb7aa3729a1 100644 --- a/x-pack/auditbeat/module/system/socket/state_test.go +++ b/x-pack/auditbeat/module/system/socket/state_test.go @@ -16,6 +16,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/sys/unix" "github.com/elastic/beats/v7/auditbeat/tracing" @@ -23,6 +24,7 @@ import ( "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/x-pack/auditbeat/module/system" "github.com/elastic/beats/v7/x-pack/auditbeat/module/system/socket/dns" + "github.com/elastic/elastic-agent-libs/logp" ) type logWrapper testing.T @@ -88,6 +90,76 @@ func makeTestingState(t *testing.T, inactiveTimeout, socketTimeout, closeTimeout return ts } +func TestDNSMemoryUsage(t *testing.T) { + logp.DevelopmentSetup() + rootPID := uint32(1234) + childThread1 := uint32(1235) + childThread2 := uint32(1236) + st := makeTestingState(t, time.Second, time.Second, 0, time.Second) + // construct a fake series of DNS events process events + err := st.OnDNSTransaction(dns.Transaction{ + Client: net.UDPAddr{IP: net.ParseIP("192.168.1.2"), Port: 34074}, + Server: net.UDPAddr{IP: net.ParseIP("192.168.1.53"), Port: 53}, + Domain: "example.com", + Addresses: []net.IP{net.ParseIP("10.10.10.10")}, + }) + require.NoError(t, err) + err = st.OnDNSTransaction(dns.Transaction{ + Client: net.UDPAddr{IP: net.ParseIP("192.168.1.2"), Port: 34074}, + Server: net.UDPAddr{IP: net.ParseIP("192.168.1.53"), Port: 53}, + Domain: "elastic.co", + Addresses: []net.IP{net.ParseIP("10.10.10.11")}, + }) + require.NoError(t, err) + events1 := []event{ + callExecve(meta(rootPID, rootPID, 1), []string{"/usr/bin/curl"}), + &execveRet{Meta: meta(rootPID, rootPID, 2), Retval: int32(rootPID)}, + &udpSendMsgCall{ + Meta: meta(rootPID, rootPID, 2), + LAddr: 33663168, + RAddr: 889301184, + LPort: 6789, // ports are in network byte order + RPort: 13568, + Size: 55, + SIPtr: uintptr(1), + SIAF: unix.AF_INET, + }, + &udpQueueRcvSkb{ + Meta: meta(rootPID, rootPID, 2), + LAddr: 33663168, + LPort: 6789, + Size: 55, + IPHdr: 1, + UDPHdr: 21, + Packet: [256]byte{ + // network body, starting with IP header + 0x00, + 0x40, 0x00, 0x00, 0x20, + 0x00, 0x01, 0x00, 0x00, + 0xff, 0x11, 0xff, 0xff, + 0xc0, 0xa8, 0x1, 0x35, + 0xc0, 0xa8, 0x1, 0x2, + 0x00, 0x35, + }, + }, &forkRet{Meta: meta(rootPID, rootPID, 2), Retval: int(childThread1)}, + &forkRet{Meta: meta(rootPID, rootPID, 2), Retval: int(childThread2)}, + + &doExit{Meta: meta(rootPID, childThread1, 2)}, + &doExit{Meta: meta(rootPID, childThread2, 2)}, + } + st.feedEvents(events1) + + // ensure that we only have one PID in the map, + // and that's the root pid + t.Logf("Got procs: %d", len(st.processes)) + require.Equal(t, 1, len(st.processes)) + require.NotNil(t, st.processes[rootPID]) + + // now close the final process, make sure we've cleaned up the final process + st.feedEvents([]event{&doExit{Meta: meta(rootPID, rootPID, 2)}}) + require.Equal(t, 0, len(st.processes)) +} + func TestTCPConnWithProcess(t *testing.T) { const ( localIP = "192.168.33.10" @@ -97,6 +169,7 @@ func TestTCPConnWithProcess(t *testing.T) { sock uintptr = 0xff1234 ) st := makeTestingState(t, time.Second, time.Second, 0, time.Second) + lPort, rPort := be16(localPort), be16(remotePort) lAddr, rAddr := ipv4(localIP), ipv4(remoteIP) evs := []event{