Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HCP Telemetry] Move first TelemetryConfig Fetch into the TelemetryConfigProvider #18318

Merged
merged 20 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions agent/hcp/client/telemetry_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

var (
// defaultMetricFilters is a regex that matches all metric names.
defaultMetricFilters = regexp.MustCompile(".+")
DefaultMetricFilters = regexp.MustCompile(".+")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Export this to use as default in https://github.com/hashicorp/consul/pull/18318/files#diff-9fd2535b13a1b9701661bdd379418cb004a41b5b747ff9911e8cbbffea54b9afR60.

In case the first fetch is not available, this default is set to all metrics.


// Validation errors for AgentTelemetryConfigOK response.
errMissingPayload = errors.New("missing payload")
Expand Down Expand Up @@ -142,15 +142,15 @@ func convertMetricFilters(ctx context.Context, payloadFilters []string) *regexp.

if len(validFilters) == 0 {
logger.Error("no valid filters")
return defaultMetricFilters
return DefaultMetricFilters
}

// Combine the valid regex strings with OR.
finalRegex := strings.Join(validFilters, "|")
composedRegex, err := regexp.Compile(finalRegex)
if err != nil {
logger.Error("failed to compile final regex", "error", err)
return defaultMetricFilters
return DefaultMetricFilters
}

return composedRegex
Expand Down
6 changes: 3 additions & 3 deletions agent/hcp/client/telemetry_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestConvertAgentTelemetryResponse(t *testing.T) {
MetricsConfig: &MetricsConfig{
Endpoint: validTestURL,
Labels: map[string]string{"test": "test"},
Filters: defaultMetricFilters,
Filters: DefaultMetricFilters,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: 2 * time.Second,
Expand Down Expand Up @@ -274,13 +274,13 @@ func TestConvertMetricFilters(t *testing.T) {
}{
"badFilterRegex": {
filters: []string{"(*LF)"},
expectedRegexString: defaultMetricFilters.String(),
expectedRegexString: DefaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
"emptyRegex": {
filters: []string{},
expectedRegexString: defaultMetricFilters.String(),
expectedRegexString: DefaultMetricFilters.String(),
matches: []string{"consul.raft.peers", "consul.mem.heap_size"},
wantMatch: true,
},
Expand Down
39 changes: 12 additions & 27 deletions agent/hcp/deps.go
Copy link
Contributor Author

@Achooo Achooo Jul 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire first fetchTelemetryConfig call is removed simplifying this file quite a lot.
The TelemetryConfigProvider will now try to perform this first fetch.

Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ package hcp
import (
"context"
"fmt"
"time"

"github.com/armon/go-metrics"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/hcp/telemetry"
"github.com/hashicorp/go-hclog"
)

// Deps contains the interfaces that the rest of Consul core depends on for HCP integration.
type Deps struct {
Client hcpclient.Client
Client client.Client
Provider scada.Provider
Sink metrics.MetricSink
}
Expand All @@ -27,7 +27,7 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
ctx := context.Background()
ctx = hclog.WithContext(ctx, logger)

client, err := hcpclient.NewClient(cfg)
hcpClient, err := client.NewClient(cfg)
if err != nil {
return Deps{}, fmt.Errorf("failed to init client: %w", err)
}
Expand All @@ -37,50 +37,35 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (Deps, error) {
return Deps{}, fmt.Errorf("failed to init scada: %w", err)
}

metricsClient, err := hcpclient.NewMetricsClient(ctx, &cfg)
metricsClient, err := client.NewMetricsClient(ctx, &cfg)
if err != nil {
logger.Error("failed to init metrics client", "error", err)
return Deps{}, fmt.Errorf("failed to init metrics client: %w", err)
}

sink, err := sink(ctx, client, metricsClient)
sink, err := sink(ctx, hcpClient, metricsClient)
if err != nil {
// Do not prevent server start if sink init fails, only log error.
logger.Error("failed to init sink", "error", err)
}

return Deps{
Client: client,
Client: hcpClient,
Provider: provider,
Sink: sink,
}, nil
}

// sink initializes an OTELSink which forwards Consul metrics to HCP.
// The sink is only initialized if the server is registered with the management plane (CCM).
// This step should not block server initialization, errors are returned, only to be logged.
func sink(
ctx context.Context,
hcpClient hcpclient.Client,
hcpClient client.Client,
metricsClient telemetry.MetricsClient,
) (metrics.MetricSink, error) {
logger := hclog.FromContext(ctx).Named("sink")
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

telemetryCfg, err := hcpClient.FetchTelemetryConfig(reqCtx)
if err != nil {
return nil, fmt.Errorf("failed to fetch telemetry config: %w", err)
}

if !telemetryCfg.MetricsEnabled() {
return nil, nil
}
logger := hclog.FromContext(ctx)

cfgProvider, err := NewHCPProvider(ctx, hcpClient, telemetryCfg)
if err != nil {
return nil, fmt.Errorf("failed to init config provider: %w", err)
}
cfgProvider := NewHCPProvider(ctx, hcpClient)

reader := telemetry.NewOTELReader(metricsClient, cfgProvider, telemetry.DefaultExportInterval)
sinkOpts := &telemetry.OTELSinkOpts{
Expand All @@ -90,7 +75,7 @@ func sink(

sink, err := telemetry.NewOTELSink(ctx, sinkOpts)
if err != nil {
return nil, fmt.Errorf("failed create OTELSink: %w", err)
return nil, fmt.Errorf("failed to create OTELSink: %w", err)
}

logger.Debug("initialized HCP metrics sink")
Expand Down
70 changes: 10 additions & 60 deletions agent/hcp/deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package hcp

import (
"context"
"fmt"
"net/url"
"regexp"
"testing"
Expand All @@ -21,69 +20,20 @@ type mockMetricsClient struct {

func TestSink(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
expect func(*client.MockClient)
wantErr string
expectedSink bool
}{
"success": {
expect: func(mockClient *client.MockClient) {
u, _ := url.Parse("https://test.com/v1/metrics")
filters, _ := regexp.Compile("test")
mt := mockTelemetryConfig(1*time.Second, u, filters)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
expectedSink: true,
},
"noSinkWhenFetchTelemetryConfigFails": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed"))
},
wantErr: "failed to fetch telemetry config",
},
"noSinkWhenServerNotRegisteredWithCCM": {
expect: func(mockClient *client.MockClient) {
mt := mockTelemetryConfig(1*time.Second, nil, nil)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
},
"noSinkWhenTelemetryConfigProviderInitFails": {
expect: func(mockClient *client.MockClient) {
u, _ := url.Parse("https://test.com/v1/metrics")
// Bad refresh interval forces ConfigProvider creation failure.
mt := mockTelemetryConfig(0*time.Second, u, nil)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)
},
wantErr: "failed to init config provider",
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
c := client.NewMockClient(t)
mc := mockMetricsClient{}

test.expect(c)
ctx := context.Background()

s, err := sink(ctx, c, mc)
u, _ := url.Parse("https://test.com/v1/metrics")
filters, _ := regexp.Compile("test")

if test.wantErr != "" {
require.NotNil(t, err)
require.Contains(t, err.Error(), test.wantErr)
require.Nil(t, s)
return
}
c := client.NewMockClient(t)
mt := mockTelemetryConfig(1*time.Second, u, filters)
c.EXPECT().FetchTelemetryConfig(mock.Anything).Return(mt, nil)

if !test.expectedSink {
require.Nil(t, s)
require.Nil(t, err)
return
}
mc := mockMetricsClient{}
ctx := context.Background()
s, err := sink(ctx, c, mc)

require.NotNil(t, s)
})
}
require.NotNil(t, s)
require.NoError(t, err)
}

func mockTelemetryConfig(refreshInterval time.Duration, metricsEndpoint *url.URL, filters *regexp.Regexp) *client.TelemetryConfig {
Expand Down
5 changes: 5 additions & 0 deletions agent/hcp/telemetry/otel_exporter.go
Copy link
Contributor Author

@Achooo Achooo Aug 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EndpointProvider now embeds the Disabled interface to turn on/off export of metrics.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type MetricsClient interface {
// EndpointProvider exposes the GetEndpoint() interface method to fetch the endpoint.
// This abstraction layer offers flexibility, in particular for dynamic configuration or changes to the endpoint.
Achooo marked this conversation as resolved.
Show resolved Hide resolved
type EndpointProvider interface {
Enabler
GetEndpoint() *url.URL
}

Expand Down Expand Up @@ -65,6 +66,10 @@ func (e *OTELExporter) Aggregation(kind metric.InstrumentKind) aggregation.Aggre

// Export serializes and transmits metric data to a receiver.
func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceMetrics) error {
if !e.endpointProvider.Enabled() {
Copy link
Contributor Author

@Achooo Achooo Jul 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Early return when not exporter is not enabled.

return nil
}

endpoint := e.endpointProvider.GetEndpoint()
if endpoint == nil {
return nil
Expand Down
10 changes: 10 additions & 0 deletions agent/hcp/telemetry/otel_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *met

type mockEndpointProvider struct {
endpoint *url.URL
enabled bool
}

func (m *mockEndpointProvider) GetEndpoint() *url.URL { return m.endpoint }
func (m *mockEndpointProvider) Enabled() bool { return m.enabled }

func TestTemporality(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -84,13 +86,16 @@ func TestExport(t *testing.T) {
"earlyReturnWithoutScopeMetrics": {
client: &mockMetricsClient{},
metrics: mutateMetrics(nil),
provider: &mockEndpointProvider{
enabled: true},
},
"earlyReturnWithoutMetrics": {
client: &mockMetricsClient{},
metrics: mutateMetrics([]metricdata.ScopeMetrics{
{Metrics: []metricdata.Metrics{}},
},
),
provider: &mockEndpointProvider{enabled: true},
},
"errorWithExportFailure": {
client: &mockMetricsClient{
Expand All @@ -107,6 +112,10 @@ func TestExport(t *testing.T) {
},
},
),
provider: &mockEndpointProvider{
enabled: true,
endpoint: &url.URL{},
},
wantErr: "failed to export metrics",
},
} {
Expand Down Expand Up @@ -181,6 +190,7 @@ func TestExport_CustomMetrics(t *testing.T) {

exp := NewOTELExporter(tc.client, &mockEndpointProvider{
endpoint: u,
enabled: true,
})

ctx := context.Background()
Expand Down
22 changes: 19 additions & 3 deletions agent/hcp/telemetry/otel_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@ import (
// DefaultExportInterval is a default time interval between export of aggregated metrics.
const DefaultExportInterval = 10 * time.Second

// Enabler must be implemented as it is required to process metrics
// Returning false will mean the sink is disabled, and will not accept metrics.
type Enabler interface {
Copy link
Contributor Author

@Achooo Achooo Jul 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chapmanc Although I like this idea a lot, I think it might be neater to use a Disabler interface 😓 I know it's less intuitive in the telemetry_config_provider, but it's leading to a lot less intuition and complexity in the sink, particularly a lot of changes in the tests.

So moving to:

type Disabler interface {
  Disabled() bool
}

By default this will be false, so sink is enabled.

What do you think?

Enabled() bool
}

// ConfigProvider is required to provide custom metrics processing.
type ConfigProvider interface {
Copy link
Contributor Author

@Achooo Achooo Aug 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConfigProvider now embeds the Disabled to turn on/off collection of metrics.

Enabler
// GetLabels should return a set of OTEL attributes added by default all metrics.
GetLabels() map[string]string
// GetFilters should return filtesr that are required to enable metric processing.
Expand Down Expand Up @@ -127,8 +134,11 @@ func (o *OTELSink) IncrCounter(key []string, val float32) {
// AddSampleWithLabels emits a Consul gauge metric that gets
// registed by an OpenTelemetry Histogram instrument.
func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if !o.cfgProvider.Enabled() {
return
}

k := o.flattenKey(key)
if !o.allowedMetric(k) {
return
}
Expand All @@ -155,8 +165,11 @@ func (o *OTELSink) SetGaugeWithLabels(key []string, val float32, labels []gometr

// AddSampleWithLabels emits a Consul sample metric that gets registed by an OpenTelemetry Histogram instrument.
func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if !o.cfgProvider.Enabled() {
Copy link
Contributor Author

@Achooo Achooo Jul 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How the early return works when it is not enabled. So no metrics are collected.

return
}

k := o.flattenKey(key)
if !o.allowedMetric(k) {
return
}
Expand All @@ -181,8 +194,11 @@ func (o *OTELSink) AddSampleWithLabels(key []string, val float32, labels []gomet

// IncrCounterWithLabels emits a Consul counter metric that gets registed by an OpenTelemetry Histogram instrument.
func (o *OTELSink) IncrCounterWithLabels(key []string, val float32, labels []gometrics.Label) {
k := o.flattenKey(key)
if !o.cfgProvider.Enabled() {
return
}

k := o.flattenKey(key)
if !o.allowedMetric(k) {
return
}
Expand Down
Loading