Skip to content

Commit

Permalink
Use opencensus metric package for process metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Jun 6, 2022
1 parent 0347b25 commit 99c126b
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 160 deletions.
242 changes: 115 additions & 127 deletions service/internal/telemetry/process_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,166 +15,154 @@
package telemetry // import "go.opentelemetry.io/collector/service/internal/telemetry"

import (
"context"
"os"
"runtime"
"time"

"github.com/shirou/gopsutil/v3/process"
"go.opencensus.io/metric"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
)

// ProcessMetricsViews is a struct that contains views related to process metrics (cpu, mem, etc)
type ProcessMetricsViews struct {
prevTimeUnixNano int64
ballastSizeBytes uint64
views []*view.View
done chan struct{}
proc *process.Process
}
// ProcessMetrics is a struct that contains views related to process metrics (cpu, mem, etc)
type ProcessMetrics struct {
startTimeUnixNano int64
ballastSizeBytes uint64

var mUptime = stats.Float64(
"process/uptime",
"Uptime of the process",
stats.UnitSeconds)
var viewProcessUptime = &view.View{
Name: mUptime.Name(),
Description: mUptime.Description(),
Measure: mUptime,
Aggregation: view.Sum(),
TagKeys: nil,
}
processUptime *metric.Float64DerivedCumulative
allocMem *metric.Int64DerivedGauge
totalAllocMem *metric.Int64DerivedCumulative
sysMem *metric.Int64DerivedGauge
cpuSeconds *metric.Float64DerivedCumulative
rssMemory *metric.Int64DerivedGauge

var mRuntimeAllocMem = stats.Int64(
"process/runtime/heap_alloc_bytes",
"Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')",
stats.UnitBytes)
var viewAllocMem = &view.View{
Name: mRuntimeAllocMem.Name(),
Description: mRuntimeAllocMem.Description(),
Measure: mRuntimeAllocMem,
Aggregation: view.LastValue(),
TagKeys: nil,
proc *process.Process
}

var mRuntimeTotalAllocMem = stats.Int64(
"process/runtime/total_alloc_bytes",
"Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')",
stats.UnitBytes)
var viewTotalAllocMem = &view.View{
Name: mRuntimeTotalAllocMem.Name(),
Description: mRuntimeTotalAllocMem.Description(),
Measure: mRuntimeTotalAllocMem,
Aggregation: view.LastValue(),
TagKeys: nil,
}
// RegisterProcessMetrics creates a new set of ProcessMetrics (mem, cpu) that can be used to measure
// basic information about this process.
func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) error {
pmv := &ProcessMetrics{
startTimeUnixNano: time.Now().UnixNano(),
ballastSizeBytes: ballastSizeBytes,
}

var mRuntimeSysMem = stats.Int64(
"process/runtime/total_sys_memory_bytes",
"Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')",
stats.UnitBytes)
var viewSysMem = &view.View{
Name: mRuntimeSysMem.Name(),
Description: mRuntimeSysMem.Description(),
Measure: mRuntimeSysMem,
Aggregation: view.LastValue(),
TagKeys: nil,
}
var err error
pmv.processUptime, err = registry.AddFloat64DerivedCumulative(
"process/uptime",
metric.WithDescription("Uptime of the process"),
metric.WithUnit(stats.UnitSeconds))
if err != nil {
return err
}
if err = pmv.processUptime.UpsertEntry(pmv.updateProcessUptime); err != nil {
return err
}

var mCPUSeconds = stats.Float64(
"process/cpu_seconds",
"Total CPU user and system time in seconds",
stats.UnitSeconds)
var viewCPUSeconds = &view.View{
Name: mCPUSeconds.Name(),
Description: mCPUSeconds.Description(),
Measure: mCPUSeconds,
Aggregation: view.LastValue(),
TagKeys: nil,
}
pmv.allocMem, err = registry.AddInt64DerivedGauge(
"process/runtime/heap_alloc_bytes",
metric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"),
metric.WithUnit(stats.UnitBytes))
if err != nil {
return err
}
if err = pmv.allocMem.UpsertEntry(pmv.updateAllocMem); err != nil {
return err
}

var mRSSMemory = stats.Int64(
"process/memory/rss",
"Total physical memory (resident set size)",
stats.UnitBytes)
var viewRSSMemory = &view.View{
Name: mRSSMemory.Name(),
Description: mRSSMemory.Description(),
Measure: mRSSMemory,
Aggregation: view.LastValue(),
TagKeys: nil,
}
pmv.totalAllocMem, err = registry.AddInt64DerivedCumulative(
"process/runtime/total_alloc_bytes",
metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"),
metric.WithUnit(stats.UnitBytes))
if err != nil {
return err
}
if err = pmv.totalAllocMem.UpsertEntry(pmv.updateTotalAllocMem); err != nil {
return err
}

// NewProcessMetricsViews creates a new set of ProcessMetrics (mem, cpu) that can be used to measure
// basic information about this process.
func NewProcessMetricsViews(ballastSizeBytes uint64) (*ProcessMetricsViews, error) {
pmv := &ProcessMetricsViews{
prevTimeUnixNano: time.Now().UnixNano(),
views: []*view.View{viewProcessUptime, viewAllocMem, viewTotalAllocMem, viewSysMem, viewCPUSeconds, viewRSSMemory},
ballastSizeBytes: ballastSizeBytes,
done: make(chan struct{}),
pmv.sysMem, err = registry.AddInt64DerivedGauge(
"process/runtime/total_sys_memory_bytes",
metric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"),
metric.WithUnit(stats.UnitBytes))
if err != nil {
return err
}
if err = pmv.sysMem.UpsertEntry(pmv.updateSysMem); err != nil {
return err
}

pid := os.Getpid()
pmv.cpuSeconds, err = registry.AddFloat64DerivedCumulative(
"process/cpu_seconds",
metric.WithDescription("Total CPU user and system time in seconds"),
metric.WithUnit(stats.UnitSeconds))
if err != nil {
return err
}
if err = pmv.cpuSeconds.UpsertEntry(pmv.updateCPUSeconds); err != nil {
return err
}

var err error
pmv.proc, err = process.NewProcess(int32(pid))
pmv.rssMemory, err = registry.AddInt64DerivedGauge(
"process/memory/rss",
metric.WithDescription("Total physical memory (resident set size)"),
metric.WithUnit(stats.UnitBytes))
if err != nil {
return nil, err
return err
}
if err = pmv.rssMemory.UpsertEntry(pmv.updateRSSMemory); err != nil {
return err
}

return pmv, nil
pmv.proc, err = process.NewProcess(int32(os.Getpid()))
if err != nil {
return err
}

return nil
}

// StartCollection starts a ticker'd goroutine that will update the PMV measurements every 5 seconds
func (pmv *ProcessMetricsViews) StartCollection() {
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pmv.updateViews()
case <-pmv.done:
return
}
}
}()
func (pmv *ProcessMetrics) updateProcessUptime() float64 {
now := time.Now().UnixNano()
return float64(now-pmv.startTimeUnixNano) / 1e9
}

