Skip to content

Commit

Permalink
Decouple Data Fetch part in Topbeat
Browse files Browse the repository at this point in the history
The data fetching and processing part was completely decoupled from Topbeat. This allows to reuse the functionality in metricbeat or other beats.

For the moment the variable functions stayed the same even though they could be simplified in a second step. The goal is to keep the changes as low as possible.
  • Loading branch information
ruflin committed Mar 21, 2016
1 parent e9eaddc commit dab070a
Show file tree
Hide file tree
Showing 8 changed files with 361 additions and 349 deletions.
224 changes: 32 additions & 192 deletions topbeat/beater/topbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,35 @@ package beater

import (
"errors"
"regexp"
"strconv"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/topbeat/system"
)

type Topbeat struct {
period time.Duration
procs []string
procsMap system.ProcsMap
lastCpuTimes *system.CpuTimes
lastCpuTimesList []system.CpuTimes
TbConfig ConfigSettings
events publisher.Client
period time.Duration

sysStats bool
procStats bool
fsStats bool
cpuPerCore bool
TbConfig ConfigSettings
events publisher.Client

sysStats bool
fsStats bool

cpu *system.CPU
procStats *system.ProcStats

done chan struct{}
}

func New() *Topbeat {
return &Topbeat{}
return &Topbeat{
cpu: &system.CPU{},
procStats: &system.ProcStats{},
}
}

