Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of [HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration into release/1.15.x #18360

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration (#…
…18168)

* OTELExporter now uses an EndpointProvider to discover the endpoint

* OTELSink uses a ConfigProvider to obtain filters and labels configuration

* improve tests for otel_sink

* Regex logic is moved into client for a method on the TelemetryConfig object

* Create a telemetry_config_provider and update deps to use it

* Fix conversion

* fix import newline

* Add logger to hcp client and move telemetry_config out of the client.go file

* Add a telemetry_config.go to refactor client.go

* Update deps

* update hcp deps test

* Modify telemetry_config_providers

* Check for nil filters

* PR review updates

* Fix comments and move around pieces

* Fix comments

* Remove context from client struct

* Moved ctx out of sink struct and fixed filters, added a test

* Remove named imports, use errors.New if not formatting

* Remove HCP dependencies in telemetry package

* Add success metric and move lock only to grab the t.cfgHash

* Update hash

* fix nits

* Create an equals method and add tests

* Improve telemetry_config_provider.go tests

* Add race test

* Add missing godoc

* Remove mock for MetricsClient

* Avoid goroutine test panics

* trying to kick CI lint issues by upgrading mod

* improve test code and add hasher for testing

* Use structure logging for filters, fix error constants, and default to allow all regex

* removed hashing and modified logic to simplify

* Improve race test and fix PR feedback by removing hash equals and avoid testing the timer.Ticker logic, and instead unit test

* Ran make go-mod-tidy

* Use errtypes in the test

* Add changelog

* add safety check for exporter endpoint

* remove require.Contains by using error types, fix structure logging, and fix success metric typo in exporter

* Fixed race test to have changing config values

* Send success metric before modifying config

* Avoid the defer and move the success metric under
  • Loading branch information
Achooo committed Aug 2, 2023
commit 62af9058480a0242ebdedf1f4ae045e118721bf6
3 changes: 3 additions & 0 deletions .changelog/18168.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
hcp: Add dynamic configuration support for the export of server metrics to HCP.
```
80 changes: 6 additions & 74 deletions agent/hcp/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,6 @@ type Client interface {
DiscoverServers(ctx context.Context) ([]string, error)
}

// MetricsConfig holds metrics specific configuration for the TelemetryConfig.
// The endpoint field overrides the TelemetryConfig endpoint.
type MetricsConfig struct {
// Filters is the list of metric filters; convertTelemetryConfig fills it
// from the payload's Metrics.IncludeList.
Filters []string
// Endpoint, when non-empty, takes precedence over TelemetryConfig.Endpoint
// when the metrics endpoint is resolved.
Endpoint string
}

// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers
// to the HCP Telemetry gateway.
type TelemetryConfig struct {
// Endpoint is the base telemetry endpoint; it is used for metrics unless
// MetricsConfig.Endpoint is set.
Endpoint string
// Labels are <key, value> pairs merged into the labels returned by DefaultLabels.
Labels map[string]string
// MetricsConfig carries the metrics-specific endpoint override and filters.
MetricsConfig *MetricsConfig
}

type BootstrapConfig struct {
Name string
BootstrapExpect int
Expand Down Expand Up @@ -112,10 +97,14 @@ func (c *hcpClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig,

resp, err := c.tgw.AgentTelemetryConfig(params, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to fetch from HCP: %w", err)
}

if err := validateAgentTelemetryConfigPayload(resp); err != nil {
return nil, fmt.Errorf("invalid response payload: %w", err)
}

return convertTelemetryConfig(resp)
return convertAgentTelemetryResponse(ctx, resp, c.cfg)
}

func (c *hcpClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) {
Expand Down Expand Up @@ -272,60 +261,3 @@ func (c *hcpClient) DiscoverServers(ctx context.Context) ([]string, error) {

return servers, nil
}

// convertTelemetryConfig checks that the AgentTelemetryConfig response carries
// a payload with a telemetry config and maps it onto a TelemetryConfig.
//
// It returns an error when the payload or its telemetry config is missing.
// The returned TelemetryConfig always has a non-nil MetricsConfig; its fields
// are only populated when the payload includes a metrics section.
func convertTelemetryConfig(resp *hcptelemetry.AgentTelemetryConfigOK) (*TelemetryConfig, error) {
	payload := resp.Payload
	switch {
	case payload == nil:
		return nil, fmt.Errorf("missing payload")
	case payload.TelemetryConfig == nil:
		return nil, fmt.Errorf("missing telemetry config")
	}

	src := payload.TelemetryConfig

	// Start from the zero value so callers never see a nil MetricsConfig.
	metrics := &MetricsConfig{}
	if m := src.Metrics; m != nil {
		metrics.Endpoint = m.Endpoint
		metrics.Filters = m.IncludeList
	}

	converted := &TelemetryConfig{
		Endpoint:      src.Endpoint,
		Labels:        src.Labels,
		MetricsConfig: metrics,
	}
	return converted, nil
}

// Enabled verifies if telemetry is enabled by ensuring a valid endpoint has been retrieved.
// It returns the full metrics endpoint and true if a non-empty endpoint was obtained,
// or an empty string and false otherwise.
//
// The metrics-specific endpoint (MetricsConfig.Endpoint) takes precedence over
// the top-level Endpoint when it is set.
func (t *TelemetryConfig) Enabled() (string, bool) {
	endpoint := t.Endpoint

	// MetricsConfig is a pointer and is nil on a TelemetryConfig that was not
	// built through the conversion helper; guard it so the override lookup
	// cannot panic with a nil pointer dereference.
	if t.MetricsConfig != nil && t.MetricsConfig.Endpoint != "" {
		endpoint = t.MetricsConfig.Endpoint
	}

	if endpoint == "" {
		return "", false
	}

	// The endpoint from the Telemetry Gateway does not include the metrics path, so it must be added.
	return endpoint + metricsGatewayPath, true
}

// DefaultLabels builds the <key, value> string pairs that must be attached as
// attributes to all exported telemetry data.
//
// The node ID and node name from cfg are included only when non-empty. Labels
// from the TelemetryConfig are applied last, so they win on a key collision.
// A fresh map is returned on every call.
func (t *TelemetryConfig) DefaultLabels(cfg config.CloudConfig) map[string]string {
	result := make(map[string]string)

	if id := string(cfg.NodeID); id != "" {
		result["node_id"] = id
	}
	if name := cfg.NodeName; name != "" {
		result["node_name"] = name
	}

	for key, value := range t.Labels {
		result[key] = value
	}

	return result
}
240 changes: 81 additions & 159 deletions agent/hcp/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,200 +2,122 @@ package client

import (
"context"
"fmt"
"net/url"
"regexp"
"testing"
"time"

"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
"github.com/go-openapi/runtime"
hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
"github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/models"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestFetchTelemetryConfig(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
metricsEndpoint string
expect func(*MockClient)
disabled bool
}{
"success": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &MetricsConfig{
Endpoint: "",
},
}, nil)
},
metricsEndpoint: "https://test.com/v1/metrics",
},
"overrideMetricsEndpoint": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &MetricsConfig{
Endpoint: "https://test.com",
},
}, nil)
},
metricsEndpoint: "https://test.com/v1/metrics",
},
"disabledWithEmptyEndpoint": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "",
MetricsConfig: &MetricsConfig{
Endpoint: "",
},
}, nil)
},
disabled: true,
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()

mock := NewMockClient(t)
test.expect(mock)

telemetryCfg, err := mock.FetchTelemetryConfig(context.Background())
require.NoError(t, err)

if test.disabled {
endpoint, ok := telemetryCfg.Enabled()
require.False(t, ok)
require.Empty(t, endpoint)
return
}
// mockTGW is a test double for the telemetry gateway client. Its
// AgentTelemetryConfig method returns the canned response and error held here.
type mockTGW struct {
// mockResponse is the response AgentTelemetryConfig returns.
mockResponse *hcptelemetry.AgentTelemetryConfigOK
// mockError is the error AgentTelemetryConfig returns.
mockError error
}

endpoint, ok := telemetryCfg.Enabled()
// AgentTelemetryConfig ignores its arguments and returns the response and
// error configured on the mock.
func (m *mockTGW) AgentTelemetryConfig(params *hcptelemetry.AgentTelemetryConfigParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.AgentTelemetryConfigOK, error) {
return m.mockResponse, m.mockError
}
// GetLabelValues is a stub that always returns a new empty OK response and a nil error.
// NOTE(review): presumably present only to satisfy the gateway client interface — confirm.
func (m *mockTGW) GetLabelValues(params *hcptelemetry.GetLabelValuesParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.GetLabelValuesOK, error) {
return hcptelemetry.NewGetLabelValuesOK(), nil
}
// QueryRangeBatch is a stub that always returns a new empty OK response and a nil error.
// NOTE(review): presumably present only to satisfy the gateway client interface — confirm.
func (m *mockTGW) QueryRangeBatch(params *hcptelemetry.QueryRangeBatchParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.QueryRangeBatchOK, error) {
return hcptelemetry.NewQueryRangeBatchOK(), nil
}
// SetTransport is a no-op; the mock does not use a transport.
func (m *mockTGW) SetTransport(transport runtime.ClientTransport) {}

require.True(t, ok)
require.Equal(t, test.metricsEndpoint, endpoint)
})
}
// expectedTelemetryCfg holds the expected values for a test case in their raw
// form; the test converts them into the typed fields of a TelemetryConfig
// before comparing.
type expectedTelemetryCfg struct {
// endpoint is the expected metrics endpoint; the test parses it with url.Parse.
endpoint string
// labels are the expected metric labels.
labels map[string]string
// filters is the expected filter expression; the test compiles it with regexp.Compile.
filters string
// refreshInterval is the expected configuration refresh interval.
refreshInterval time.Duration
}

func TestConvertTelemetryConfig(t *testing.T) {
func TestFetchTelemetryConfig(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
resp *consul_telemetry_service.AgentTelemetryConfigOK
expectedTelemetryCfg *TelemetryConfig
wantErr string
for name, tc := range map[string]struct {
mockResponse *hcptelemetry.AgentTelemetryConfigOK
mockError error
wantErr string
expected *expectedTelemetryCfg
}{
"success": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
MetricsConfig: &MetricsConfig{},
"errorsWithFetchFailure": {
mockError: fmt.Errorf("failed to fetch from HCP"),
mockResponse: nil,
wantErr: "failed to fetch from HCP",
},
"errorsWithInvalidPayload": {
mockResponse: &hcptelemetry.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{},
},
mockError: nil,
wantErr: "invalid response payload",
},
"successWithMetricsConfig": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
"success:": {
mockResponse: &hcptelemetry.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "1s",
},
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
Labels: map[string]string{"test": "123"},
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
Endpoint: "https://metrics-test.com",
IncludeList: []string{"consul.raft.apply"},
IncludeList: []string{"consul", "test"},
},
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
MetricsConfig: &MetricsConfig{
Endpoint: "https://metrics-test.com",
Filters: []string{"consul.raft.apply"},
},
},
},
"errorsWithNilPayload": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{},
wantErr: "missing payload",
},
"errorsWithNilTelemetryConfig": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{},
expected: &expectedTelemetryCfg{
endpoint: "https://test.com/v1/metrics",
labels: map[string]string{"test": "123"},
filters: "consul|test",
refreshInterval: 1 * time.Second,
},
wantErr: "missing telemetry config",
},
} {
test := test
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
telemetryCfg, err := convertTelemetryConfig(test.resp)
if test.wantErr != "" {
c := &hcpClient{
tgw: &mockTGW{
mockError: tc.mockError,
mockResponse: tc.mockResponse,
},
}

telemetryCfg, err := c.FetchTelemetryConfig(context.Background())

if tc.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr)
require.Nil(t, telemetryCfg)
require.Contains(t, err.Error(), test.wantErr)
return
}

urlEndpoint, err := url.Parse(tc.expected.endpoint)
require.NoError(t, err)
require.Equal(t, test.expectedTelemetryCfg, telemetryCfg)
})
}
}

func Test_DefaultLabels(t *testing.T) {
for name, tc := range map[string]struct {
cfg config.CloudConfig
expectedLabels map[string]string
}{
"Success": {
cfg: config.CloudConfig{
NodeID: types.NodeID("nodeyid"),
NodeName: "nodey",
},
expectedLabels: map[string]string{
"node_id": "nodeyid",
"node_name": "nodey",
},
},
regexFilters, err := regexp.Compile(tc.expected.filters)
require.NoError(t, err)

"NoNodeID": {
cfg: config.CloudConfig{
NodeID: types.NodeID(""),
NodeName: "nodey",
},
expectedLabels: map[string]string{
"node_name": "nodey",
},
},
"NoNodeName": {
cfg: config.CloudConfig{
NodeID: types.NodeID("nodeyid"),
NodeName: "",
},
expectedLabels: map[string]string{
"node_id": "nodeyid",
},
},
"Empty": {
cfg: config.CloudConfig{
NodeID: "",
NodeName: "",
},
expectedLabels: map[string]string{},
},
} {
t.Run(name, func(t *testing.T) {
tCfg := &TelemetryConfig{}
labels := tCfg.DefaultLabels(tc.cfg)
require.Equal(t, labels, tc.expectedLabels)
expectedCfg := &TelemetryConfig{
MetricsConfig: &MetricsConfig{
Endpoint: urlEndpoint,
Filters: regexFilters,
Labels: tc.expected.labels,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: tc.expected.refreshInterval,
},
}

require.NoError(t, err)
require.Equal(t, expectedCfg, telemetryCfg)
})
}
}
Loading