Skip to content

Commit

Permalink
修正 sender 模块状态统计,更新了 metric 模块的一些字段和选项 (#208)
Browse files Browse the repository at this point in the history
* 检查 sender 模块的状态统计,删除冗余代码,修正可能出现问题的地方

* 更改了swap中的几个字段名称,为net添加了一个选项,为net和kernel_vmstat的字段添加前缀

* 将 metric 模块的时间戳放在每个类别里面而不是一个统一的名称
  • Loading branch information
andrewei1316 authored and wonderflow committed Nov 20, 2017
1 parent d407d75 commit e335bc5
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 47 deletions.
1 change: 1 addition & 0 deletions metric/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const (

OptionString = "options"
AttributesString = "attributes"
Timestamp = "timestamp"
)

//Collector 收集metrics的接口
Expand Down
4 changes: 4 additions & 0 deletions metric/system/cpu.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package system

import (
"fmt"
"time"

"github.com/qiniu/logkit/metric"
"github.com/qiniu/logkit/utils"
Expand Down Expand Up @@ -125,6 +126,7 @@ func (s *CPUStats) Collect() (datas []map[string]interface{}, err error) {
return nil, fmt.Errorf("error getting CPU info: %s", err)
}

now := time.Now().Format(time.RFC3339Nano)
for _, cts := range times {

total := totalCpuTime(cts)
Expand All @@ -144,6 +146,7 @@ func (s *CPUStats) Collect() (datas []map[string]interface{}, err error) {
CpuTimeGuestNice: cts.GuestNice,
CpuTimeCPU: cts.CPU,
}
fieldsC[TypeMetricCpu+"_"+metric.Timestamp] = now
datas = append(datas, fieldsC)
}

Expand Down Expand Up @@ -179,6 +182,7 @@ func (s *CPUStats) Collect() (datas []map[string]interface{}, err error) {
CpuUsageGuestNice: 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta,
CpuUsageCPU: cts.CPU,
}
fieldsG[TypeMetricCpu+"_"+metric.Timestamp] = now
datas = append(datas, fieldsG)
}

Expand Down
5 changes: 5 additions & 0 deletions metric/system/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"regexp"
"strings"
"time"

"github.com/qiniu/logkit/metric"
"github.com/qiniu/logkit/utils"
Expand Down Expand Up @@ -91,6 +92,7 @@ func (s *DiskStats) Collect() (datas []map[string]interface{}, err error) {
return nil, fmt.Errorf("error getting disk usage info: %s", err)
}

now := time.Now().Format(time.RFC3339Nano)
for i, du := range disks {
if du.Total == 0 {
// Skip dummy filesystem (procfs, cgroupfs, ...)
Expand All @@ -114,6 +116,7 @@ func (s *DiskStats) Collect() (datas []map[string]interface{}, err error) {
KeyDiskInodesFree: du.InodesFree,
KeyDiskInodesUsed: du.InodesUsed,
}
fields[TypeMetricDisk+"_"+metric.Timestamp] = now
datas = append(datas, fields)
}
return
Expand Down Expand Up @@ -219,6 +222,7 @@ func (s *DiskIOStats) Collect() (datas []map[string]interface{}, err error) {
return nil, fmt.Errorf("error getting disk io info: %s", err)
}

now := time.Now().Format(time.RFC3339Nano)
for _, io := range diskio {
fields := map[string]interface{}{
KeyDiskioReads: io.ReadCount,
Expand All @@ -241,6 +245,7 @@ func (s *DiskIOStats) Collect() (datas []map[string]interface{}, err error) {
fields[KeyDiskioSerial] = "unknown"
}
}
fields[TypeMetricDiskio+"_"+metric.Timestamp] = now
datas = append(datas, fields)
}
return
Expand Down
4 changes: 3 additions & 1 deletion metric/system/kernel_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"strconv"
"time"

"github.com/qiniu/logkit/metric"
"github.com/qiniu/logkit/utils"
Expand Down Expand Up @@ -71,8 +72,8 @@ func (k *Kernel) Collect() (datas []map[string]interface{}, err error) {
return nil, err
}

now := time.Now().Format(time.RFC3339Nano)
fields := make(map[string]interface{})

dataFields := bytes.Fields(data)
for i, field := range dataFields {
switch {
Expand Down Expand Up @@ -114,6 +115,7 @@ func (k *Kernel) Collect() (datas []map[string]interface{}, err error) {
}
}

fields[TypeMetricKernel+"_"+metric.Timestamp] = now
datas = append(datas, fields)
return
}
Expand Down
8 changes: 5 additions & 3 deletions metric/system/kernel_vmstat_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"os"
"strconv"
"time"

"github.com/qiniu/logkit/metric"
"github.com/qiniu/logkit/utils"
Expand Down Expand Up @@ -44,8 +45,8 @@ func (k *KernelVmstat) Collect() (datas []map[string]interface{}, err error) {
return nil, err
}

now := time.Now().Format(time.RFC3339Nano)
fields := make(map[string]interface{})

dataFields := bytes.Fields(data)
for i, field := range dataFields {

Expand All @@ -58,10 +59,11 @@ func (k *KernelVmstat) Collect() (datas []map[string]interface{}, err error) {
if err != nil {
return nil, err
}

fields[string(field)] = int64(m)
key := "vmstat_" + string(field)
fields[key] = int64(m)
}
}
fields["vmstat_"+metric.Timestamp] = now
datas = append(datas, fields)
return
}
Expand Down
3 changes: 3 additions & 0 deletions metric/system/linux_sysctl_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"io/ioutil"
"strconv"
"time"

"github.com/qiniu/logkit/metric"
"github.com/qiniu/logkit/utils"
Expand Down Expand Up @@ -130,6 +131,7 @@ func (sfs *SysctlFS) gatherOne(name string, fields map[string]interface{}) error
func (sfs *SysctlFS) Collect() (datas []map[string]interface{}, err error) {
fields := map[string]interface{}{}

now := time.Now().Format(time.RFC3339Nano)
for _, n := range []string{"aio-nr", "aio-max-nr", "dquot-nr", "dquot-max", "super-nr", "super-max"} {
sfs.gatherOne(n, fields)
}
Expand All @@ -138,6 +140,7 @@ func (sfs *SysctlFS) Collect() (datas []map[string]interface{}, err error) {
sfs.gatherList("dentry-state", fields, "dentry-nr", "dentry-unused-nr", "dentry-age-limit", "dentry-want-pages")
sfs.gatherList("file-nr", fields, "file-nr", "", "file-max")

fields["sysctl_"+metric.Timestamp] = now
datas = append(datas, fields)
return
}
Expand Down
12 changes: 7 additions & 5 deletions metric/system/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package system

import (
"fmt"
"time"

"github.com/qiniu/logkit/metric"
"github.com/qiniu/logkit/utils"
Expand Down Expand Up @@ -64,6 +65,7 @@ func (s *MemStats) Collect() (datas []map[string]interface{}, err error) {
return nil, fmt.Errorf("error getting virtual memory info: %s", err)
}

now := time.Now().Format(time.RFC3339Nano)
fields := map[string]interface{}{
KeyMemTotal: vm.Total,
KeyMemAvailable: vm.Available,
Expand All @@ -76,6 +78,7 @@ func (s *MemStats) Collect() (datas []map[string]interface{}, err error) {
KeyMemUsedPercent: 100 * float64(vm.Used) / float64(vm.Total),
KeyMemAvailablePercent: 100 * float64(vm.Available) / float64(vm.Total),
}
fields[TypeMetricMem+"_"+metric.Timestamp] = now
datas = append(datas, fields)
return
}
Expand Down Expand Up @@ -129,18 +132,17 @@ func (s *SwapStats) Collect() (datas []map[string]interface{}, err error) {
return nil, fmt.Errorf("error getting swap memory info: %s", err)
}

now := time.Now().Format(time.RFC3339Nano)
fieldsG := map[string]interface{}{
KeySwapIn: swap.Sin,
KeySwapOut: swap.Sout,
KeySwapTotal: swap.Total,
KeySwapUsed: swap.Used,
KeySwapFree: swap.Free,
KeySwapUsedPercent: swap.UsedPercent,
}
fieldsG[TypeMetricSwap+"_"+metric.Timestamp] = now
datas = append(datas, fieldsG)
fieldsC := map[string]interface{}{
KeySwapIn: swap.Sin,
KeySwapOut: swap.Sout,
}
datas = append(datas, fieldsC)
return
}

Expand Down
41 changes: 28 additions & 13 deletions metric/system/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net"
"strings"
"time"

"github.com/qiniu/logkit/metric"
"github.com/qiniu/logkit/utils"
Expand Down Expand Up @@ -41,8 +42,9 @@ var KeyNetUsages = []utils.KeyValue{
type NetIOStats struct {
ps PS

skipChecks bool
Interfaces []string `json:"interfaces"`
skipChecks bool
skipProtoState bool `json:"skip_protocols_state"`
Interfaces []string `json:"interfaces"`
}

func (_ *NetIOStats) Name() string {
Expand All @@ -63,6 +65,15 @@ func (_ *NetIOStats) Config() map[string]interface{} {
Description: "收集特定网卡的信息,用','分隔(interfaces)",
Type: metric.ConfigTypeArray,
},
{
KeyName: "skip_protocols_state",
ChooseOnly: true,
ChooseOptions: []string{"true", "false"},
Default: "true",
DefaultNoUse: false,
Description: "是否忽略各个网络协议的状态信息",
Type: metric.ConfigTypeBool,
},
}
config := map[string]interface{}{
metric.OptionString: configOption,
Expand All @@ -77,6 +88,7 @@ func (s *NetIOStats) Collect() (datas []map[string]interface{}, err error) {
return nil, fmt.Errorf("error getting net io info: %s", err)
}

now := time.Now().Format(time.RFC3339Nano)
for _, io := range netio {
if len(s.Interfaces) != 0 {
var found bool
Expand Down Expand Up @@ -117,22 +129,25 @@ func (s *NetIOStats) Collect() (datas []map[string]interface{}, err error) {
KeyNetDropOut: io.Dropout,
KeyNetInterface: io.Name,
}
fields[TypeMetricNet+"_"+metric.Timestamp] = now
datas = append(datas, fields)
}

// Get system wide stats for different network protocols
// (ignore these stats if the call fails)
netprotos, _ := s.ps.NetProto()
fields := make(map[string]interface{})
for _, proto := range netprotos {
for stat, value := range proto.Stats {
name := fmt.Sprintf("%s_%s", strings.ToLower(proto.Protocol),
strings.ToLower(stat))
fields[name] = value
if !s.skipProtoState {
// Get system wide stats for different network protocols
// (ignore these stats if the call fails)
netprotos, _ := s.ps.NetProto()
fields := make(map[string]interface{})
for _, proto := range netprotos {
for stat, value := range proto.Stats {
name := TypeMetricNet + "_" + strings.ToLower(proto.Protocol) + "_" + strings.ToLower(stat)
fields[name] = value
}
}
fields[KeyNetInterface] = "all"
fields[TypeMetricNet+"_"+metric.Timestamp] = now
datas = append(datas, fields)
}
fields[KeyNetInterface] = "all"
datas = append(datas, fields)
return
}

Expand Down
4 changes: 4 additions & 0 deletions metric/system/netstat.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package system
import (
"fmt"
"syscall"
"time"

"github.com/qiniu/logkit/metric"
"github.com/qiniu/logkit/utils"
Expand Down Expand Up @@ -70,6 +71,8 @@ func (s *NetStats) Collect() (datas []map[string]interface{}, err error) {
if err != nil {
return nil, fmt.Errorf("error getting net connections info: %s", err)
}

now := time.Now().Format(time.RFC3339Nano)
counts := make(map[string]int)
counts["UDP"] = 0

Expand Down Expand Up @@ -101,6 +104,7 @@ func (s *NetStats) Collect() (datas []map[string]interface{}, err error) {
KeyNetstatTcpNone: counts["NONE"],
KeyNetstatUdpSocket: counts["UDP"],
}
fields[TypeMetricNetstat+"_"+metric.Timestamp] = now
datas = append(datas, fields)
return
}
Expand Down
3 changes: 3 additions & 0 deletions metric/system/processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"runtime"
"strconv"
"syscall"
"time"

"github.com/qiniu/logkit/metric"
"github.com/qiniu/logkit/utils"
Expand Down Expand Up @@ -102,6 +103,8 @@ func (p *Processes) Collect() (datas []map[string]interface{}, err error) {
return nil, err
}
}
now := time.Now().Format(time.RFC3339Nano)
fields[TypeMetricProcesses+"_"+metric.Timestamp] = now
datas = append(datas, fields)
return
}
Expand Down
3 changes: 3 additions & 0 deletions metric/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"fmt"
"runtime"
"time"

"github.com/shirou/gopsutil/host"
"github.com/shirou/gopsutil/load"
Expand Down Expand Up @@ -82,6 +83,8 @@ func (_ *SystemStats) Collect() (datas []map[string]interface{}, err error) {
KeySystemUptime: hostinfo.Uptime,
KeySystemUptimeFormat: format_uptime(hostinfo.Uptime),
}
now := time.Now().Format(time.RFC3339Nano)
data[TypeMetricSystem+"_"+metric.Timestamp] = now
datas = append(datas, data)
return
}
Expand Down
5 changes: 0 additions & 5 deletions mgr/metric_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

const (
KeyMetricType = "type"
KeyMetricTime = "timestamp"
)

const (
Expand Down Expand Up @@ -170,15 +169,13 @@ func (r *MetricRunner) Run() {
// collect data
for _, c := range r.collectors {
tmpdatas, err := c.Collect()
now := time.Now().Format(time.RFC3339Nano)
if err != nil {
log.Errorf("collecter <%v> collect data error: %v", c.Name(), err)
continue
}
for i := range tmpdatas {
if len(tmpdatas[i]) > 0 {
dataCnt++
tmpdatas[i][KeyMetricTime] = now
datas = append(datas, tmpdatas[i])
}
}
Expand Down Expand Up @@ -238,8 +235,6 @@ func (r *MetricRunner) trySend(s sender.Sender, datas []sender.Data, times int)
if se, ok := err.(*utils.StatsError); ok {
err = se.ErrorDetail
if se.Ft {
info.Errors = se.Errors
info.Success = se.Success
r.rs.Lag.Ftlags = se.Ftlag
} else {
if cnt > 1 {
Expand Down
2 changes: 0 additions & 2 deletions mgr/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,6 @@ func (r *LogExportRunner) trySend(s sender.Sender, datas []sender.Data, times in
if se, ok := err.(*utils.StatsError); ok {
err = se.ErrorDetail
if se.Ft {
info.Errors = se.Errors
info.Success = se.Success
r.rs.Lag.Ftlags = se.Ftlag
} else {
if cnt > 1 {
Expand Down
Loading

0 comments on commit e335bc5

Please sign in to comment.