func (tb *Topbeat) Config(b *beat.Beat) error {
Expand All @@ -49,9 +47,9 @@ func (tb *Topbeat) Config(b *beat.Beat) error {
tb.period = 10 * time.Second
}
if tb.TbConfig.Input.Procs != nil {
tb.procs = *tb.TbConfig.Input.Procs
tb.procStats.Procs = *tb.TbConfig.Input.Procs
} else {
tb.procs = []string{".*"} //all processes
tb.procStats.Procs = []string{".*"} //all processes
}

if tb.TbConfig.Input.Stats.System != nil {
Expand All @@ -60,32 +58,32 @@ func (tb *Topbeat) Config(b *beat.Beat) error {
tb.sysStats = true
}
if tb.TbConfig.Input.Stats.Proc != nil {
tb.procStats = *tb.TbConfig.Input.Stats.Proc
tb.procStats.ProcStats = *tb.TbConfig.Input.Stats.Proc
} else {
tb.procStats = true
tb.procStats.ProcStats = true
}
if tb.TbConfig.Input.Stats.Filesystem != nil {
tb.fsStats = *tb.TbConfig.Input.Stats.Filesystem
} else {
tb.fsStats = true
}
if tb.TbConfig.Input.Stats.CpuPerCore != nil {
tb.cpuPerCore = *tb.TbConfig.Input.Stats.CpuPerCore
tb.cpu.CpuPerCore = *tb.TbConfig.Input.Stats.CpuPerCore
} else {
tb.cpuPerCore = false
tb.cpu.CpuPerCore = false
}

if !tb.sysStats && !tb.procStats && !tb.fsStats {
if !tb.sysStats && !tb.procStats.ProcStats && !tb.fsStats {
return errors.New("Invalid statistics configuration")
}

logp.Debug("topbeat", "Init topbeat")
logp.Debug("topbeat", "Follow processes %q\n", tb.procs)
logp.Debug("topbeat", "Follow processes %q\n", tb.procStats.Procs)
logp.Debug("topbeat", "Period %v\n", tb.period)
logp.Debug("topbeat", "System statistics %t\n", tb.sysStats)
logp.Debug("topbeat", "Process statistics %t\n", tb.procStats)
logp.Debug("topbeat", "Process statistics %t\n", tb.procStats.ProcStats)
logp.Debug("topbeat", "File system statistics %t\n", tb.fsStats)
logp.Debug("topbeat", "Cpu usage per core %t\n", tb.cpuPerCore)
logp.Debug("topbeat", "Cpu usage per core %t\n", tb.cpu.CpuPerCore)

return nil
}
Expand All @@ -99,7 +97,7 @@ func (tb *Topbeat) Setup(b *beat.Beat) error {
func (t *Topbeat) Run(b *beat.Beat) error {
var err error

t.initProcStats()
t.procStats.InitProcStats()

ticker := time.NewTicker(t.period)
defer ticker.Stop()
Expand All @@ -114,25 +112,29 @@ func (t *Topbeat) Run(b *beat.Beat) error {
timerStart := time.Now()

if t.sysStats {
err = t.exportSystemStats()
event, err := t.cpu.GetSystemStats()
if err != nil {
logp.Err("Error reading system stats: %v", err)
break
}
t.events.PublishEvent(event)
}
if t.procStats {
err = t.exportProcStats()
if t.procStats.ProcStats {
events, err := t.procStats.GetProcStats()
if err != nil {
logp.Err("Error reading proc stats: %v", err)
break
}
t.events.PublishEvents(events)
}
if t.fsStats {
err = t.exportFileSystemStats()
events, err := system.GetFileSystemStats()
if err != nil {
logp.Err("Error reading fs stats: %v", err)
break
}
t.events.PublishEvents(events)

}

timerEnd := time.Now()
Expand All @@ -152,165 +154,3 @@ func (tb *Topbeat) Cleanup(b *beat.Beat) error {
func (t *Topbeat) Stop() {
close(t.done)
}

func (t *Topbeat) initProcStats() {

t.procsMap = make(system.ProcsMap)

if len(t.procs) == 0 {
return
}

pids, err := system.Pids()
if err != nil {
logp.Warn("Getting the initial list of pids: %v", err)
}

for _, pid := range pids {
process, err := system.GetProcess(pid, "")
if err != nil {
logp.Debug("topbeat", "Skip process pid=%d: %v", pid, err)
continue
}
t.procsMap[process.Pid] = process
}
}

func (t *Topbeat) exportProcStats() error {

if len(t.procs) == 0 {
return nil
}

pids, err := system.Pids()
if err != nil {
logp.Warn("Getting the list of pids: %v", err)
return err
}

newProcs := make(system.ProcsMap, len(pids))
for _, pid := range pids {
var cmdline string
if previousProc := t.procsMap[pid]; previousProc != nil {
cmdline = previousProc.CmdLine
}

process, err := system.GetProcess(pid, cmdline)
if err != nil {
logp.Debug("topbeat", "Skip process pid=%d: %v", pid, err)
continue
}

if t.MatchProcess(process.Name) {

newProcs[process.Pid] = process

last, ok := t.procsMap[process.Pid]
if ok {
t.procsMap[process.Pid] = process
}
proc := system.GetProcessEvent(process, last)

event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": "process",
"count": 1,
"proc": proc,
}

t.events.PublishEvent(event)
}
}
t.procsMap = newProcs
return nil
}

func (t *Topbeat) exportSystemStats() error {
load_stat, err := system.GetSystemLoad()
if err != nil {
logp.Warn("Getting load statistics: %v", err)
return err
}
cpuStat, err := system.GetCpuTimes()
if err != nil {
logp.Warn("Getting cpu times: %v", err)
return err
}

t.addCpuPercentage(cpuStat)

memStat, err := system.GetMemory()
if err != nil {
logp.Warn("Getting memory details: %v", err)
return err
}
system.AddMemPercentage(memStat)

swapStat, err := system.GetSwap()
if err != nil {
logp.Warn("Getting swap details: %v", err)
return err
}
system.AddSwapPercentage(swapStat)

event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": "system",
"load": load_stat,
"count": 1,
"cpu": system.GetCpuStatEvent(cpuStat),
"mem": system.GetMemoryEvent(memStat),
"swap": system.GetSwapEvent(swapStat),
}

if t.cpuPerCore {

cpuCoreStat, err := system.GetCpuTimesList()
if err != nil {
logp.Warn("Getting cpu core times: %v", err)
return err
}
t.addCpuPercentageList(cpuCoreStat)

cpus := common.MapStr{}

for coreNumber, stat := range cpuCoreStat {
cpus["cpu"+strconv.Itoa(coreNumber)] = system.GetCpuStatEvent(&stat)
}
event["cpus"] = cpus
}

t.events.PublishEvent(event)

return nil
}

func (t *Topbeat) exportFileSystemStats() error {
fss, err := system.GetFileSystemList()
if err != nil {
logp.Warn("Getting filesystem list: %v", err)
return err
}

t.events.PublishEvents(system.CollectFileSystemStats(fss))
return nil
}

func (t *Topbeat) MatchProcess(name string) bool {

for _, reg := range t.procs {
matched, _ := regexp.MatchString(reg, name)
if matched {
return true
}
}
return false
}

func (t *Topbeat) addCpuPercentage(t2 *system.CpuTimes) {
t.lastCpuTimes = system.GetCpuPercentage(t.lastCpuTimes, t2)
}

func (t *Topbeat) addCpuPercentageList(t2 []system.CpuTimes) {
t.lastCpuTimesList = system.GetCpuPercentageList(t.lastCpuTimesList, t2)
}
Loading

0 comments on commit dab070a

Please sign in to comment.