Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HCP Observability] OTELExporter #17128

Merged
merged 20 commits into from
May 12, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Create new OTELExporter which uses the MetricsClient
Add transform because the conversion is in an /internal package
  • Loading branch information
Achooo committed May 8, 2023
commit 894cef471487797fe2e6948e0b2bd5cfec992702
53 changes: 53 additions & 0 deletions agent/hcp/telemetry/otel_exporter.go
Copy link
Contributor Author

@Achooo Achooo Apr 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package telemetry
Achooo marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"

hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/go-multierror"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// OTELExporter is a custom implementation of an OTEL Metrics SDK metric.Exporter.
// The exporter is used by an OTEL Metrics SDK PeriodicReader to export aggregated metrics.
// This allows us to use a custom client - HCP authenticated MetricsClient.
type OTELExporter struct {
client hcpclient.MetricsClient
endpoint string
Achooo marked this conversation as resolved.
Show resolved Hide resolved
}

// Temporality returns the Cumulative temporality for metrics aggregation.
// Telemetry Gateway stores metrics in Prometheus format, so use Cumulative aggregation as default.
// The instrument kind argument is ignored: every kind gets the same temporality.
func (e *OTELExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}

// Aggregation returns the Aggregation to use for an instrument kind.
func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation {
Achooo marked this conversation as resolved.
Show resolved Hide resolved
switch kind {
case metric.InstrumentKindObservableGauge:
return aggregation.LastValue{}
case metric.InstrumentKindHistogram:
return aggregation.ExplicitBucketHistogram{
Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
Achooo marked this conversation as resolved.
Show resolved Hide resolved
NoMinMax: false,
}
}
// for metric.InstrumentKindCounter and others, default to sum.
return aggregation.Sum{}
}

// Export serializes and transmits metric data to a receiver.
//
// The OTEL metrics are first converted to their OTLP representation, and the
// converted payload is then sent through the MetricsClient to the configured
// endpoint. The export is attempted even when the transformation reported an
// error. Transformation and export failures are combined into a single error;
// nil is returned when neither step reported a failure.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
	otlpMetrics, merr := transformOTLP(metrics)
	err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint)

	// multierror.Append always returns a non-nil *multierror.Error, even when
	// it holds no errors. Returning that pointer directly as an error would
	// make every successful export look like a failure, so collapse the
	// empty case to a true nil with ErrorOrNil.
	return multierror.Append(merr, err).ErrorOrNil()
}

// ForceFlush does nothing, as the MetricsClient holds no state.
Achooo marked this conversation as resolved.
Show resolved Hide resolved
func (e *OTELExporter) ForceFlush(ctx context.Context) error { return ctx.Err() }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we add a metric to track this situation.


// Shutdown does nothing, as the MetricsClient is an HTTP client that requires no graceful shutdown.
Achooo marked this conversation as resolved.
Show resolved Hide resolved
func (e *OTELExporter) Shutdown(ctx context.Context) error { return ctx.Err() }
133 changes: 133 additions & 0 deletions agent/hcp/telemetry/otel_exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package telemetry

import (
"context"
"fmt"
"testing"

"github.com/hashicorp/consul/agent/hcp/client"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
metricpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)

// TestTemporality verifies that the exporter reports cumulative temporality
// for a counter instrument.
func TestTemporality(t *testing.T) {
	var exporter OTELExporter

	got := exporter.Temporality(metric.InstrumentKindCounter)

	require.Equal(t, metricdata.CumulativeTemporality, got)
}

func TestAggregation(t *testing.T) {
Achooo marked this conversation as resolved.
Show resolved Hide resolved
for name, test := range map[string]struct {
kind metric.InstrumentKind
expAgg aggregation.Aggregation
}{
"gauge": {
kind: metric.InstrumentKindObservableGauge,
expAgg: aggregation.LastValue{},
},
"counter": {
kind: metric.InstrumentKindCounter,
expAgg: aggregation.Sum{},
},
"histogram": {
kind: metric.InstrumentKindHistogram,
expAgg: aggregation.ExplicitBucketHistogram{Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, NoMinMax: false},
},
} {
t.Run(name, func(t *testing.T) {
exp := &OTELExporter{}
Achooo marked this conversation as resolved.
Show resolved Hide resolved
require.Equal(t, test.expAgg, exp.Aggregation(test.kind))
})
}
}

