Skip to content

Commit

Permalink
Switch to using extra sinks in the telemetry library
Browse files Browse the repository at this point in the history
  • Loading branch information
Achooo committed Apr 28, 2023
1 parent c741570 commit 61461a3
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 56 deletions.
34 changes: 23 additions & 11 deletions agent/hcp/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"time"

"github.com/coredns/coredns/plugin/pkg/log"
hcpclient "github.com/hashicorp/consul/agent/hcp/client"
"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/hcp/scada"
Expand All @@ -20,7 +21,7 @@ import (
type Deps struct {
Client hcpclient.Client
Provider scada.Provider
SinkOpts *telemetry.OTELSinkOpts
Sink *telemetry.OTELSink
}

func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) {
Expand All @@ -34,18 +35,21 @@ func NewDeps(cfg config.CloudConfig, logger hclog.Logger) (d Deps, err error) {
return
}

d.SinkOpts = sinkOpts(&cfg, d.Client, logger)
d.Sink = sink(d.Client, &cfg, logger)

return
}

// setupSink provides OTELSink configuration to initialize a Go Metrics sink,
// only if the server is registered with the management plane (CCM).
// sink provides initializes an OTELSink which forwards Consul metrics to HCP.
// The sink is only initialized if the server is registered with the management plane (CCM).
// This step should not block server initialization, so errors are logged, but not returned.
func sinkOpts(cfg hcpclient.CloudConfig, client hcpclient.Client, logger hclog.Logger) *telemetry.OTELSinkOpts {
func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Logger) *telemetry.OTELSink {
ctx := context.Background()
url, err := verifyCCMRegistration(ctx, client)
if err != nil {
url, err := verifyCCMRegistration(ctx, hcpClient)

// if endpoint is empty, no metrics endpoint configuration for this Consul server
// (e.g. not registered with CCM or feature flag to control rollout) so do not enable the HCP metrics sink.
if url == "" {
return nil
}

Expand All @@ -66,12 +70,18 @@ func sinkOpts(cfg hcpclient.CloudConfig, client hcpclient.Client, logger hclog.L
Reader: telemetry.NewOTELReader(metricsClient, url, 10*time.Second),
}

return sinkOpts
sink, err := telemetry.NewOTELSink(sinkOpts)
if err != nil {
logger.Error("failed to init OTEL sink: %w", err)
return nil
}

return sink
}

// verifyCCMRegistration checks that a server is registered with the HCP management plane
// by making a HTTP request to the HCP TelemetryConfig endpoint.
// If registered, it returns the full URL for the HCP Telemetry Gateway endpoint where metrics should be forwarded.
// If registered, it returns the endpoint for the HCP Telemetry Gateway endpoint where metrics should be forwarded.
func verifyCCMRegistration(ctx context.Context, client hcpclient.Client) (string, error) {
reqCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
Expand All @@ -86,13 +96,15 @@ func verifyCCMRegistration(ctx context.Context, client hcpclient.Client) (string
endpoint = override
}

// no error, the server simply isn't configured for metrics forwarding.
if endpoint == "" {
return "", fmt.Errorf("server not registed with management plane")
return "", nil
}

// The endpoint from the HCP gateway is a domain without scheme, so it must be added.
// The endpoint from the HCP gateway is a domain without scheme, and without the metrics path, so they must be added.
url, err := url.Parse(fmt.Sprintf("https://%s/v1/metrics", endpoint))
if err != nil {
log.Error("failed to parse url: %w", err)
return "", fmt.Errorf("failed to parse url: %w", err)
}

Expand Down
44 changes: 25 additions & 19 deletions agent/hcp/deps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,25 @@ import (
"github.com/stretchr/testify/require"
)

func TestSinkOpts(t *testing.T) {
func TestSink(t *testing.T) {
for name, test := range map[string]struct {
expect func(*client.MockClient)
mockCloudCfg client.CloudConfig
wantErr bool
expectedSink bool
}{
"success": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "test.com",
MetricsOverride: &client.MetricsConfig{
Endpoint: "",
Endpoint: "test.com",
},
}, nil)
},
mockCloudCfg: client.MockCloudCfg{},
expectedSink: true,
},
"emptyOptsWhenServerNotRegisteredWithCCM": {
"noSinkWhenServerNotRegisteredWithCCM": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "",
Expand All @@ -38,9 +39,14 @@ func TestSinkOpts(t *testing.T) {
}, nil)
},
mockCloudCfg: client.MockCloudCfg{},
wantErr: true,
},
"emptyOptsWhenMetricsClientInitFails": {
"noSinkWhenCCMVerificationFails": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(nil, fmt.Errorf("fetch failed"))
},
mockCloudCfg: client.MockCloudCfg{},
},
"noSinkWhenMetricsClientInitFails": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "test.com",
Expand All @@ -50,22 +56,22 @@ func TestSinkOpts(t *testing.T) {
}, nil)
},
mockCloudCfg: client.MockErrCloudCfg{},
wantErr: true,
},
} {
t.Run(name, func(t *testing.T) {
c := client.NewMockClient(t)
l := hclog.NewNullLogger()
test.expect(c)
sinkOpts := sinkOpts(test.mockCloudCfg, c, l)
if test.wantErr {
sinkOpts := sink(c, test.mockCloudCfg, l)
if !test.expectedSink {
require.Nil(t, sinkOpts)
return
}
require.NotNil(t, sinkOpts)
})
}
}

