Cherry-pick #21457 to 7.x: Remove nil-zero metrics and linux-exclusive metrics from Metricbeat (#21597)

* Remove nil-zero metrics and linux-exclusive metrics from Metricbeat (#21457)

* refactor metricbeat to remove nil-zero metrics and linux-exclusive metrics

* update xpack docs

* fix non-linux diskstat builds

* fix linux test builds

* fix python tests

* move windows files for disk performance

* properly fix test_drop_fields

* try to fix different system test

* mage fmt

* fix windows filesystem tests

* fix platform test

* add changelog

(cherry picked from commit aed4831)

* fix fields
fearful-symmetry authored Oct 6, 2020
1 parent 4dde16d commit bde8416
Showing 56 changed files with 1,700 additions and 479 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -105,6 +105,7 @@ field. You can revert this change by configuring tags for the module and omittin
 - Move service config under metrics and simplify metric types. {pull}18691[18691]
 - Fix ECS compliance of user.id field in system/users metricset {pull}19019[19019]
 - Rename googlecloud stackdriver metricset to metrics. {pull}19718[19718]
+- Remove "invalid zero" metrics on Windows and Darwin, don't report linux-only memory and diskio metrics when running under agent. {pull}21457[21457]

 *Packetbeat*

@@ -25,11 +25,11 @@ import (
     sigar "github.com/elastic/gosigar"
 )

-// mapping fields which output by `iostat -x` on linux
+// IOMetric contains mapping fields which are outputed by `iostat -x` on linux
 //
 // Device: rrqm/s wrqm/s r/s w/s rsec/s wsec/s avgrq-sz avgqu-sz await r_await w_await svctm %util
 // sda 0.06 0.78 0.09 0.27 9.42 8.06 48.64 0.00 1.34 0.99 1.45 0.77 0.03
-type DiskIOMetric struct {
+type IOMetric struct {
     ReadRequestMergeCountPerSec  float64 `json:"rrqmCps"`
     WriteRequestMergeCountPerSec float64 `json:"wrqmCps"`
     ReadRequestCountPerSec       float64 `json:"rrqCps"`
@@ -46,8 +46,9 @@ type DiskIOMetric struct {
     BusyPct float64 `json:"busy"`
 }

-type DiskIOStat struct {
+// IOStat carries disk statistics for all devices
+type IOStat struct {
     lastDiskIOCounters map[string]disk.IOCountersStat
-    lastCpu            sigar.Cpu
-    curCpu             sigar.Cpu
+    lastCPU            sigar.Cpu
+    curCPU             sigar.Cpu
 }
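
The type renames above leave the JSON tags untouched, so the serialized field names still mirror the iostat -x columns listed in the comment. A minimal sketch of that mapping (standalone example, not Metricbeat code; it repeats only the fields visible in this hunk and omits the collapsed ones):

package main

import (
    "encoding/json"
    "fmt"
)

// IOMetric here copies only the fields shown in the hunk above.
type IOMetric struct {
    ReadRequestMergeCountPerSec  float64 `json:"rrqmCps"`
    WriteRequestMergeCountPerSec float64 `json:"wrqmCps"`
    ReadRequestCountPerSec       float64 `json:"rrqCps"`
    BusyPct                      float64 `json:"busy"`
}

func main() {
    m := IOMetric{ReadRequestCountPerSec: 0.09, BusyPct: 0.03}
    out, _ := json.Marshal(m)
    // Prints {"rrqmCps":0,"wrqmCps":0,"rrqCps":0.09,"busy":0.03}
    fmt.Println(string(out))
}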
@@ -26,7 +26,8 @@ import (
     "github.com/elastic/beats/v7/libbeat/metric/system/cpu"
 )

-func Get_CLK_TCK() uint32 {
+// GetCLKTCK emulates the _SC_CLK_TCK syscall
+func GetCLKTCK() uint32 {
     // return uint32(C.sysconf(C._SC_CLK_TCK))
     // NOTE: _SC_CLK_TCK should be fetched from sysconf using cgo
     return uint32(100)
@@ -38,77 +39,78 @@ func IOCounters(names ...string) (map[string]disk.IOCountersStat, error) {
 }

 // NewDiskIOStat :init DiskIOStat object.
-func NewDiskIOStat() *DiskIOStat {
-    return &DiskIOStat{
+func NewDiskIOStat() *IOStat {
+    return &IOStat{
         lastDiskIOCounters: map[string]disk.IOCountersStat{},
     }
 }

 // OpenSampling creates current cpu sampling
 // need call as soon as get IOCounters.
-func (stat *DiskIOStat) OpenSampling() error {
-    return stat.curCpu.Get()
+func (stat *IOStat) OpenSampling() error {
+    return stat.curCPU.Get()
 }

-// CalIOStatistics calculates IO statistics.
-func (stat *DiskIOStat) CalIOStatistics(result *DiskIOMetric, counter disk.IOCountersStat) error {
+// CalcIOStatistics calculates IO statistics.
+func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, error) {
     var last disk.IOCountersStat
     var ok bool

     // if last counter not found, create one and return all 0
     if last, ok = stat.lastDiskIOCounters[counter.Name]; !ok {
         stat.lastDiskIOCounters[counter.Name] = counter
-        return nil
+        return IOMetric{}, nil
     }

     // calculate the delta ms between the CloseSampling and OpenSampling
-    deltams := 1000.0 * float64(stat.curCpu.Total()-stat.lastCpu.Total()) / float64(cpu.NumCores) / float64(Get_CLK_TCK())
+    deltams := 1000.0 * float64(stat.curCPU.Total()-stat.lastCPU.Total()) / float64(cpu.NumCores) / float64(GetCLKTCK())
     if deltams <= 0 {
-        return errors.New("The delta cpu time between close sampling and open sampling is less or equal to 0")
+        return IOMetric{}, errors.New("The delta cpu time between close sampling and open sampling is less or equal to 0")
     }

-    rd_ios := counter.ReadCount - last.ReadCount
-    rd_merges := counter.MergedReadCount - last.MergedReadCount
-    rd_bytes := counter.ReadBytes - last.ReadBytes
-    rd_ticks := counter.ReadTime - last.ReadTime
-    wr_ios := counter.WriteCount - last.WriteCount
-    wr_merges := counter.MergedWriteCount - last.MergedWriteCount
-    wr_bytes := counter.WriteBytes - last.WriteBytes
-    wr_ticks := counter.WriteTime - last.WriteTime
+    rdIOs := counter.ReadCount - last.ReadCount
+    rdMerges := counter.MergedReadCount - last.MergedReadCount
+    rdBytes := counter.ReadBytes - last.ReadBytes
+    rdTicks := counter.ReadTime - last.ReadTime
+    wrIOs := counter.WriteCount - last.WriteCount
+    wrMerges := counter.MergedWriteCount - last.MergedWriteCount
+    wrBytes := counter.WriteBytes - last.WriteBytes
+    wrTicks := counter.WriteTime - last.WriteTime
     ticks := counter.IoTime - last.IoTime
     aveq := counter.WeightedIO - last.WeightedIO
-    n_ios := rd_ios + wr_ios
-    n_ticks := rd_ticks + wr_ticks
-    n_bytes := rd_bytes + wr_bytes
+    nIOs := rdIOs + wrIOs
+    nTicks := rdTicks + wrTicks
+    nBytes := rdBytes + wrBytes
     size := float64(0)
     wait := float64(0)
     svct := float64(0)

-    if n_ios > 0 {
-        size = float64(n_bytes) / float64(n_ios)
-        wait = float64(n_ticks) / float64(n_ios)
-        svct = float64(ticks) / float64(n_ios)
+    if nIOs > 0 {
+        size = float64(nBytes) / float64(nIOs)
+        wait = float64(nTicks) / float64(nIOs)
+        svct = float64(ticks) / float64(nIOs)
     }

     queue := float64(aveq) / deltams
-    per_sec := func(x uint64) float64 {
+    perSec := func(x uint64) float64 {
         return 1000.0 * float64(x) / deltams
     }

-    result.ReadRequestMergeCountPerSec = per_sec(rd_merges)
-    result.WriteRequestMergeCountPerSec = per_sec(wr_merges)
-    result.ReadRequestCountPerSec = per_sec(rd_ios)
-    result.WriteRequestCountPerSec = per_sec(wr_ios)
-    result.ReadBytesPerSec = per_sec(rd_bytes)
-    result.WriteBytesPerSec = per_sec(wr_bytes)
+    result := IOMetric{}
+    result.ReadRequestMergeCountPerSec = perSec(rdMerges)
+    result.WriteRequestMergeCountPerSec = perSec(wrMerges)
+    result.ReadRequestCountPerSec = perSec(rdIOs)
+    result.WriteRequestCountPerSec = perSec(wrIOs)
+    result.ReadBytesPerSec = perSec(rdBytes)
+    result.WriteBytesPerSec = perSec(wrBytes)
     result.AvgRequestSize = size
     result.AvgQueueSize = queue
     result.AvgAwaitTime = wait
-    if rd_ios > 0 {
-        result.AvgReadAwaitTime = float64(rd_ticks) / float64(rd_ios)
+    if rdIOs > 0 {
+        result.AvgReadAwaitTime = float64(rdTicks) / float64(rdIOs)
     }
-    if wr_ios > 0 {
-        result.AvgWriteAwaitTime = float64(wr_ticks) / float64(wr_ios)
+    if wrIOs > 0 {
+        result.AvgWriteAwaitTime = float64(wrTicks) / float64(wrIOs)
     }
     result.AvgServiceTime = svct
     result.BusyPct = 100.0 * float64(ticks) / deltams
@@ -117,10 +119,11 @@ func (stat *DiskIOStat) CalIOStatistics(result *DiskIOMetric, counter disk.IOCou
     }

     stat.lastDiskIOCounters[counter.Name] = counter
-    return nil
+    return result, nil

 }

-func (stat *DiskIOStat) CloseSampling() {
-    stat.lastCpu = stat.curCpu
+// CloseSampling closes the disk sampler
+func (stat *IOStat) CloseSampling() {
+    stat.lastCPU = stat.curCPU
 }
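
Because CalcIOStatistics now returns the metric by value instead of filling a caller-supplied *DiskIOMetric, the call pattern changes as well. A rough sketch of how a caller might drive the new API, assuming it lives in the same package as the code above; collectIOMetrics is a hypothetical helper, not the actual metricset code:

// collectIOMetrics fetches raw counters once and derives an IOMetric per device.
func collectIOMetrics() (map[string]IOMetric, error) {
    stat := NewDiskIOStat()

    // Read the counters and open the CPU sample close together,
    // as the OpenSampling comment above asks.
    counters, err := IOCounters()
    if err != nil {
        return nil, err
    }
    if err := stat.OpenSampling(); err != nil {
        return nil, err
    }

    metrics := make(map[string]IOMetric, len(counters))
    for name, counter := range counters {
        // Old API: err := stat.CalIOStatistics(&metric, counter)
        // New API: the IOMetric comes back by value.
        metric, err := stat.CalcIOStatistics(counter)
        if err != nil {
            return nil, err
        }
        metrics[name] = metric
    }

    // Promote the current CPU sample to "last" for the next collection cycle.
    stat.CloseSampling()
    return metrics, nil
}

The value-returning signature also makes the zero result on a device's first sample explicit (IOMetric{}, nil) rather than leaving a caller-supplied struct untouched.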
@@ -27,53 +27,11 @@ import (
     "github.com/stretchr/testify/assert"

     sigar "github.com/elastic/gosigar"
-
-    mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
-    "github.com/elastic/beats/v7/metricbeat/module/system"
 )

-func Test_Get_CLK_TCK(t *testing.T) {
+func Test_GetCLKTCK(t *testing.T) {
     //usually the tick is 100
-    assert.Equal(t, uint32(100), Get_CLK_TCK())
-}
-
-func TestDataNameFilter(t *testing.T) {
-    oldFS := system.HostFS
-    newFS := "_meta/testdata"
-    system.HostFS = &newFS
-    defer func() {
-        system.HostFS = oldFS
-    }()
-
-    conf := map[string]interface{}{
-        "module":                 "system",
-        "metricsets":             []string{"diskio"},
-        "diskio.include_devices": []string{"sda", "sda1", "sda2"},
-    }
-
-    f := mbtest.NewReportingMetricSetV2Error(t, conf)
-    data, errs := mbtest.ReportingFetchV2Error(f)
-    assert.Empty(t, errs)
-    assert.Equal(t, 3, len(data))
-}
-
-func TestDataEmptyFilter(t *testing.T) {
-    oldFS := system.HostFS
-    newFS := "_meta/testdata"
-    system.HostFS = &newFS
-    defer func() {
-        system.HostFS = oldFS
-    }()
-
-    conf := map[string]interface{}{
-        "module":     "system",
-        "metricsets": []string{"diskio"},
-    }
-
-    f := mbtest.NewReportingMetricSetV2Error(t, conf)
-    data, errs := mbtest.ReportingFetchV2Error(f)
-    assert.Empty(t, errs)
-    assert.Equal(t, 10, len(data))
+    assert.Equal(t, uint32(100), GetCLKTCK())
 }

 func TestDiskIOStat_CalIOStatistics(t *testing.T) {
@@ -85,27 +43,27 @@ func TestDiskIOStat_CalIOStatistics(t *testing.T) {
         Name: "iostat",
     }

-    stat := &DiskIOStat{
+    stat := &IOStat{
         lastDiskIOCounters: map[string]disk.IOCountersStat{
-            "iostat": disk.IOCountersStat{
+            "iostat": {
                 ReadCount:  3,
                 WriteCount: 5,
                 ReadTime:   7,
                 WriteTime:  11,
                 Name:       "iostat",
             },
         },
-        lastCpu: sigar.Cpu{Idle: 100},
-        curCpu:  sigar.Cpu{Idle: 1},
+        lastCPU: sigar.Cpu{Idle: 100},
+        curCPU:  sigar.Cpu{Idle: 1},
     }

-    expected := DiskIOMetric{
+    expected := IOMetric{
         AvgAwaitTime:      24.0 / 22.0,
         AvgReadAwaitTime:  1.2,
         AvgWriteAwaitTime: 1,
     }
-    var got DiskIOMetric
-    err := stat.CalIOStatistics(&got, counter)
+
+    got, err := stat.CalcIOStatistics(counter)
     if err != nil {
         t.Fatal(err)
     }
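
As a quick cross-check of the expected values (the counter literal fed to CalcIOStatistics is collapsed in the hunk above): reading AvgAwaitTime's 24.0/22.0 literally as nTicks/nIOs gives combined deltas of 22 I/Os and 24 ticks. With AvgReadAwaitTime = rdTicks/rdIOs = 1.2 and AvgWriteAwaitTime = wrTicks/wrIOs = 1, a consistent split is rdIOs = 10, rdTicks = 12, wrIOs = 12, wrTicks = 12; in other words, the hidden counter would add 10 reads and 12 writes (12 ticks each) on top of the last snapshot's ReadCount 3, WriteCount 5, ReadTime 7, WriteTime 11.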
@@ -25,24 +25,24 @@ import (
 )

 // NewDiskIOStat :init DiskIOStat object.
-func NewDiskIOStat() *DiskIOStat {
-    return &DiskIOStat{
+func NewDiskIOStat() *IOStat {
+    return &IOStat{
         lastDiskIOCounters: map[string]disk.IOCountersStat{},
     }
 }

 // OpenSampling stub for linux implementation.
-func (stat *DiskIOStat) OpenSampling() error {
+func (stat *IOStat) OpenSampling() error {
     return nil
 }

-// CalIOStatistics stub for linux implementation.
-func (stat *DiskIOStat) CalIOStatistics(result *DiskIOMetric, counter disk.IOCountersStat) error {
-    return errors.New("not implemented out of linux")
+// CalcIOStatistics stub for linux implementation.
+func (stat *IOStat) CalcIOStatistics(rcounter disk.IOCountersStat) (IOMetric, error) {
+    return IOMetric{}, errors.New("not implemented out of linux")
 }

 // CloseSampling stub for linux implementation.
-func (stat *DiskIOStat) CloseSampling() {}
+func (stat *IOStat) CloseSampling() {}

 // IOCounters should map functionality to disk package for linux os.
 func IOCounters(names ...string) (map[string]disk.IOCountersStat, error) {
@@ -25,24 +25,24 @@ import (
 )

 // NewDiskIOStat :init DiskIOStat object.
-func NewDiskIOStat() *DiskIOStat {
-    return &DiskIOStat{
+func NewDiskIOStat() *IOStat {
+    return &IOStat{
         lastDiskIOCounters: map[string]disk.IOCountersStat{},
     }
 }

 // OpenSampling stub for linux implementation.
-func (stat *DiskIOStat) OpenSampling() error {
+func (stat *IOStat) OpenSampling() error {
     return nil
 }

-// CalIOStatistics stub for linux implementation.
-func (stat *DiskIOStat) CalIOStatistics(result *DiskIOMetric, counter disk.IOCountersStat) error {
-    return errors.New("iostat is not implement for Windows")
+// CalcIOStatistics stub for linux implementation.
+func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, error) {
+    return IOMetric{}, errors.New("iostat is not implement for Windows")
 }

 // CloseSampling stub for linux implementation.
-func (stat *DiskIOStat) CloseSampling() {}
+func (stat *IOStat) CloseSampling() {}

 // IOCounters should map functionality to disk package for linux os.
 func IOCounters(names ...string) (map[string]disk.IOCountersStat, error) {
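
The Linux, Windows, and fallback variants of these functions can share one set of names because Go compiles only the file that matches the target GOOS, selected by the _GOOS filename suffix or an explicit build constraint. A tiny illustration of the pattern with hypothetical file and package names, separate from the Metricbeat sources:

// stats_linux.go: the _linux suffix is itself a build constraint, so this
// file is compiled only when GOOS=linux.
package stats

// Extended returns the platform-specific extended statistic.
func Extended() (float64, error) { return 42.0, nil }

// stats_other.go: fallback for every other GOOS.
//go:build !linux

package stats

import "errors"

// Extended mirrors the Linux signature so callers compile everywhere,
// but reports that the value is unavailable off Linux.
func Extended() (float64, error) { return 0, errors.New("not implemented outside linux") }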