// mockErrMetricsClient is a test double whose ExportMetrics always fails.
type mockErrMetricsClient struct{}

// ExportMetrics ignores its arguments and returns a fixed export error.
func (*mockErrMetricsClient) ExportMetrics(_ context.Context, _ *metricpb.ResourceMetrics, _ string) error {
	return fmt.Errorf("failed to export metrics")
}

// mockMetricsClient is a test double whose ExportMetrics always succeeds.
type mockMetricsClient struct{}

// ExportMetrics ignores its arguments and reports success.
func (*mockMetricsClient) ExportMetrics(_ context.Context, _ *metricpb.ResourceMetrics, _ string) error {
	return nil
}
Achooo marked this conversation as resolved.
Show resolved Hide resolved

func TestExport(t *testing.T) {
for name, test := range map[string]struct {
wantErr string
metrics *metricdata.ResourceMetrics
client client.MetricsClient
}{
"errorWithExportFailure": {
client: &mockErrMetricsClient{},
metrics: &metricdata.ResourceMetrics{
Resource: resource.Empty(),
},
wantErr: "failed to export metrics",
},
"errorWithTransformFailure": {
wantErr: "unknown aggregation: metricdata.Gauge[int64]",
client: &mockMetricsClient{},
metrics: &metricdata.ResourceMetrics{
Resource: resource.Empty(),
ScopeMetrics: []metricdata.ScopeMetrics{
{
Metrics: []metricdata.Metrics{
{
// unsupported, only float64 supported
Data: metricdata.Gauge[int64]{},
},
},
},
},
},
},
"multierrorTransformExportFailure": {
wantErr: "2 errors occurred:\n\t* unknown aggregation: metricdata.Gauge[int64]\n\t* failed to export metrics",
client: &mockErrMetricsClient{},
metrics: &metricdata.ResourceMetrics{
Resource: resource.Empty(),
ScopeMetrics: []metricdata.ScopeMetrics{
{
Metrics: []metricdata.Metrics{
{
// unsupported, only float64 supported
Data: metricdata.Gauge[int64]{},
},
},
},
},
},
},
} {
t.Run(name, func(t *testing.T) {
exp := &OTELExporter{
Achooo marked this conversation as resolved.
Show resolved Hide resolved
client: test.client,
}

err := exp.Export(context.Background(), test.metrics)
require.Error(t, err)
require.Contains(t, err.Error(), test.wantErr)
})
}
}

func TestForceFlush(t *testing.T) {
exp := &OTELExporter{}
ctx, cancel := context.WithCancel(context.Background())
cancel()

require.Error(t, exp.ForceFlush(ctx))
Achooo marked this conversation as resolved.
Show resolved Hide resolved
}

func TestShutdown(t *testing.T) {
exp := &OTELExporter{}
ctx, cancel := context.WithCancel(context.Background())
cancel()

require.Error(t, exp.Shutdown(ctx))
Achooo marked this conversation as resolved.
Show resolved Hide resolved
}
168 changes: 168 additions & 0 deletions agent/hcp/telemetry/otlp_transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package telemetry
Copy link
Contributor Author

@Achooo Achooo Apr 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was hoping to re-use the implementation by the opentelemetry-go library for the conversion between the OTEL metrics sdk to the OTLP proto.

Turns out that package is an internal one. Here's how it's used.

I used the same implementation but I was able to reduce the code for our use-case to the current file - we only support float64 values and string type labels.


import (
"fmt"

"github.com/hashicorp/go-multierror"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
cpb "go.opentelemetry.io/proto/otlp/common/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
rpb "go.opentelemetry.io/proto/otlp/resource/v1"
)