func TestVerifyCCMRegistration(t *testing.T) {
for name, test := range map[string]struct {
expect func(*client.MockClient)
Expand All @@ -78,29 +84,29 @@ func TestVerifyCCMRegistration(t *testing.T) {
},
wantErr: "failed to fetch telemetry config",
},
"failsWithEmptyEndpoint": {
"failsWithURLParseErr": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
Endpoint: "",
// Minimum 2 chars for a domain to be valid.
Endpoint: "s",
MetricsOverride: &client.MetricsConfig{
Endpoint: "",
// Invalid domain chars
Endpoint: " ",
},
}, nil)
},
wantErr: "server not registed with management plane",
wantErr: "failed to parse url:",
},
"failsWithURLParseErr": {
"noErrWithEmptyEndpoint": {
expect: func(mockClient *client.MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&client.TelemetryConfig{
// Minimum 2 chars for a domain to be valid.
Endpoint: "s",
Endpoint: "",
MetricsOverride: &client.MetricsConfig{
// Invalid domain chars
Endpoint: " ",
Endpoint: "",
},
}, nil)
},
wantErr: "failed to parse url:",
expectedURL: "",
},
"success": {
expect: func(mockClient *client.MockClient) {
Expand Down
4 changes: 3 additions & 1 deletion agent/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl
if err != nil {
return d, err
}
cfg.Telemetry.HCPSinkOpts = d.HCP.SinkOpts
if d.HCP.Sink != nil {
cfg.Telemetry.ExtraSinks = append(cfg.Telemetry.ExtraSinks, d.HCP.Sink)
}
}

d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger)
Expand Down
20 changes: 7 additions & 13 deletions lib/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/hashicorp/go-multierror"
prometheuscore "github.com/prometheus/client_golang/prometheus"

"github.com/hashicorp/consul/agent/hcp/telemetry"
"github.com/hashicorp/consul/lib/retry"
)

Expand Down Expand Up @@ -212,10 +211,8 @@ type TelemetryConfig struct {
// hcl: telemetry { prometheus_retention_time = "duration" }
PrometheusOpts prometheus.PrometheusOpts

// HCPSinkOpts provides configuration for an OpenTelemetry HCP Metrics sink.
// The aggregated OpenTelemetry metrics are periodically exported to HCP.
// The HCPSinkOpts are created when the HCP Deps are initialized.
HCPSinkOpts *telemetry.OTELSinkOpts
// ExtraSinks are additional metrics.MetricSink implementations that are to always be added.
ExtraSinks []metrics.MetricSink
}

// MetricsHandler provides an http.Handler for displaying metrics.
Expand All @@ -239,13 +236,6 @@ func (cfg *MetricsConfig) Cancel() {
}
}

func hcpSink(cfg TelemetryConfig, _ string) (metrics.MetricSink, error) {
if cfg.HCPSinkOpts == nil || cfg.HCPSinkOpts.Reader == nil {
return nil, nil
}
return telemetry.NewOTELSink(cfg.HCPSinkOpts)
}

func statsiteSink(cfg TelemetryConfig, hostname string) (metrics.MetricSink, error) {
addr := cfg.StatsiteAddr
if addr == "" {
Expand Down Expand Up @@ -362,7 +352,11 @@ func configureSinks(cfg TelemetryConfig, memSink metrics.MetricSink) (metrics.Fa
addSink(dogstatdSink)
addSink(circonusSink)
addSink(prometheusSink)
addSink(hcpSink)
for _, sink := range cfg.ExtraSinks {
if sink != nil {
sinks = append(sinks, sink)
}
}

if len(sinks) > 0 {
sinks = append(sinks, memSink)
Expand Down
17 changes: 5 additions & 12 deletions lib/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,34 @@
package lib

import (
"context"
"errors"
"io"
"net"
"os"
"testing"

hcptelemetry "github.com/hashicorp/consul/agent/hcp/telemetry"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/sdk/metric"
)

func newCfg() TelemetryConfig {
opts := &hcptelemetry.OTELSinkOpts{
Logger: hclog.New(&hclog.LoggerOptions{Output: io.Discard}),
Reader: metric.NewManualReader(),
Ctx: context.Background(),
}

return TelemetryConfig{
StatsdAddr: "statsd.host:1234",
StatsiteAddr: "statsite.host:1234",
DogstatsdAddr: "mydog.host:8125",
HCPSinkOpts: opts,
ExtraSinks: []metrics.MetricSink{
&metrics.BlackholeSink{},
},
}
}

func TestConfigureSinks(t *testing.T) {
cfg := newCfg()
sinks, err := configureSinks(cfg, nil)
require.Error(t, err)
// 4 sinks: statsd, statsite, inmem, hcp
// 4 sinks: statsd, statsite, inmem, extra sink (blackhole)
require.Equal(t, 4, len(sinks))

cfg = TelemetryConfig{
Expand Down

0 comments on commit 61461a3

Please sign in to comment.