Skip to content

Commit

Permalink
Use opencensus metric package for process metrics (#5486)
Browse files Browse the repository at this point in the history
* Use opencensus metric package for process metrics

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Avoid reading MemStats too often

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Make ProcessMetrics private

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Add changelog entry

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Add tests for failing to register metrics

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored Jun 8, 2022
1 parent f1ba384 commit 992782d
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 181 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### 💡 Enhancements 💡

- Use OpenCensus `metric` package for process metrics instead of `stats` package (#5486)

### 🧰 Bug fixes 🧰

## v0.53.0 Beta
Expand Down
272 changes: 137 additions & 135 deletions service/internal/telemetry/process_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,171 +15,173 @@
package telemetry // import "go.opentelemetry.io/collector/service/internal/telemetry"

import (
"context"
"os"
"runtime"
"sync"
"time"

"github.com/shirou/gopsutil/v3/process"
"go.opencensus.io/metric"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
)

// ProcessMetricsViews is a struct that contains views related to process metrics (cpu, mem, etc)
type ProcessMetricsViews struct {
prevTimeUnixNano int64
ballastSizeBytes uint64
views []*view.View
done chan struct{}
proc *process.Process
// processMetrics holds the state backing the process-level metrics (uptime, Go
// runtime memory, CPU time, RSS) that RegisterProcessMetrics adds to a registry.
type processMetrics struct {
// startTimeUnixNano is the wall-clock time, in nanoseconds since the Unix
// epoch, captured when the metrics were registered; uptime is measured from it.
startTimeUnixNano int64
// ballastSizeBytes is subtracted from the Alloc, HeapAlloc, HeapSys and
// HeapInuse memory stats after every refresh; zero disables the adjustment.
ballastSizeBytes uint64
// proc is the handle for the current process, used to read CPU times and
// memory info.
proc *process.Process

// Derived metrics; each one is registered with a callback that computes its
// value on demand.
processUptime *metric.Float64DerivedCumulative
allocMem *metric.Int64DerivedGauge
totalAllocMem *metric.Int64DerivedCumulative
sysMem *metric.Int64DerivedGauge
cpuSeconds *metric.Float64DerivedCumulative
rssMemory *metric.Int64DerivedGauge

// mu protects everything below.
mu sync.Mutex
// lastMsRead is the time ms was last refreshed; refreshes are skipped when
// less than one second has passed since then.
lastMsRead time.Time
// ms is the cached memory statistics snapshot shared by the memory metrics.
ms *runtime.MemStats
}

var mUptime = stats.Float64(
"process/uptime",
"Uptime of the process",
stats.UnitSeconds)
var viewProcessUptime = &view.View{
Name: mUptime.Name(),
Description: mUptime.Description(),
Measure: mUptime,
Aggregation: view.Sum(),
TagKeys: nil,
}

var mRuntimeAllocMem = stats.Int64(
"process/runtime/heap_alloc_bytes",
"Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')",
stats.UnitBytes)
var viewAllocMem = &view.View{
Name: mRuntimeAllocMem.Name(),
Description: mRuntimeAllocMem.Description(),
Measure: mRuntimeAllocMem,
Aggregation: view.LastValue(),
TagKeys: nil,
}

var mRuntimeTotalAllocMem = stats.Int64(
"process/runtime/total_alloc_bytes",
"Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')",
stats.UnitBytes)
var viewTotalAllocMem = &view.View{
Name: mRuntimeTotalAllocMem.Name(),
Description: mRuntimeTotalAllocMem.Description(),
Measure: mRuntimeTotalAllocMem,
Aggregation: view.LastValue(),
TagKeys: nil,
}
// RegisterProcessMetrics registers the process-level metrics (uptime, memory, CPU) for this
// process with the given registry. It returns an error if the process handle cannot be
// obtained or if any metric fails to register.
func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) error {
pm := &processMetrics{
startTimeUnixNano: time.Now().UnixNano(),
ballastSizeBytes: ballastSizeBytes,
ms: &runtime.MemStats{},
}
var err error
pm.proc, err = process.NewProcess(int32(os.Getpid()))
if err != nil {
return err
}

var mRuntimeSysMem = stats.Int64(
"process/runtime/total_sys_memory_bytes",
"Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')",
stats.UnitBytes)
var viewSysMem = &view.View{
Name: mRuntimeSysMem.Name(),
Description: mRuntimeSysMem.Description(),
Measure: mRuntimeSysMem,
Aggregation: view.LastValue(),
TagKeys: nil,
}
pm.processUptime, err = registry.AddFloat64DerivedCumulative(
"process/uptime",
metric.WithDescription("Uptime of the process"),
metric.WithUnit(stats.UnitSeconds))
if err != nil {
return err
}
if err = pm.processUptime.UpsertEntry(pm.updateProcessUptime); err != nil {
return err
}

var mCPUSeconds = stats.Float64(
"process/cpu_seconds",
"Total CPU user and system time in seconds",
stats.UnitSeconds)
var viewCPUSeconds = &view.View{
Name: mCPUSeconds.Name(),
Description: mCPUSeconds.Description(),
Measure: mCPUSeconds,
Aggregation: view.LastValue(),
TagKeys: nil,
}
pm.allocMem, err = registry.AddInt64DerivedGauge(
"process/runtime/heap_alloc_bytes",
metric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"),
metric.WithUnit(stats.UnitBytes))
if err != nil {
return err
}
if err = pm.allocMem.UpsertEntry(pm.updateAllocMem); err != nil {
return err
}

var mRSSMemory = stats.Int64(
"process/memory/rss",
"Total physical memory (resident set size)",
stats.UnitBytes)
var viewRSSMemory = &view.View{
Name: mRSSMemory.Name(),
Description: mRSSMemory.Description(),
Measure: mRSSMemory,
Aggregation: view.LastValue(),
TagKeys: nil,
}
pm.totalAllocMem, err = registry.AddInt64DerivedCumulative(
"process/runtime/total_alloc_bytes",
metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"),
metric.WithUnit(stats.UnitBytes))
if err != nil {
return err
}
if err = pm.totalAllocMem.UpsertEntry(pm.updateTotalAllocMem); err != nil {
return err
}

// NewProcessMetricsViews creates a new set of ProcessMetrics (mem, cpu) that can be used to measure
// basic information about this process.
func NewProcessMetricsViews(ballastSizeBytes uint64) (*ProcessMetricsViews, error) {
pmv := &ProcessMetricsViews{
prevTimeUnixNano: time.Now().UnixNano(),
views: []*view.View{viewProcessUptime, viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds, viewRSSMemory},
ballastSizeBytes: ballastSizeBytes,
done: make(chan struct{}),
pm.sysMem, err = registry.AddInt64DerivedGauge(
"process/runtime/total_sys_memory_bytes",
metric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"),
metric.WithUnit(stats.UnitBytes))
if err != nil {
return err
}
if err = pm.sysMem.UpsertEntry(pm.updateSysMem); err != nil {
return err
}

pid := os.Getpid()
pm.cpuSeconds, err = registry.AddFloat64DerivedCumulative(
"process/cpu_seconds",
metric.WithDescription("Total CPU user and system time in seconds"),
metric.WithUnit(stats.UnitSeconds))
if err != nil {
return err
}
if err = pm.cpuSeconds.UpsertEntry(pm.updateCPUSeconds); err != nil {
return err
}

var err error
pmv.proc, err = process.NewProcess(int32(pid))
pm.rssMemory, err = registry.AddInt64DerivedGauge(
"process/memory/rss",
metric.WithDescription("Total physical memory (resident set size)"),
metric.WithUnit(stats.UnitBytes))
if err != nil {
return nil, err
return err
}
if err = pm.rssMemory.UpsertEntry(pm.updateRSSMemory); err != nil {
return err
}

return nil
}

return pmv, nil
// updateProcessUptime reports the number of seconds elapsed since
// pm.startTimeUnixNano, measured against the current wall clock.
func (pm *processMetrics) updateProcessUptime() float64 {
	elapsedNanos := time.Now().UnixNano() - pm.startTimeUnixNano
	return float64(elapsedNanos) / float64(time.Second)
}

// StartCollection starts a ticker'd goroutine that will update the PMV measurements every 5 seconds
func (pmv *ProcessMetricsViews) StartCollection() {
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pmv.updateViews()
case <-pmv.done:
return
}
}
}()
// updateAllocMem returns the Alloc field of the cached memory stats,
// refreshing the cache first if it is stale. pm.mu is held for the duration.
func (pm *processMetrics) updateAllocMem() int64 {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	pm.readMemStatsIfNeeded()
	allocated := pm.ms.Alloc
	return int64(allocated)
}

// Views returns the views internal to the PMV.
func (pmv *ProcessMetricsViews) Views() []*view.View {
return pmv.views
// updateTotalAllocMem returns the TotalAlloc field of the cached memory stats,
// refreshing the cache first if it is stale. pm.mu is held for the duration.
func (pm *processMetrics) updateTotalAllocMem() int64 {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	pm.readMemStatsIfNeeded()
	cumulative := pm.ms.TotalAlloc
	return int64(cumulative)
}

// StopCollection stops the collection of the process metric information.
func (pmv *ProcessMetricsViews) StopCollection() {
close(pmv.done)
// updateSysMem returns the Sys field of the cached memory stats, refreshing
// the cache first if it is stale. pm.mu is held for the duration.
func (pm *processMetrics) updateSysMem() int64 {
	pm.mu.Lock()
	defer pm.mu.Unlock()

	pm.readMemStatsIfNeeded()
	fromOS := pm.ms.Sys
	return int64(fromOS)
}

func (pmv *ProcessMetricsViews) updateViews() {
now := time.Now().UnixNano()
stats.Record(context.Background(), mUptime.M(float64(now-pmv.prevTimeUnixNano)/1e9))
pmv.prevTimeUnixNano = now

ms := &runtime.MemStats{}
pmv.readMemStats(ms)
stats.Record(context.Background(), mRuntimeAllocMem.M(int64(ms.Alloc)))
stats.Record(context.Background(), mRuntimeTotalAllocMem.M(int64(ms.TotalAlloc)))
stats.Record(context.Background(), mRuntimeSysMem.M(int64(ms.Sys)))

if pmv.proc != nil {
if times, err := pmv.proc.Times(); err == nil {
stats.Record(context.Background(), mCPUSeconds.M(times.Total()))
}
if mem, err := pmv.proc.MemoryInfo(); err == nil {
stats.Record(context.Background(), mRSSMemory.M(int64(mem.RSS)))
}
// updateCPUSeconds returns the total reported by the process CPU times.
// If the times cannot be read, it reports 0.
func (pm *processMetrics) updateCPUSeconds() float64 {
	if cpuTimes, err := pm.proc.Times(); err == nil {
		return cpuTimes.Total()
	}
	return 0
}

// updateRSSMemory returns the RSS value from the process memory info.
// If the memory info cannot be read, it reports 0.
func (pm *processMetrics) updateRSSMemory() int64 {
	if memInfo, err := pm.proc.MemoryInfo(); err == nil {
		return int64(memInfo.RSS)
	}
	return 0
}

func (pmv *ProcessMetricsViews) readMemStats(ms *runtime.MemStats) {
runtime.ReadMemStats(ms)
if pmv.ballastSizeBytes > 0 {
ms.Alloc -= pmv.ballastSizeBytes
ms.HeapAlloc -= pmv.ballastSizeBytes
ms.HeapSys -= pmv.ballastSizeBytes
ms.HeapInuse -= pmv.ballastSizeBytes
// readMemStatsIfNeeded refreshes the cached memory statistics in pm.ms unless
// they were refreshed less than one second ago, in which case the cached
// values are kept. After a refresh, a non-zero ballast size is subtracted from
// Alloc, HeapAlloc, HeapSys and HeapInuse.
//
// It reads and writes pm.lastMsRead and pm.ms without locking, so the caller
// must hold pm.mu.
func (pm *processMetrics) readMemStatsIfNeeded() {
	now := time.Now()
	// If last time we read was less than one second ago just reuse the values.
	if now.Sub(pm.lastMsRead) < time.Second {
		return
	}
	pm.lastMsRead = now
	runtime.ReadMemStats(pm.ms)
	if pm.ballastSizeBytes == 0 {
		return
	}

	// The stats are unsigned, so a plain "-=" would wrap around to a huge value
	// (reported as a negative int64) whenever the ballast is larger than the
	// stat itself. Clamp at zero instead.
	ballastAdjusted := []*uint64{
		&pm.ms.Alloc,
		&pm.ms.HeapAlloc,
		&pm.ms.HeapSys,
		&pm.ms.HeapInuse,
	}
	for _, stat := range ballastAdjusted {
		if *stat > pm.ballastSizeBytes {
			*stat -= pm.ballastSizeBytes
		} else {
			*stat = 0
		}
	}
}
Loading

0 comments on commit 992782d

Please sign in to comment.