diff --git a/topbeat/beater/topbeat.go b/topbeat/beater/topbeat.go index d69923ae2f7..701d2cbf125 100644 --- a/topbeat/beater/topbeat.go +++ b/topbeat/beater/topbeat.go @@ -2,37 +2,35 @@ package beater import ( "errors" - "regexp" - "strconv" "time" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/cfgfile" - "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/topbeat/system" ) type Topbeat struct { - period time.Duration - procs []string - procsMap system.ProcsMap - lastCpuTimes *system.CpuTimes - lastCpuTimesList []system.CpuTimes - TbConfig ConfigSettings - events publisher.Client + period time.Duration - sysStats bool - procStats bool - fsStats bool - cpuPerCore bool + TbConfig ConfigSettings + events publisher.Client + + sysStats bool + fsStats bool + + cpu *system.CPU + procStats *system.ProcStats done chan struct{} } func New() *Topbeat { - return &Topbeat{} + return &Topbeat{ + cpu: &system.CPU{}, + procStats: &system.ProcStats{}, + } } func (tb *Topbeat) Config(b *beat.Beat) error { @@ -49,9 +47,9 @@ func (tb *Topbeat) Config(b *beat.Beat) error { tb.period = 10 * time.Second } if tb.TbConfig.Input.Procs != nil { - tb.procs = *tb.TbConfig.Input.Procs + tb.procStats.Procs = *tb.TbConfig.Input.Procs } else { - tb.procs = []string{".*"} //all processes + tb.procStats.Procs = []string{".*"} //all processes } if tb.TbConfig.Input.Stats.System != nil { @@ -60,9 +58,9 @@ func (tb *Topbeat) Config(b *beat.Beat) error { tb.sysStats = true } if tb.TbConfig.Input.Stats.Proc != nil { - tb.procStats = *tb.TbConfig.Input.Stats.Proc + tb.procStats.ProcStats = *tb.TbConfig.Input.Stats.Proc } else { - tb.procStats = true + tb.procStats.ProcStats = true } if tb.TbConfig.Input.Stats.Filesystem != nil { tb.fsStats = *tb.TbConfig.Input.Stats.Filesystem @@ -70,22 +68,22 @@ func (tb *Topbeat) Config(b *beat.Beat) error { tb.fsStats = true } if tb.TbConfig.Input.Stats.CpuPerCore != nil { - tb.cpuPerCore = *tb.TbConfig.Input.Stats.CpuPerCore + tb.cpu.CpuPerCore = *tb.TbConfig.Input.Stats.CpuPerCore } else { - tb.cpuPerCore = false + tb.cpu.CpuPerCore = false } - if !tb.sysStats && !tb.procStats && !tb.fsStats { + if !tb.sysStats && !tb.procStats.ProcStats && !tb.fsStats { return errors.New("Invalid statistics configuration") } logp.Debug("topbeat", "Init topbeat") - logp.Debug("topbeat", "Follow processes %q\n", tb.procs) + logp.Debug("topbeat", "Follow processes %q\n", tb.procStats.Procs) logp.Debug("topbeat", "Period %v\n", tb.period) logp.Debug("topbeat", "System statistics %t\n", tb.sysStats) - logp.Debug("topbeat", "Process statistics %t\n", tb.procStats) + logp.Debug("topbeat", "Process statistics %t\n", tb.procStats.ProcStats) logp.Debug("topbeat", "File system statistics %t\n", tb.fsStats) - logp.Debug("topbeat", "Cpu usage per core %t\n", tb.cpuPerCore) + logp.Debug("topbeat", "Cpu usage per core %t\n", tb.cpu.CpuPerCore) return nil } @@ -99,7 +97,7 @@ func (tb *Topbeat) Setup(b *beat.Beat) error { func (t *Topbeat) Run(b *beat.Beat) error { var err error - t.initProcStats() + t.procStats.InitProcStats() ticker := time.NewTicker(t.period) defer ticker.Stop() @@ -114,25 +112,29 @@ func (t *Topbeat) Run(b *beat.Beat) error { timerStart := time.Now() if t.sysStats { - err = t.exportSystemStats() + event, err := t.cpu.GetSystemStats() if err != nil { logp.Err("Error reading system stats: %v", err) break } + t.events.PublishEvent(event) } - if t.procStats { - err = t.exportProcStats() + if t.procStats.ProcStats { + events, err := t.procStats.GetProcStats() if err != nil { logp.Err("Error reading proc stats: %v", err) break } + t.events.PublishEvents(events) } if t.fsStats { - err = t.exportFileSystemStats() + events, err := system.GetFileSystemStats() if err != nil { logp.Err("Error reading fs stats: %v", err) break } + t.events.PublishEvents(events) + } timerEnd := time.Now() @@ -152,165 +154,3 @@ func (tb *Topbeat) Cleanup(b *beat.Beat) error { func (t *Topbeat) Stop() { close(t.done) } - -func (t *Topbeat) initProcStats() { - - t.procsMap = make(system.ProcsMap) - - if len(t.procs) == 0 { - return - } - - pids, err := system.Pids() - if err != nil { - logp.Warn("Getting the initial list of pids: %v", err) - } - - for _, pid := range pids { - process, err := system.GetProcess(pid, "") - if err != nil { - logp.Debug("topbeat", "Skip process pid=%d: %v", pid, err) - continue - } - t.procsMap[process.Pid] = process - } -} - -func (t *Topbeat) exportProcStats() error { - - if len(t.procs) == 0 { - return nil - } - - pids, err := system.Pids() - if err != nil { - logp.Warn("Getting the list of pids: %v", err) - return err - } - - newProcs := make(system.ProcsMap, len(pids)) - for _, pid := range pids { - var cmdline string - if previousProc := t.procsMap[pid]; previousProc != nil { - cmdline = previousProc.CmdLine - } - - process, err := system.GetProcess(pid, cmdline) - if err != nil { - logp.Debug("topbeat", "Skip process pid=%d: %v", pid, err) - continue - } - - if t.MatchProcess(process.Name) { - - newProcs[process.Pid] = process - - last, ok := t.procsMap[process.Pid] - if ok { - t.procsMap[process.Pid] = process - } - proc := system.GetProcessEvent(process, last) - - event := common.MapStr{ - "@timestamp": common.Time(time.Now()), - "type": "process", - "count": 1, - "proc": proc, - } - - t.events.PublishEvent(event) - } - } - t.procsMap = newProcs - return nil -} - -func (t *Topbeat) exportSystemStats() error { - load_stat, err := system.GetSystemLoad() - if err != nil { - logp.Warn("Getting load statistics: %v", err) - return err - } - cpuStat, err := system.GetCpuTimes() - if err != nil { - logp.Warn("Getting cpu times: %v", err) - return err - } - - t.addCpuPercentage(cpuStat) - - memStat, err := system.GetMemory() - if err != nil { - logp.Warn("Getting memory details: %v", err) - return err - } - system.AddMemPercentage(memStat) - - swapStat, err := system.GetSwap() - if err != nil { - logp.Warn("Getting swap details: %v", err) - return err - } - system.AddSwapPercentage(swapStat) - - event := common.MapStr{ - "@timestamp": common.Time(time.Now()), - "type": "system", - "load": load_stat, - "count": 1, - "cpu": system.GetCpuStatEvent(cpuStat), - "mem": system.GetMemoryEvent(memStat), - "swap": system.GetSwapEvent(swapStat), - } - - if t.cpuPerCore { - - cpuCoreStat, err := system.GetCpuTimesList() - if err != nil { - logp.Warn("Getting cpu core times: %v", err) - return err - } - t.addCpuPercentageList(cpuCoreStat) - - cpus := common.MapStr{} - - for coreNumber, stat := range cpuCoreStat { - cpus["cpu"+strconv.Itoa(coreNumber)] = system.GetCpuStatEvent(&stat) - } - event["cpus"] = cpus - } - - t.events.PublishEvent(event) - - return nil -} - -func (t *Topbeat) exportFileSystemStats() error { - fss, err := system.GetFileSystemList() - if err != nil { - logp.Warn("Getting filesystem list: %v", err) - return err - } - - t.events.PublishEvents(system.CollectFileSystemStats(fss)) - return nil -} - -func (t *Topbeat) MatchProcess(name string) bool { - - for _, reg := range t.procs { - matched, _ := regexp.MatchString(reg, name) - if matched { - return true - } - } - return false -} - -func (t *Topbeat) addCpuPercentage(t2 *system.CpuTimes) { - t.lastCpuTimes = system.GetCpuPercentage(t.lastCpuTimes, t2) -} - -func (t *Topbeat) addCpuPercentageList(t2 []system.CpuTimes) { - t.lastCpuTimesList = system.GetCpuPercentageList(t.lastCpuTimesList, t2) -} diff --git a/topbeat/beater/topbeat_test.go b/topbeat/beater/topbeat_test.go index ba5cb19b401..222b97e0b07 100644 --- a/topbeat/beater/topbeat_test.go +++ b/topbeat/beater/topbeat_test.go @@ -1,160 +1,3 @@ // +build !integration package beater - -import ( - "testing" - "time" - - "github.com/elastic/beats/topbeat/system" - sigar "github.com/elastic/gosigar" - "github.com/stretchr/testify/assert" -) - -func TestMatchProcs(t *testing.T) { - - var beat = Topbeat{} - - beat.procs = []string{".*"} - assert.True(t, beat.MatchProcess("topbeat")) - - beat.procs = []string{"topbeat"} - assert.False(t, beat.MatchProcess("burn")) - - // match no processes - beat.procs = []string{"$^"} - assert.False(t, beat.MatchProcess("burn")) -} - -func TestMemPercentage(t *testing.T) { - - m := system.MemStat{ - Mem: sigar.Mem{ - Total: 7, - Used: 5, - Free: 2, - }, - } - system.AddMemPercentage(&m) - assert.Equal(t, m.UsedPercent, 0.71) - - m = system.MemStat{ - Mem: sigar.Mem{Total: 0}, - } - system.AddMemPercentage(&m) - assert.Equal(t, m.UsedPercent, 0.0) -} - -func TestActualMemPercentage(t *testing.T) { - - m := system.MemStat{ - Mem: sigar.Mem{ - Total: 7, - ActualUsed: 5, - ActualFree: 2, - }, - } - system.AddMemPercentage(&m) - assert.Equal(t, m.ActualUsedPercent, 0.71) - - m = system.MemStat{ - Mem: sigar.Mem{ - Total: 0, - }, - } - system.AddMemPercentage(&m) - assert.Equal(t, m.ActualUsedPercent, 0.0) -} - -func TestCpuPercentage(t *testing.T) { - - beat := Topbeat{} - - cpu1 := system.CpuTimes{ - Cpu: sigar.Cpu{ - User: 10855311, - Nice: 0, - Sys: 2021040, - Idle: 17657874, - Wait: 0, - Irq: 0, - SoftIrq: 0, - Stolen: 0, - }, - } - - beat.addCpuPercentage(&cpu1) - - assert.Equal(t, cpu1.UserPercent, 0.0) - assert.Equal(t, cpu1.SystemPercent, 0.0) - - cpu2 := system.CpuTimes{ - Cpu: sigar.Cpu{ - User: 10855693, - Nice: 0, - Sys: 2021058, - Idle: 17657876, - Wait: 0, - Irq: 0, - SoftIrq: 0, - Stolen: 0, - }, - } - - beat.addCpuPercentage(&cpu2) - - assert.Equal(t, cpu2.UserPercent, 0.9502) - assert.Equal(t, cpu2.SystemPercent, 0.0448) -} - -func TestProcMemPercentage(t *testing.T) { - - beat := Topbeat{} - - p := system.Process{ - Pid: 3456, - Mem: sigar.ProcMem{ - Resident: 1416, - Size: 145164088, - }, - } - - beat.procsMap = make(system.ProcsMap) - beat.procsMap[p.Pid] = &p - - rssPercent := system.GetProcMemPercentage(&p, 10000) - assert.Equal(t, rssPercent, 0.14) -} - -func TestProcCpuPercentage(t *testing.T) { - - beat := Topbeat{} - - ctime := time.Now() - - p2 := system.Process{ - Pid: 3545, - Cpu: sigar.ProcTime{ - User: 14794, - Sys: 47, - Total: 14841, - }, - Ctime: ctime, - } - - p1 := system.Process{ - Pid: 3545, - Cpu: sigar.ProcTime{ - User: 11345, - Sys: 37, - Total: 11382, - }, - Ctime: ctime.Add(-1 * time.Second), - } - - beat.procsMap = make(system.ProcsMap) - beat.procsMap[p1.Pid] = &p1 - - totalPercent := system.GetProcCpuPercentage(&p1, &p2) - assert.Equal(t, totalPercent, 3.459) -} diff --git a/topbeat/system/cpu.go b/topbeat/system/cpu.go index a041626a10f..2e79378f704 100644 --- a/topbeat/system/cpu.go +++ b/topbeat/system/cpu.go @@ -2,9 +2,19 @@ package system import ( "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" sigar "github.com/elastic/gosigar" + "strconv" + "time" ) +type CPU struct { + CpuPerCore bool + + LastCpuTimes *CpuTimes + LastCpuTimesList []CpuTimes +} + type CpuTimes struct { sigar.Cpu UserPercent float64 `json:"user_p"` @@ -97,3 +107,69 @@ func GetCpuStatEvent(cpuStat *CpuTimes) common.MapStr { "system_p": cpuStat.SystemPercent, } } + +func (cpu *CPU) AddCpuPercentage(t2 *CpuTimes) { + cpu.LastCpuTimes = GetCpuPercentage(cpu.LastCpuTimes, t2) +} + +func (cpu *CPU) AddCpuPercentageList(t2 []CpuTimes) { + cpu.LastCpuTimesList = GetCpuPercentageList(cpu.LastCpuTimesList, t2) +} + +func (cpu *CPU) GetSystemStats() (common.MapStr, error) { + loadStat, err := GetSystemLoad() + if err != nil { + logp.Warn("Getting load statistics: %v", err) + return nil, err + } + cpuStat, err := GetCpuTimes() + if err != nil { + logp.Warn("Getting cpu times: %v", err) + return nil, err + } + + cpu.AddCpuPercentage(cpuStat) + + memStat, err := GetMemory() + if err != nil { + logp.Warn("Getting memory details: %v", err) + return nil, err + } + AddMemPercentage(memStat) + + swapStat, err := GetSwap() + if err != nil { + logp.Warn("Getting swap details: %v", err) + return nil, err + } + AddSwapPercentage(swapStat) + + event := common.MapStr{ + "@timestamp": common.Time(time.Now()), + "type": "system", + "count": 1, + "load": loadStat, + "cpu": GetCpuStatEvent(cpuStat), + "mem": GetMemoryEvent(memStat), + "swap": GetSwapEvent(swapStat), + } + + if cpu.CpuPerCore { + + cpuCoreStat, err := GetCpuTimesList() + if err != nil { + logp.Warn("Getting cpu core times: %v", err) + return nil, err + } + cpu.AddCpuPercentageList(cpuCoreStat) + + cpus := common.MapStr{} + + for coreNumber, stat := range cpuCoreStat { + cpus["cpu"+strconv.Itoa(coreNumber)] = GetCpuStatEvent(&stat) + } + event["cpus"] = cpus + } + + return event, nil +} diff --git a/topbeat/system/cpu_test.go b/topbeat/system/cpu_test.go index ee5fd7d4cb3..8ed0011d32b 100644 --- a/topbeat/system/cpu_test.go +++ b/topbeat/system/cpu_test.go @@ -5,6 +5,7 @@ package system import ( "testing" + "github.com/elastic/gosigar" "github.com/stretchr/testify/assert" ) @@ -19,3 +20,44 @@ func TestGetCpuTimes(t *testing.T) { assert.True(t, (cpu_stat.Sys > 0)) } + +func TestCpuPercentage(t *testing.T) { + + cpu := CPU{} + + cpu1 := CpuTimes{ + Cpu: gosigar.Cpu{ + User: 10855311, + Nice: 0, + Sys: 2021040, + Idle: 17657874, + Wait: 0, + Irq: 0, + SoftIrq: 0, + Stolen: 0, + }, + } + + cpu.AddCpuPercentage(&cpu1) + + assert.Equal(t, cpu1.UserPercent, 0.0) + assert.Equal(t, cpu1.SystemPercent, 0.0) + + cpu2 := CpuTimes{ + Cpu: gosigar.Cpu{ + User: 10855693, + Nice: 0, + Sys: 2021058, + Idle: 17657876, + Wait: 0, + Irq: 0, + SoftIrq: 0, + Stolen: 0, + }, + } + + cpu.AddCpuPercentage(&cpu2) + + assert.Equal(t, cpu2.UserPercent, 0.9502) + assert.Equal(t, cpu2.SystemPercent, 0.0448) +} diff --git a/topbeat/system/filesystem.go b/topbeat/system/filesystem.go index e13f46e2f6d..53ab5d35cd4 100644 --- a/topbeat/system/filesystem.go +++ b/topbeat/system/filesystem.go @@ -87,3 +87,13 @@ func GetFilesystemEvent(fsStat *FileSystemStat) common.MapStr { "used_p": fsStat.UsedPercent, } } + +func GetFileSystemStats() ([]common.MapStr, error) { + fss, err := GetFileSystemList() + if err != nil { + logp.Warn("Getting filesystem list: %v", err) + return nil, err + } + + return CollectFileSystemStats(fss), nil +} diff --git a/topbeat/system/process.go b/topbeat/system/process.go index 23ae78a9afb..373a6093fdf 100644 --- a/topbeat/system/process.go +++ b/topbeat/system/process.go @@ -2,6 +2,7 @@ package system import ( "fmt" + "regexp" "strings" "time" @@ -24,6 +25,12 @@ type Process struct { Ctime time.Time } +type ProcStats struct { + ProcStats bool + Procs []string + ProcsMap ProcsMap +} + func GetProcess(pid int, cmdline string) (*Process, error) { state := sigar.ProcState{} if err := state.Get(pid); err != nil { @@ -149,3 +156,88 @@ func GetProcCpuPercentage(last *Process, current *Process) float64 { } return 0 } + +func (procStats *ProcStats) MatchProcess(name string) bool { + + for _, reg := range procStats.Procs { + matched, _ := regexp.MatchString(reg, name) + if matched { + return true + } + } + return false +} + +func (procStats *ProcStats) InitProcStats() { + + procStats.ProcsMap = make(ProcsMap) + + if len(procStats.Procs) == 0 { + return + } + + pids, err := Pids() + if err != nil { + logp.Warn("Getting the initial list of pids: %v", err) + } + + for _, pid := range pids { + process, err := GetProcess(pid, "") + if err != nil { + logp.Debug("topbeat", "Skip process pid=%d: %v", pid, err) + continue + } + procStats.ProcsMap[process.Pid] = process + } +} + +func (procStats *ProcStats) GetProcStats() ([]common.MapStr, error) { + + if len(procStats.Procs) == 0 { + return nil, nil + } + + pids, err := Pids() + if err != nil { + logp.Warn("Getting the list of pids: %v", err) + return nil, err + } + + events := []common.MapStr{} + newProcs := make(ProcsMap, len(pids)) + for _, pid := range pids { + var cmdline string + if previousProc := procStats.ProcsMap[pid]; previousProc != nil { + cmdline = previousProc.CmdLine + } + + process, err := GetProcess(pid, cmdline) + if err != nil { + logp.Debug("topbeat", "Skip process pid=%d: %v", pid, err) + continue + } + + if procStats.MatchProcess(process.Name) { + + newProcs[process.Pid] = process + + last, ok := procStats.ProcsMap[process.Pid] + if ok { + procStats.ProcsMap[process.Pid] = process + } + proc := GetProcessEvent(process, last) + + event := common.MapStr{ + "@timestamp": common.Time(time.Now()), + "type": "process", + "count": 1, + "proc": proc, + } + + events = append(events, event) + } + } + + procStats.ProcsMap = newProcs + return events, nil +} diff --git a/topbeat/system/process_test.go b/topbeat/system/process_test.go index c33ff623d75..098d3f7c539 100644 --- a/topbeat/system/process_test.go +++ b/topbeat/system/process_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/elastic/gosigar" "github.com/stretchr/testify/assert" ) @@ -67,6 +68,73 @@ func TestProcState(t *testing.T) { assert.Equal(t, getProcState('Z'), "zombie") } +func TestMatchProcs(t *testing.T) { + + var procStats = ProcStats{} + + procStats.Procs = []string{".*"} + assert.True(t, procStats.MatchProcess("topbeat")) + + procStats.Procs = []string{"topbeat"} + assert.False(t, procStats.MatchProcess("burn")) + + // match no processes + procStats.Procs = []string{"$^"} + assert.False(t, procStats.MatchProcess("burn")) +} + +func TestProcMemPercentage(t *testing.T) { + + procStats := ProcStats{} + + p := Process{ + Pid: 3456, + Mem: gosigar.ProcMem{ + Resident: 1416, + Size: 145164088, + }, + } + + procStats.ProcsMap = make(ProcsMap) + procStats.ProcsMap[p.Pid] = &p + + rssPercent := GetProcMemPercentage(&p, 10000) + assert.Equal(t, rssPercent, 0.14) +} + +func TestProcCpuPercentage(t *testing.T) { + + procStats := ProcStats{} + + ctime := time.Now() + + p2 := Process{ + Pid: 3545, + Cpu: gosigar.ProcTime{ + User: 14794, + Sys: 47, + Total: 14841, + }, + Ctime: ctime, + } + + p1 := Process{ + Pid: 3545, + Cpu: gosigar.ProcTime{ + User: 11345, + Sys: 37, + Total: 11382, + }, + Ctime: ctime.Add(-1 * time.Second), + } + + procStats.ProcsMap = make(ProcsMap) + procStats.ProcsMap[p1.Pid] = &p1 + + totalPercent := GetProcCpuPercentage(&p1, &p2) + assert.Equal(t, totalPercent, 3.459) +} + // BenchmarkGetProcess runs a benchmark of the GetProcess method with caching // of the command line arguments enabled. func BenchmarkGetProcess(b *testing.B) { diff --git a/topbeat/system/system_test.go b/topbeat/system/system_test.go index a594a196166..471c9b3991a 100644 --- a/topbeat/system/system_test.go +++ b/topbeat/system/system_test.go @@ -6,6 +6,7 @@ import ( "runtime" "testing" + "github.com/elastic/gosigar" "github.com/stretchr/testify/assert" ) @@ -53,3 +54,43 @@ func TestGetSwap(t *testing.T) { assert.True(t, (swap.Used >= 0)) assert.True(t, (swap.Free >= 0)) } + +func TestMemPercentage(t *testing.T) { + + m := MemStat{ + Mem: gosigar.Mem{ + Total: 7, + Used: 5, + Free: 2, + }, + } + AddMemPercentage(&m) + assert.Equal(t, m.UsedPercent, 0.71) + + m = MemStat{ + Mem: gosigar.Mem{Total: 0}, + } + AddMemPercentage(&m) + assert.Equal(t, m.UsedPercent, 0.0) +} + +func TestActualMemPercentage(t *testing.T) { + + m := MemStat{ + Mem: gosigar.Mem{ + Total: 7, + ActualUsed: 5, + ActualFree: 2, + }, + } + AddMemPercentage(&m) + assert.Equal(t, m.ActualUsedPercent, 0.71) + + m = MemStat{ + Mem: gosigar.Mem{ + Total: 0, + }, + } + AddMemPercentage(&m) + assert.Equal(t, m.ActualUsedPercent, 0.0) +}