// transformOTLP converts OTEL SDK ResourceMetrics into their OTLP proto form.
// The returned ResourceMetrics carries the resource attributes and whatever
// ScopeMetrics scopeMetrics produced; the error reported by scopeMetrics is
// returned alongside it, so the result may be partial when err is set.
func transformOTLP(rm *metricdata.ResourceMetrics) (*mpb.ResourceMetrics, error) {
	converted, err := scopeMetrics(rm.ScopeMetrics)

	res := &rpb.Resource{Attributes: attributes(rm.Resource.Iter())}
	pb := &mpb.ResourceMetrics{
		Resource:     res,
		ScopeMetrics: converted,
	}
	return pb, err
}

// scopeMetrics returns a slice of OTLP ScopeMetrics.
func scopeMetrics(scopeMetrics []metricdata.ScopeMetrics) ([]*mpb.ScopeMetrics, error) {
var merr *multierror.Error
out := make([]*mpb.ScopeMetrics, 0, len(scopeMetrics))
for _, sm := range scopeMetrics {
ms, err := metrics(sm.Metrics)
Achooo marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
merr = multierror.Append(merr, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to continue here? Maybe a test would have caught this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah so we don't want to prevent appending (as we want the good metrics in) but we want to filter bad metrics (which is done in the inner function). I've cleaned up this code to remove the error handling here since it's pretty much useless. After discussing with Chappie and Connor, we'd rather get metrics on when transformation failed, and we'd do this within the inner functions when the error occurs. I'm thinking of doing this in a follow-up PR. The alternative is to log the error, but it would be easier to obtain metrics.

}

out = append(out, &mpb.ScopeMetrics{
Scope: &cpb.InstrumentationScope{
Name: sm.Scope.Name,
Version: sm.Scope.Version,
},
Metrics: ms,
})
}
return out, merr
}

// metrics returns a slice of OTLP Metric generated from OTEL metrics sdk ones.
func metrics(metrics []metricdata.Metrics) ([]*mpb.Metric, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the []metricdata.XXX types not have a ToProto() or similar helper?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Achooo this is marked as resolved but I don't see what changed?

Copy link
Contributor Author

@Achooo Achooo May 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, my bad! This was also brought up by Chappie, and I think I meant to tag you there, see this thread.

TLDR: The library doesn't provide anything :( and I wish it did. As mentioned in this comment and the thread, the conversion package is also internal and there are no marshal/unmarshal or ToProto() functions provided to make this easier.

var merr *multierror.Error
out := make([]*mpb.Metric, 0, len(metrics))
for _, m := range metrics {
o, err := metricType(m)
if err != nil {
merr = multierror.Append(merr, err)
continue
}
out = append(out, o)
}
return out, merr
}

// metricType converts a single OTEL SDK metric into its OTLP counterpart.
// Only float64 Gauge, Sum and Histogram data are handled; Sum and Histogram
// must additionally use cumulative temporality. Any other data yields an
// error, returned together with a Metric that has only its name, description
// and unit populated.
func metricType(m metricdata.Metrics) (*mpb.Metric, error) {
	pb := &mpb.Metric{
		Name:        m.Name,
		Description: m.Description,
		Unit:        string(m.Unit),
	}

	switch data := m.Data.(type) {
	case metricdata.Gauge[float64]:
		gauge := &mpb.Gauge{DataPoints: dataPoints(data.DataPoints)}
		pb.Data = &mpb.Metric_Gauge{Gauge: gauge}
		return pb, nil

	case metricdata.Sum[float64]:
		if data.Temporality != metricdata.CumulativeTemporality {
			return pb, fmt.Errorf("%s: %T", "unsupported temporality", data)
		}
		sum := &mpb.Sum{
			AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
			IsMonotonic:            data.IsMonotonic,
			DataPoints:             dataPoints(data.DataPoints),
		}
		pb.Data = &mpb.Metric_Sum{Sum: sum}
		return pb, nil

	case metricdata.Histogram[float64]:
		if data.Temporality != metricdata.CumulativeTemporality {
			return pb, fmt.Errorf("%s: %T", "unsupported temporality", data)
		}
		hist := &mpb.Histogram{
			AggregationTemporality: mpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
			DataPoints:             histogramDataPoints(data.DataPoints),
		}
		pb.Data = &mpb.Metric_Histogram{Histogram: hist}
		return pb, nil

	default:
		// Anything else (including non-float64 variants) is rejected.
		return pb, fmt.Errorf("%s: %T", "unknown aggregation", data)
	}
}

