Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce the frequency of HCP OTEL metric exports to minutely #18584

Merged
merged 10 commits into from
Aug 28, 2023
3 changes: 3 additions & 0 deletions .changelog/18584.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
Reduce the frequency of metric exports from Consul to HCP from every 10s to every 1m
```
2 changes: 1 addition & 1 deletion agent/hcp/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func sink(
return nil, fmt.Errorf("failed to init config provider: %w", err)
}

reader := telemetry.NewOTELReader(metricsClient, cfgProvider, telemetry.DefaultExportInterval)
reader := telemetry.NewOTELReader(metricsClient, cfgProvider)
sinkOpts := &telemetry.OTELSinkOpts{
Reader: reader,
ConfigProvider: cfgProvider,
Expand Down
20 changes: 10 additions & 10 deletions agent/hcp/telemetry/otel_exporter.go
jjti marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -27,32 +27,32 @@ type EndpointProvider interface {
GetEndpoint() *url.URL
}

// OTELExporter is a custom implementation of a OTEL Metrics SDK metrics.Exporter.
// otelExporter is a custom implementation of a OTEL Metrics SDK metrics.Exporter.
// The exporter is used by a OTEL Metrics SDK PeriodicReader to export aggregated metrics.
// This allows us to use a custom client - HCP authenticated MetricsClient.
type OTELExporter struct {
type otelExporter struct {
jjti marked this conversation as resolved.
Show resolved Hide resolved
client MetricsClient
endpointProvider EndpointProvider
}

// NewOTELExporter returns a configured OTELExporter.
func NewOTELExporter(client MetricsClient, endpointProvider EndpointProvider) *OTELExporter {
return &OTELExporter{
// newOTELExporter returns a configured OTELExporter.
func newOTELExporter(client MetricsClient, endpointProvider EndpointProvider) *otelExporter {
return &otelExporter{
client: client,
endpointProvider: endpointProvider,
}
}

// Temporality returns the Cumulative temporality for metrics aggregation.
// Telemetry Gateway stores metrics in Prometheus format, so use Cummulative aggregation as default.
func (e *OTELExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality {
func (e *otelExporter) Temporality(_ metric.InstrumentKind) metricdata.Temporality {
return metricdata.CumulativeTemporality
}

// Aggregation returns the Aggregation to use for an instrument kind.
// The default implementation provided by the OTEL Metrics SDK library DefaultAggregationSelector panics.
// This custom version replicates that logic, but removes the panic.
func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation {
func (e *otelExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggregation {
switch kind {
case metric.InstrumentKindObservableGauge:
return aggregation.LastValue{}
Expand All @@ -67,7 +67,7 @@ func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggre
}

// Export serializes and transmits metric data to a receiver.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
func (e *otelExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
endpoint := e.endpointProvider.GetEndpoint()
if endpoint == nil {
return nil
Expand All @@ -89,13 +89,13 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM
}

// ForceFlush is a no-op, as the MetricsClient client holds no state.
func (e *OTELExporter) ForceFlush(ctx context.Context) error {
func (e *otelExporter) ForceFlush(ctx context.Context) error {
goMetrics.IncrCounter(internalMetricExporterForceFlush, 1)
return ctx.Err()
}

// Shutdown is a no-op, as the MetricsClient is a HTTP client that requires no graceful shutdown.
func (e *OTELExporter) Shutdown(ctx context.Context) error {
func (e *otelExporter) Shutdown(ctx context.Context) error {
goMetrics.IncrCounter(internalMetricExporterShutdown, 1)
return ctx.Err()
}
12 changes: 6 additions & 6 deletions agent/hcp/telemetry/otel_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (m *mockEndpointProvider) GetEndpoint() *url.URL { return m.endpoint }

func TestTemporality(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
exp := &otelExporter{}
require.Equal(t, metricdata.CumulativeTemporality, exp.Temporality(metric.InstrumentKindCounter))
}

Expand All @@ -66,7 +66,7 @@ func TestAggregation(t *testing.T) {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
exp := &otelExporter{}
require.Equal(t, test.expAgg, exp.Aggregation(test.kind))
})
}
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestExport(t *testing.T) {
}
}

exp := NewOTELExporter(test.client, provider)
exp := newOTELExporter(test.client, provider)

err := exp.Export(context.Background(), test.metrics)
if test.wantErr != "" {
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestExport_CustomMetrics(t *testing.T) {
u, err := url.Parse(testExportEndpoint)
require.NoError(t, err)

exp := NewOTELExporter(tc.client, &mockEndpointProvider{
exp := newOTELExporter(tc.client, &mockEndpointProvider{
endpoint: u,
})

Expand Down Expand Up @@ -212,7 +212,7 @@ func TestExport_CustomMetrics(t *testing.T) {

func TestForceFlush(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
exp := &otelExporter{}
ctx, cancel := context.WithCancel(context.Background())
cancel()

Expand All @@ -222,7 +222,7 @@ func TestForceFlush(t *testing.T) {

func TestShutdown(t *testing.T) {
t.Parallel()
exp := &OTELExporter{}
exp := &otelExporter{}
ctx, cancel := context.WithCancel(context.Background())
cancel()

Expand Down
26 changes: 21 additions & 5 deletions agent/hcp/telemetry/otel_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,26 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
)

// DefaultExportInterval is a default time interval between export of aggregated metrics.
const DefaultExportInterval = 10 * time.Second
const (
// defaultExportInterval is a default time interval between export of aggregated metrics.
// At the time of writing this is the same as the otelsdk.Reader's export interval.
defaultExportInterval = 60 * time.Second

// defaultExportTimeout is the time the otelsdk.Reader waits on an export before cancelling it.
// At the time of writing this is the same as the otelsdk.Reader's export timeout default.
//
// note: in practice we are more likely to hit the http.Client Timeout in telemetry.MetricsClient.
// That http.Client Timeout is 15 seconds (at the time of writing). The otelsdk.Reader will use
// defaultExportTimeout for the entire Export call, but since the http.Client's Timeout is 15s,
// we should hit that first before reaching the 30 second timeout set here.
defaultExportTimeout = 30 * time.Second
)

// ConfigProvider is required to provide custom metrics processing.
type ConfigProvider interface {
// GetLabels should return a set of OTEL attributes added by default all metrics.
GetLabels() map[string]string

// GetFilters should return filtesr that are required to enable metric processing.
// Filters act as an allowlist to collect only the required metrics.
GetFilters() *regexp.Regexp
Expand Down Expand Up @@ -75,9 +88,12 @@ type OTELSink struct {
// NewOTELReader returns a configured OTEL PeriodicReader to export metrics every X seconds.
// It configures the reader with a custom OTELExporter with a MetricsClient to transform and export
// metrics in OTLP format to an external url.
func NewOTELReader(client MetricsClient, endpointProvider EndpointProvider, exportInterval time.Duration) otelsdk.Reader {
exporter := NewOTELExporter(client, endpointProvider)
return otelsdk.NewPeriodicReader(exporter, otelsdk.WithInterval(exportInterval))
func NewOTELReader(client MetricsClient, endpointProvider EndpointProvider) otelsdk.Reader {
jjti marked this conversation as resolved.
Show resolved Hide resolved
return otelsdk.NewPeriodicReader(
newOTELExporter(client, endpointProvider),
otelsdk.WithInterval(defaultExportInterval),
otelsdk.WithTimeout(defaultExportTimeout),
Achooo marked this conversation as resolved.
Show resolved Hide resolved
)
}

// NewOTELSink returns a sink which fits the Go Metrics MetricsSink interface.
Expand Down