// Views returns the views internal to the PMV.
func (pmv *ProcessMetricsViews) Views() []*view.View {
return pmv.views
func (pmv *ProcessMetrics) updateAllocMem() int64 {
ms := runtime.MemStats{}
pmv.readMemStats(&ms)
return int64(ms.Alloc)
}

// StopCollection stops the collection of the process metric information.
func (pmv *ProcessMetricsViews) StopCollection() {
close(pmv.done)
func (pmv *ProcessMetrics) updateTotalAllocMem() int64 {
ms := runtime.MemStats{}
pmv.readMemStats(&ms)
return int64(ms.TotalAlloc)
}

func (pmv *ProcessMetricsViews) updateViews() {
now := time.Now().UnixNano()
stats.Record(context.Background(), mUptime.M(float64(now-pmv.prevTimeUnixNano)/1e9))
pmv.prevTimeUnixNano = now

ms := &runtime.MemStats{}
pmv.readMemStats(ms)
stats.Record(context.Background(), mRuntimeAllocMem.M(int64(ms.Alloc)))
stats.Record(context.Background(), mRuntimeTotalAllocMem.M(int64(ms.TotalAlloc)))
stats.Record(context.Background(), mRuntimeSysMem.M(int64(ms.Sys)))

if pmv.proc != nil {
if times, err := pmv.proc.Times(); err == nil {
stats.Record(context.Background(), mCPUSeconds.M(times.Total()))
}
if mem, err := pmv.proc.MemoryInfo(); err == nil {
stats.Record(context.Background(), mRSSMemory.M(int64(mem.RSS)))
}
func (pmv *ProcessMetrics) updateSysMem() int64 {
ms := runtime.MemStats{}
pmv.readMemStats(&ms)
return int64(ms.Sys)
}

func (pmv *ProcessMetrics) updateCPUSeconds() float64 {
times, err := pmv.proc.Times()
if err != nil {
return 0
}

return times.Total()
}

func (pmv *ProcessMetrics) updateRSSMemory() int64 {
mem, err := pmv.proc.MemoryInfo()
if err != nil {
return 0
}
return int64(mem.RSS)
}

func (pmv *ProcessMetricsViews) readMemStats(ms *runtime.MemStats) {
func (pmv *ProcessMetrics) readMemStats(ms *runtime.MemStats) {
runtime.ReadMemStats(ms)
if pmv.ballastSizeBytes > 0 {
ms.Alloc -= pmv.ballastSizeBytes
Expand Down
51 changes: 28 additions & 23 deletions service/internal/telemetry/process_telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
)

func TestProcessTelemetry(t *testing.T) {
const ballastSizeBytes uint64 = 0
pmv, err := NewProcessMetricsViews(ballastSizeBytes)
registry := metric.NewRegistry()
err := RegisterProcessMetrics(registry, 0)
require.NoError(t, err)
assert.NotNil(t, pmv)

expectedViews := []string{
expectedMetrics := []string{
// Changing a metric name is a breaking change.
// Adding new metrics is ok as long it follows the conventions described at
// https://pkg.go.dev/go.opentelemetry.io/collector/obsreport?tab=doc#hdr-Naming_Convention_for_New_Metrics
Expand All @@ -40,37 +40,42 @@ func TestProcessTelemetry(t *testing.T) {
"process/cpu_seconds",
"process/memory/rss",
}
processViews := pmv.Views()
assert.Len(t, processViews, len(expectedViews))

require.NoError(t, view.Register(processViews...))
defer view.Unregister(processViews...)

// Check that the views are actually filled.
pmv.updateViews()
<-time.After(200 * time.Millisecond)

for _, viewName := range expectedViews {
rows, err := view.RetrieveData(viewName)
require.NoError(t, err, viewName)
metrics := registry.Read()

require.Len(t, rows, 1, viewName)
row := rows[0]
assert.Len(t, row.Tags, 0)
for _, metricName := range expectedMetrics {
m := findMetric(metrics, metricName)
require.NotNil(t, m)
require.Len(t, m.TimeSeries, 1)
ts := m.TimeSeries[0]
assert.Len(t, ts.LabelValues, 0)
require.Len(t, ts.Points, 1)

var value float64
if viewName == "process/uptime" {
value = row.Data.(*view.SumData).Value
if metricName == "process/uptime" || metricName == "process/cpu_seconds" {
value = ts.Points[0].Value.(float64)
} else {
value = row.Data.(*view.LastValueData).Value
value = float64(ts.Points[0].Value.(int64))
}

if viewName == "process/uptime" || viewName == "process/cpu_seconds" {
if metricName == "process/uptime" || metricName == "process/cpu_seconds" {
// This likely will still be zero when running the test.
assert.True(t, value >= 0, viewName)
assert.True(t, value >= 0, metricName)
continue
}

assert.True(t, value > 0, viewName)
assert.True(t, value > 0, metricName)
}
}

func findMetric(metrics []*metricdata.Metric, name string) *metricdata.Metric {
for _, m := range metrics {
if m.Descriptor.Name == name {
return m
}
}
return nil
}
Loading

0 comments on commit 99c126b

Please sign in to comment.