// dataPoints returns a slice of OTLP NumberDataPoint generated from OTEL
// metrics sdk ones. Each point carries its attributes, its float64 value and
// its start/observation timestamps in Unix nanoseconds.
//
// A zero StartTime or Time is emitted as 0 rather than being converted:
// time.Time.UnixNano is undefined for the zero Time, and casting its result
// to uint64 would produce a meaningless far-future timestamp.
func dataPoints(dataPoints []metricdata.DataPoint[float64]) []*mpb.NumberDataPoint {
	out := make([]*mpb.NumberDataPoint, 0, len(dataPoints))
	for _, dp := range dataPoints {
		ndp := &mpb.NumberDataPoint{
			Attributes: attributes(dp.Attributes.Iter()),
			Value: &mpb.NumberDataPoint_AsDouble{
				AsDouble: dp.Value,
			},
		}
		// Leave the timestamps at their zero value when the source time is unset.
		if !dp.StartTime.IsZero() {
			ndp.StartTimeUnixNano = uint64(dp.StartTime.UnixNano())
		}
		if !dp.Time.IsZero() {
			ndp.TimeUnixNano = uint64(dp.Time.UnixNano())
		}
		out = append(out, ndp)
	}
	return out
}

// histogramDataPoints returns a slice of OTLP HistogramDataPoint from OTEL
// metrics sdk ones. Count, sum, bucket counts and explicit bounds are copied
// over; Min and Max are only set when the source data point has them.
//
// A zero StartTime or Time is emitted as 0 rather than being converted:
// time.Time.UnixNano is undefined for the zero Time, and casting its result
// to uint64 would produce a meaningless far-future timestamp.
func histogramDataPoints(dataPoints []metricdata.HistogramDataPoint[float64]) []*mpb.HistogramDataPoint {
	out := make([]*mpb.HistogramDataPoint, 0, len(dataPoints))
	for _, dp := range dataPoints {
		// Copy the sum so the proto does not point into the loop variable's field.
		sum := dp.Sum
		hdp := &mpb.HistogramDataPoint{
			Attributes:     attributes(dp.Attributes.Iter()),
			Count:          dp.Count,
			Sum:            &sum,
			BucketCounts:   dp.BucketCounts,
			ExplicitBounds: dp.Bounds,
		}
		// Leave the timestamps at their zero value when the source time is unset.
		if !dp.StartTime.IsZero() {
			hdp.StartTimeUnixNano = uint64(dp.StartTime.UnixNano())
		}
		if !dp.Time.IsZero() {
			hdp.TimeUnixNano = uint64(dp.Time.UnixNano())
		}
		if v, ok := dp.Min.Value(); ok {
			hdp.Min = &v
		}
		if v, ok := dp.Max.Value(); ok {
			hdp.Max = &v
		}
		out = append(out, hdp)
	}
	return out
}

// attributes transforms items of an attribute iterator into OTLP key-values.
// It returns nil when the iterator holds no attributes.
//
// Every value is sent as an OTLP string value. String attributes are passed
// through unchanged; attributes of any other type are rendered with
// Value.Emit instead of Value.AsString, which would silently turn them into
// empty strings.
func attributes(iter attribute.Iterator) []*cpb.KeyValue {
	n := iter.Len()
	if n == 0 {
		return nil
	}

	out := make([]*cpb.KeyValue, 0, n)
	for iter.Next() {
		kv := iter.Attribute()
		out = append(out, &cpb.KeyValue{
			Key: string(kv.Key),
			Value: &cpb.AnyValue{
				Value: &cpb.AnyValue_StringValue{
					StringValue: kv.Value.Emit(),
				},
			},
		})
	}
	return out
}
Loading