diff --git a/metric/registry.go b/metric/registry.go index e2f824bd8..77131015e 100644 --- a/metric/registry.go +++ b/metric/registry.go @@ -7,6 +7,7 @@ const ( OptionString = "options" AttributesString = "attributes" + Timestamp = "timestamp" ) //Collector 收集metrics的接口 diff --git a/metric/system/cpu.go b/metric/system/cpu.go index 0c24cdfce..933e1aec2 100644 --- a/metric/system/cpu.go +++ b/metric/system/cpu.go @@ -2,6 +2,7 @@ package system import ( "fmt" + "time" "github.com/qiniu/logkit/metric" "github.com/qiniu/logkit/utils" @@ -125,6 +126,7 @@ func (s *CPUStats) Collect() (datas []map[string]interface{}, err error) { return nil, fmt.Errorf("error getting CPU info: %s", err) } + now := time.Now().Format(time.RFC3339Nano) for _, cts := range times { total := totalCpuTime(cts) @@ -144,6 +146,7 @@ func (s *CPUStats) Collect() (datas []map[string]interface{}, err error) { CpuTimeGuestNice: cts.GuestNice, CpuTimeCPU: cts.CPU, } + fieldsC[TypeMetricCpu+"_"+metric.Timestamp] = now datas = append(datas, fieldsC) } @@ -179,6 +182,7 @@ func (s *CPUStats) Collect() (datas []map[string]interface{}, err error) { CpuUsageGuestNice: 100 * (cts.GuestNice - lastCts.GuestNice) / totalDelta, CpuUsageCPU: cts.CPU, } + fieldsG[TypeMetricCpu+"_"+metric.Timestamp] = now datas = append(datas, fieldsG) } diff --git a/metric/system/disk.go b/metric/system/disk.go index f12e9bfd3..4378205fc 100644 --- a/metric/system/disk.go +++ b/metric/system/disk.go @@ -4,6 +4,7 @@ import ( "fmt" "regexp" "strings" + "time" "github.com/qiniu/logkit/metric" "github.com/qiniu/logkit/utils" @@ -91,6 +92,7 @@ func (s *DiskStats) Collect() (datas []map[string]interface{}, err error) { return nil, fmt.Errorf("error getting disk usage info: %s", err) } + now := time.Now().Format(time.RFC3339Nano) for i, du := range disks { if du.Total == 0 { // Skip dummy filesystem (procfs, cgroupfs, ...) @@ -114,6 +116,7 @@ func (s *DiskStats) Collect() (datas []map[string]interface{}, err error) { KeyDiskInodesFree: du.InodesFree, KeyDiskInodesUsed: du.InodesUsed, } + fields[TypeMetricDisk+"_"+metric.Timestamp] = now datas = append(datas, fields) } return @@ -219,6 +222,7 @@ func (s *DiskIOStats) Collect() (datas []map[string]interface{}, err error) { return nil, fmt.Errorf("error getting disk io info: %s", err) } + now := time.Now().Format(time.RFC3339Nano) for _, io := range diskio { fields := map[string]interface{}{ KeyDiskioReads: io.ReadCount, @@ -241,6 +245,7 @@ func (s *DiskIOStats) Collect() (datas []map[string]interface{}, err error) { fields[KeyDiskioSerial] = "unknown" } } + fields[TypeMetricDiskio+"_"+metric.Timestamp] = now datas = append(datas, fields) } return diff --git a/metric/system/kernel_linux.go b/metric/system/kernel_linux.go index 19d9a0bfd..cbe40fcb3 100644 --- a/metric/system/kernel_linux.go +++ b/metric/system/kernel_linux.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "os" "strconv" + "time" "github.com/qiniu/logkit/metric" "github.com/qiniu/logkit/utils" @@ -71,8 +72,8 @@ func (k *Kernel) Collect() (datas []map[string]interface{}, err error) { return nil, err } + now := time.Now().Format(time.RFC3339Nano) fields := make(map[string]interface{}) - dataFields := bytes.Fields(data) for i, field := range dataFields { switch { @@ -114,6 +115,7 @@ func (k *Kernel) Collect() (datas []map[string]interface{}, err error) { } } + fields[TypeMetricKernel+"_"+metric.Timestamp] = now datas = append(datas, fields) return } diff --git a/metric/system/kernel_vmstat_linux.go b/metric/system/kernel_vmstat_linux.go index c64701149..3a3e7b22f 100644 --- a/metric/system/kernel_vmstat_linux.go +++ b/metric/system/kernel_vmstat_linux.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "os" "strconv" + "time" "github.com/qiniu/logkit/metric" "github.com/qiniu/logkit/utils" @@ -44,8 +45,8 @@ func (k *KernelVmstat) Collect() (datas []map[string]interface{}, err error) { return nil, err } + now := time.Now().Format(time.RFC3339Nano) fields := make(map[string]interface{}) - dataFields := bytes.Fields(data) for i, field := range dataFields { @@ -58,10 +59,11 @@ func (k *KernelVmstat) Collect() (datas []map[string]interface{}, err error) { if err != nil { return nil, err } - - fields[string(field)] = int64(m) + key := "vmstat_" + string(field) + fields[key] = int64(m) } } + fields["vmstat_"+metric.Timestamp] = now datas = append(datas, fields) return } diff --git a/metric/system/linux_sysctl_fs.go b/metric/system/linux_sysctl_fs.go index a1925099c..3bb488e47 100644 --- a/metric/system/linux_sysctl_fs.go +++ b/metric/system/linux_sysctl_fs.go @@ -4,6 +4,7 @@ import ( "bytes" "io/ioutil" "strconv" + "time" "github.com/qiniu/logkit/metric" "github.com/qiniu/logkit/utils" @@ -130,6 +131,7 @@ func (sfs *SysctlFS) gatherOne(name string, fields map[string]interface{}) error func (sfs *SysctlFS) Collect() (datas []map[string]interface{}, err error) { fields := map[string]interface{}{} + now := time.Now().Format(time.RFC3339Nano) for _, n := range []string{"aio-nr", "aio-max-nr", "dquot-nr", "dquot-max", "super-nr", "super-max"} { sfs.gatherOne(n, fields) } @@ -138,6 +140,7 @@ func (sfs *SysctlFS) Collect() (datas []map[string]interface{}, err error) { sfs.gatherList("dentry-state", fields, "dentry-nr", "dentry-unused-nr", "dentry-age-limit", "dentry-want-pages") sfs.gatherList("file-nr", fields, "file-nr", "", "file-max") + fields["sysctl_"+metric.Timestamp] = now datas = append(datas, fields) return } diff --git a/metric/system/memory.go b/metric/system/memory.go index 65b0b05cf..809b1ff3a 100644 --- a/metric/system/memory.go +++ b/metric/system/memory.go @@ -2,6 +2,7 @@ package system import ( "fmt" + "time" "github.com/qiniu/logkit/metric" "github.com/qiniu/logkit/utils" @@ -64,6 +65,7 @@ func (s *MemStats) Collect() (datas []map[string]interface{}, err error) { return nil, fmt.Errorf("error getting virtual memory info: %s", err) } + now := time.Now().Format(time.RFC3339Nano) fields := map[string]interface{}{ KeyMemTotal: vm.Total, KeyMemAvailable: vm.Available, @@ -76,6 +78,7 @@ func (s *MemStats) Collect() (datas []map[string]interface{}, err error) { KeyMemUsedPercent: 100 * float64(vm.Used) / float64(vm.Total), KeyMemAvailablePercent: 100 * float64(vm.Available) / float64(vm.Total), } + fields[TypeMetricMem+"_"+metric.Timestamp] = now datas = append(datas, fields) return } @@ -129,18 +132,17 @@ func (s *SwapStats) Collect() (datas []map[string]interface{}, err error) { return nil, fmt.Errorf("error getting swap memory info: %s", err) } + now := time.Now().Format(time.RFC3339Nano) fieldsG := map[string]interface{}{ + KeySwapIn: swap.Sin, + KeySwapOut: swap.Sout, KeySwapTotal: swap.Total, KeySwapUsed: swap.Used, KeySwapFree: swap.Free, KeySwapUsedPercent: swap.UsedPercent, } + fieldsG[TypeMetricSwap+"_"+metric.Timestamp] = now datas = append(datas, fieldsG) - fieldsC := map[string]interface{}{ - KeySwapIn: swap.Sin, - KeySwapOut: swap.Sout, - } - datas = append(datas, fieldsC) return } diff --git a/metric/system/net.go b/metric/system/net.go index 1bbdc59f6..d20a0d8a8 100644 --- a/metric/system/net.go +++ b/metric/system/net.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "strings" + "time" "github.com/qiniu/logkit/metric" "github.com/qiniu/logkit/utils" @@ -41,8 +42,9 @@ var KeyNetUsages = []utils.KeyValue{ type NetIOStats struct { ps PS - skipChecks bool - Interfaces []string `json:"interfaces"` + skipChecks bool + skipProtoState bool `json:"skip_protocols_state"` + Interfaces []string `json:"interfaces"` } func (_ *NetIOStats) Name() string { @@ -63,6 +65,15 @@ func (_ *NetIOStats) Config() map[string]interface{} { Description: "收集特定网卡的信息,用','分隔(interfaces)", Type: metric.ConfigTypeArray, }, + { + KeyName: "skip_protocols_state", + ChooseOnly: true, + ChooseOptions: []string{"true", "false"}, + Default: "true", + DefaultNoUse: false, + Description: "是否忽略各个网络协议的状态信息", + Type: metric.ConfigTypeBool, + }, } config := map[string]interface{}{ metric.OptionString: configOption, @@ -77,6 +88,7 @@ func (s *NetIOStats) Collect() (datas []map[string]interface{}, err error) { return nil, fmt.Errorf("error getting net io info: %s", err) } + now := time.Now().Format(time.RFC3339Nano) for _, io := range netio { if len(s.Interfaces) != 0 { var found bool @@ -117,22 +129,25 @@ func (s *NetIOStats) Collect() (datas []map[string]interface{}, err error) { KeyNetDropOut: io.Dropout, KeyNetInterface: io.Name, } + fields[TypeMetricNet+"_"+metric.Timestamp] = now datas = append(datas, fields) } - // Get system wide stats for different network protocols - // (ignore these stats if the call fails) - netprotos, _ := s.ps.NetProto() - fields := make(map[string]interface{}) - for _, proto := range netprotos { - for stat, value := range proto.Stats { - name := fmt.Sprintf("%s_%s", strings.ToLower(proto.Protocol), - strings.ToLower(stat)) - fields[name] = value + if !s.skipProtoState { + // Get system wide stats for different network protocols + // (ignore these stats if the call fails) + netprotos, _ := s.ps.NetProto() + fields := make(map[string]interface{}) + for _, proto := range netprotos { + for stat, value := range proto.Stats { + name := TypeMetricNet + "_" + strings.ToLower(proto.Protocol) + "_" + strings.ToLower(stat) + fields[name] = value + } } + fields[KeyNetInterface] = "all" + fields[TypeMetricNet+"_"+metric.Timestamp] = now + datas = append(datas, fields) } - fields[KeyNetInterface] = "all" - datas = append(datas, fields) return } diff --git a/metric/system/netstat.go b/metric/system/netstat.go index 3f21bef12..c4445b8f3 100644 --- a/metric/system/netstat.go +++ b/metric/system/netstat.go @@ -3,6 +3,7 @@ package system import ( "fmt" "syscall" + "time" "github.com/qiniu/logkit/metric" "github.com/qiniu/logkit/utils" @@ -70,6 +71,8 @@ func (s *NetStats) Collect() (datas []map[string]interface{}, err error) { if err != nil { return nil, fmt.Errorf("error getting net connections info: %s", err) } + + now := time.Now().Format(time.RFC3339Nano) counts := make(map[string]int) counts["UDP"] = 0 @@ -101,6 +104,7 @@ func (s *NetStats) Collect() (datas []map[string]interface{}, err error) { KeyNetstatTcpNone: counts["NONE"], KeyNetstatUdpSocket: counts["UDP"], } + fields[TypeMetricNetstat+"_"+metric.Timestamp] = now datas = append(datas, fields) return } diff --git a/metric/system/processes.go b/metric/system/processes.go index 5336ca0e1..8c61f6e8d 100644 --- a/metric/system/processes.go +++ b/metric/system/processes.go @@ -13,6 +13,7 @@ import ( "runtime" "strconv" "syscall" + "time" "github.com/qiniu/logkit/metric" "github.com/qiniu/logkit/utils" @@ -102,6 +103,8 @@ func (p *Processes) Collect() (datas []map[string]interface{}, err error) { return nil, err } } + now := time.Now().Format(time.RFC3339Nano) + fields[TypeMetricProcesses+"_"+metric.Timestamp] = now datas = append(datas, fields) return } diff --git a/metric/system/system.go b/metric/system/system.go index 1b7cd9e7c..f16999b82 100644 --- a/metric/system/system.go +++ b/metric/system/system.go @@ -5,6 +5,7 @@ import ( "bytes" "fmt" "runtime" + "time" "github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/load" @@ -82,6 +83,8 @@ func (_ *SystemStats) Collect() (datas []map[string]interface{}, err error) { KeySystemUptime: hostinfo.Uptime, KeySystemUptimeFormat: format_uptime(hostinfo.Uptime), } + now := time.Now().Format(time.RFC3339Nano) + data[TypeMetricSystem+"_"+metric.Timestamp] = now datas = append(datas, data) return } diff --git a/mgr/metric_runner.go b/mgr/metric_runner.go index 4df2a7137..45d500331 100644 --- a/mgr/metric_runner.go +++ b/mgr/metric_runner.go @@ -20,7 +20,6 @@ import ( const ( KeyMetricType = "type" - KeyMetricTime = "timestamp" ) const ( @@ -170,7 +169,6 @@ func (r *MetricRunner) Run() { // collect data for _, c := range r.collectors { tmpdatas, err := c.Collect() - now := time.Now().Format(time.RFC3339Nano) if err != nil { log.Errorf("collecter <%v> collect data error: %v", c.Name(), err) continue @@ -178,7 +176,6 @@ func (r *MetricRunner) Run() { for i := range tmpdatas { if len(tmpdatas[i]) > 0 { dataCnt++ - tmpdatas[i][KeyMetricTime] = now datas = append(datas, tmpdatas[i]) } } @@ -238,8 +235,6 @@ func (r *MetricRunner) trySend(s sender.Sender, datas []sender.Data, times int) if se, ok := err.(*utils.StatsError); ok { err = se.ErrorDetail if se.Ft { - info.Errors = se.Errors - info.Success = se.Success r.rs.Lag.Ftlags = se.Ftlag } else { if cnt > 1 { diff --git a/mgr/runner.go b/mgr/runner.go index 335af31b1..1ec55e13b 100644 --- a/mgr/runner.go +++ b/mgr/runner.go @@ -328,8 +328,6 @@ func (r *LogExportRunner) trySend(s sender.Sender, datas []sender.Data, times in if se, ok := err.(*utils.StatsError); ok { err = se.ErrorDetail if se.Ft { - info.Errors = se.Errors - info.Success = se.Success r.rs.Lag.Ftlags = se.Ftlag } else { if cnt > 1 { diff --git a/sender/fault_tolerant_sender.go b/sender/fault_tolerant_sender.go index 4e2b89295..d2dc7532f 100644 --- a/sender/fault_tolerant_sender.go +++ b/sender/fault_tolerant_sender.go @@ -60,7 +60,7 @@ type FtSender struct { runnerName string opt *FtOption stats utils.StatsInfo - mutex *sync.Mutex + statsMutex *sync.Mutex } type FtOption struct { @@ -130,7 +130,7 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende procs: opt.procs, runnerName: runnerName, opt: opt, - mutex: new(sync.Mutex), + statsMutex: new(sync.Mutex), } go ftSender.asyncSendLogFromDiskQueue() return &ftSender, nil @@ -159,20 +159,19 @@ func (ft *FtSender) Send(datas []Data) error { } if nowDatas != nil { se.ErrorDetail = reqerr.NewSendError("save data to backend queue error", ConvertDatasBack(nowDatas), reqerr.TypeDefault) - ft.mutex.Lock() - ft.stats.Errors += int64(len(nowDatas)) + ft.statsMutex.Lock() ft.stats.LastError = se.ErrorDetail.Error() - ft.mutex.Unlock() + ft.statsMutex.Unlock() } } } else { err := ft.saveToFile(datas) if err != nil { se.ErrorDetail = err - ft.mutex.Lock() + ft.statsMutex.Lock() ft.stats.LastError = err.Error() ft.stats.Errors += int64(len(datas)) - ft.mutex.Unlock() + ft.statsMutex.Unlock() } else { se.ErrorDetail = nil } @@ -286,26 +285,30 @@ func ConvertDatasBack(ins []Data) []map[string]interface{} { // trySendDatas 尝试发送数据,如果失败,将失败数据加入backup queue,并睡眠指定时间。返回结果为是否正常发送 func (ft *FtSender) trySendDatas(datas []Data, failSleep int, isRetry bool) (backDataContext []*datasContext, err error) { - err = ft.innerSender.Send(datas) - if err == nil { - ft.mutex.Lock() - ft.stats.Success += int64(len(datas)) - if isRetry { - ft.stats.Errors -= int64(len(datas)) - } - ft.mutex.Unlock() - } if c, ok := err.(*utils.StatsError); ok { err = c.ErrorDetail - ft.mutex.Lock() + ft.statsMutex.Lock() if isRetry { ft.stats.Errors -= c.Success } else { ft.stats.Errors += c.Errors } ft.stats.Success += c.Success - ft.mutex.Unlock() + ft.statsMutex.Unlock() + } else if err != nil { + if !isRetry { + ft.statsMutex.Lock() + ft.stats.Errors += int64(len(datas)) + ft.statsMutex.Unlock() + } + } else { + ft.statsMutex.Lock() + ft.stats.Success += int64(len(datas)) + if isRetry { + ft.stats.Errors -= int64(len(datas)) + } + ft.statsMutex.Unlock() } if err != nil { retDatasContext := ft.handleSendError(err, datas)