Skip to content

Commit

Permalink
Instrument proctelemetry.ProcessMetrics (#67)
Browse files Browse the repository at this point in the history
* Instrument proctelemetry.ProcessMetrics
  • Loading branch information
moh-osman3 committed Feb 1, 2023
1 parent 272ebe6 commit 5b668aa
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 17 deletions.
145 changes: 137 additions & 8 deletions service/internal/proctelemetry/process_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry"

import (
"context"
"os"
"runtime"
"sync"
Expand All @@ -23,6 +24,19 @@ import (
"github.com/shirou/gopsutil/v3/process"
"go.opencensus.io/metric"
"go.opencensus.io/stats"
otelmetric "go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument"
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
"go.opentelemetry.io/otel/metric/instrument/asyncint64"
"go.opentelemetry.io/otel/metric/unit"

"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
)

const (
scopeName = "go.opentelemetry.io/collector/service/process_telemetry"
processNameKey = "process_name"
)

// processMetrics is a struct that contains views related to process metrics (cpu, mem, etc)
Expand All @@ -38,6 +52,17 @@ type processMetrics struct {
cpuSeconds *metric.Float64DerivedCumulative
rssMemory *metric.Int64DerivedGauge

// otel metrics
otelProcessUptime asyncfloat64.Counter
otelAllocMem asyncint64.Gauge
otelTotalAllocMem asyncint64.Counter
otelSysMem asyncint64.Gauge
otelCpuSeconds asyncfloat64.Counter
otelRSSMemory asyncint64.Gauge

meter otelmetric.Meter
useOtelForMetrics bool

// mu protects everything bellow.
mu sync.Mutex
lastMsRead time.Time
Expand All @@ -46,19 +71,33 @@ type processMetrics struct {

// RegisterProcessMetrics creates a new set of processMetrics (mem, cpu) that can be used to measure
// basic information about this process.
func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64) error {
func RegisterProcessMetrics(ctx context.Context, ocRegistry *metric.Registry, mp otelmetric.MeterProvider, registry *featuregate.Registry, ballastSizeBytes uint64) error {
var err error
pm := &processMetrics{
startTimeUnixNano: time.Now().UnixNano(),
ballastSizeBytes: ballastSizeBytes,
ms: &runtime.MemStats{},
useOtelForMetrics: registry.IsEnabled(obsreportconfig.UseOtelForInternalMetricsfeatureGateID),
}

if pm.useOtelForMetrics {
pm.meter = mp.Meter(scopeName)
}
var err error
pm.proc, err = process.NewProcess(int32(os.Getpid()))
if err != nil {
return err
}

pm.processUptime, err = registry.AddFloat64DerivedCumulative(
if pm.useOtelForMetrics {
return pm.recordWithOtel(ctx)
}
return pm.recordWithOC(ocRegistry)
}

func (pm *processMetrics) recordWithOC(ocRegistry *metric.Registry) error {
var err error

pm.processUptime, err = ocRegistry.AddFloat64DerivedCumulative(
"process/uptime",
metric.WithDescription("Uptime of the process"),
metric.WithUnit(stats.UnitSeconds))
Expand All @@ -69,7 +108,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return err
}

pm.allocMem, err = registry.AddInt64DerivedGauge(
pm.allocMem, err = ocRegistry.AddInt64DerivedGauge(
"process/runtime/heap_alloc_bytes",
metric.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"),
metric.WithUnit(stats.UnitBytes))
Expand All @@ -80,7 +119,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return err
}

pm.totalAllocMem, err = registry.AddInt64DerivedCumulative(
pm.totalAllocMem, err = ocRegistry.AddInt64DerivedCumulative(
"process/runtime/total_alloc_bytes",
metric.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"),
metric.WithUnit(stats.UnitBytes))
Expand All @@ -91,7 +130,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return err
}

pm.sysMem, err = registry.AddInt64DerivedGauge(
pm.sysMem, err = ocRegistry.AddInt64DerivedGauge(
"process/runtime/total_sys_memory_bytes",
metric.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"),
metric.WithUnit(stats.UnitBytes))
Expand All @@ -102,7 +141,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return err
}

pm.cpuSeconds, err = registry.AddFloat64DerivedCumulative(
pm.cpuSeconds, err = ocRegistry.AddFloat64DerivedCumulative(
"process/cpu_seconds",
metric.WithDescription("Total CPU user and system time in seconds"),
metric.WithUnit(stats.UnitSeconds))
Expand All @@ -113,7 +152,7 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return err
}

pm.rssMemory, err = registry.AddInt64DerivedGauge(
pm.rssMemory, err = ocRegistry.AddInt64DerivedGauge(
"process/memory/rss",
metric.WithDescription("Total physical memory (resident set size)"),
metric.WithUnit(stats.UnitBytes))
Expand All @@ -127,6 +166,96 @@ func RegisterProcessMetrics(registry *metric.Registry, ballastSizeBytes uint64)
return nil
}

func (pm *processMetrics) recordWithOtel(ctx context.Context) error {
var err error

pm.otelProcessUptime, err = pm.meter.AsyncFloat64().Counter(
"process_uptime",
instrument.WithDescription("Uptime of the process"),
instrument.WithUnit(unit.Unit("s")))
if err != nil {
return err
}
err = pm.meter.RegisterCallback([]instrument.Asynchronous{pm.otelProcessUptime}, func(ctx context.Context) {
pm.otelProcessUptime.Observe(ctx, pm.updateProcessUptime())
})
if err != nil {
return err
}

pm.otelAllocMem, err = pm.meter.AsyncInt64().Gauge(
"process_runtime_heap_alloc_bytes",
instrument.WithDescription("Bytes of allocated heap objects (see 'go doc runtime.MemStats.HeapAlloc')"),
instrument.WithUnit(unit.Bytes))
if err != nil {
return err
}
err = pm.meter.RegisterCallback([]instrument.Asynchronous{pm.otelAllocMem}, func(ctx context.Context) {
pm.otelAllocMem.Observe(ctx, pm.updateAllocMem())
})
if err != nil {
return err
}

pm.otelTotalAllocMem, err = pm.meter.AsyncInt64().Counter(
"process_runtime_total_alloc_bytes",
instrument.WithDescription("Cumulative bytes allocated for heap objects (see 'go doc runtime.MemStats.TotalAlloc')"),
instrument.WithUnit(unit.Bytes))
if err != nil {
return err
}
err = pm.meter.RegisterCallback([]instrument.Asynchronous{pm.otelTotalAllocMem}, func(ctx context.Context) {
pm.otelTotalAllocMem.Observe(ctx, pm.updateTotalAllocMem())
})
if err != nil {
return err
}

pm.otelSysMem, err = pm.meter.AsyncInt64().Gauge(
"process_runtime_total_sys_memory_bytes",
instrument.WithDescription("Total bytes of memory obtained from the OS (see 'go doc runtime.MemStats.Sys')"),
instrument.WithUnit(unit.Bytes))
if err != nil {
return err
}
err = pm.meter.RegisterCallback([]instrument.Asynchronous{pm.otelSysMem}, func(ctx context.Context) {
pm.otelSysMem.Observe(ctx, pm.updateSysMem())
})
if err != nil {
return err
}

pm.otelCpuSeconds, err = pm.meter.AsyncFloat64().Counter(
"process_cpu_seconds",
instrument.WithDescription("Total CPU user and system time in seconds"),
instrument.WithUnit(unit.Unit("s")))
if err != nil {
return err
}
err = pm.meter.RegisterCallback([]instrument.Asynchronous{pm.otelCpuSeconds}, func(ctx context.Context) {
pm.otelCpuSeconds.Observe(ctx, pm.updateCPUSeconds())
})
if err != nil {
return err
}

pm.otelRSSMemory, err = pm.meter.AsyncInt64().Gauge(
"process_memory_rss",
instrument.WithDescription("Total physical memory (resident set size)"),
instrument.WithUnit(unit.Bytes))
if err != nil {
return err
}
err = pm.meter.RegisterCallback([]instrument.Asynchronous{pm.otelRSSMemory}, func(ctx context.Context) {
pm.otelRSSMemory.Observe(ctx, pm.updateRSSMemory())
})
if err != nil {
return err
}

return nil
}

func (pm *processMetrics) updateProcessUptime() float64 {
now := time.Now().UnixNano()
return float64(now-pm.startTimeUnixNano) / 1e9
Expand Down
129 changes: 121 additions & 8 deletions service/internal/proctelemetry/process_telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,41 @@
package proctelemetry

import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opencensus.io/metric"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats/view"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
otelmetric "go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/internal/obsreportconfig"
)

type testTelemetry struct {
component.TelemetrySettings
views []*view.View
promHandler http.Handler
meterProvider *sdkmetric.MeterProvider
expectedMetrics []string
}

var expectedMetrics = []string{
// Changing a metric name is a breaking change.
// Adding new metrics is ok as long it follows the conventions described at
Expand All @@ -36,14 +62,99 @@ var expectedMetrics = []string{
"process/memory/rss",
}

func TestProcessTelemetry(t *testing.T) {
registry := metric.NewRegistry()
require.NoError(t, RegisterProcessMetrics(registry, 0))
var otelExpectedMetrics = []string{
// OTel Go adds `_total` suffix
"process_uptime",
"process_runtime_heap_alloc_bytes",
"process_runtime_total_alloc_bytes",
"process_runtime_total_sys_memory_bytes",
"process_cpu_seconds",
"process_memory_rss",
}

func setupTelemetry(t *testing.T) testTelemetry {
settings := testTelemetry{
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
expectedMetrics: otelExpectedMetrics,
}
settings.TelemetrySettings.MetricsLevel = configtelemetry.LevelNormal

settings.views = obsreportconfig.AllViews(configtelemetry.LevelNormal)
err := view.Register(settings.views...)
require.NoError(t, err)

promReg := prometheus.NewRegistry()
exporter, err := otelprom.New(otelprom.WithRegisterer(promReg), otelprom.WithoutUnits())
require.NoError(t, err)

settings.meterProvider = sdkmetric.NewMeterProvider(
sdkmetric.WithResource(resource.Empty()),
sdkmetric.WithReader(exporter),
)
settings.TelemetrySettings.MeterProvider = settings.meterProvider

settings.promHandler = promhttp.HandlerFor(promReg, promhttp.HandlerOpts{})

t.Cleanup(func() { assert.NoError(t, settings.meterProvider.Shutdown(context.Background())) })

return settings
}

func fetchPrometheusMetrics(handler http.Handler) (map[string]*io_prometheus_client.MetricFamily, error) {
req, err := http.NewRequest("GET", "/metrics", nil)
if err != nil {
return nil, err
}

rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)

var parser expfmt.TextParser
return parser.TextToMetricFamilies(rr.Body)
}

func TestOtelProcessTelemetry(t *testing.T) {
tel := setupTelemetry(t)
registry := featuregate.NewRegistry()
obsreportconfig.RegisterInternalMetricFeatureGate(registry)
require.NoError(t, registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: true}))

require.NoError(t, RegisterProcessMetrics(context.Background(), nil, tel.MeterProvider, registry, 0))

mp, err := fetchPrometheusMetrics(tel.promHandler)
require.NoError(t, err)

for _, metricName := range tel.expectedMetrics {
metric, ok := mp[metricName]
if !ok {
withSuffix := metricName + "_total"
metric, ok = mp[withSuffix]
}
require.True(t, ok)
require.True(t, len(metric.Metric) == 1)
// require.True(t, metric.GetType() == io_prometheus_client.MetricType_COUNTER)
var metricValue float64
if metric.GetType() == io_prometheus_client.MetricType_COUNTER {
metricValue = metric.Metric[0].GetCounter().GetValue()
} else {
metricValue = metric.Metric[0].GetGauge().GetValue()
}
assert.True(t, metricValue > 0)
}
}

func TestOCProcessTelemetry(t *testing.T) {
ocRegistry := metric.NewRegistry()
registry := featuregate.NewRegistry()
obsreportconfig.RegisterInternalMetricFeatureGate(registry)
require.NoError(t, registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: false}))

require.NoError(t, RegisterProcessMetrics(context.Background(), ocRegistry, otelmetric.NewNoopMeterProvider(), registry, 0))

// Check that the metrics are actually filled.
<-time.After(200 * time.Millisecond)

metrics := registry.Read()
metrics := ocRegistry.Read()

for _, metricName := range expectedMetrics {
m := findMetric(metrics, metricName)
Expand Down Expand Up @@ -71,13 +182,15 @@ func TestProcessTelemetry(t *testing.T) {
}

func TestProcessTelemetryFailToRegister(t *testing.T) {

registry := featuregate.NewRegistry()
obsreportconfig.RegisterInternalMetricFeatureGate(registry)
require.NoError(t, registry.Apply(map[string]bool{obsreportconfig.UseOtelForInternalMetricsfeatureGateID: false}))
for _, metricName := range expectedMetrics {
t.Run(metricName, func(t *testing.T) {
registry := metric.NewRegistry()
_, err := registry.AddFloat64Gauge(metricName)
ocRegistry := metric.NewRegistry()
_, err := ocRegistry.AddFloat64Gauge(metricName)
require.NoError(t, err)
assert.Error(t, RegisterProcessMetrics(registry, 0))
assert.Error(t, RegisterProcessMetrics(context.Background(), ocRegistry, otelmetric.NewNoopMeterProvider(), registry, 0))
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings,

if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" {
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
if err = proctelemetry.RegisterProcessMetrics(context.Background(), srv.telemetryInitializer.ocRegistry, srv.telemetryInitializer.mp, srv.telemetryInitializer.registry, getBallastSize(srv.host)); err != nil {
return fmt.Errorf("failed to register process metrics: %w", err)
}
}
Expand Down

0 comments on commit 5b668aa

Please sign in to comment.