Skip to content

Commit

Permalink
Improve race test and fix PR feedback by removing hash equals and avo…
Browse files Browse the repository at this point in the history
…id testing the timer.Ticker logic, and instead unit test
  • Loading branch information
Achooo committed Jul 31, 2023
1 parent 9677781 commit c23798c
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 84 deletions.
9 changes: 5 additions & 4 deletions agent/hcp/telemetry_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ func (h *hcpProviderImpl) getUpdate(ctx context.Context) *dynamicConfig {
return nil
}

// RefreshInterval of 0 or less will cause time.Reset() panic.
if telemetryCfg.RefreshConfig.RefreshInterval <= 0 {
logger.Error("invalid refresh interval")
// newRefreshInterval of 0 or less can cause ticker Reset() panic.
newRefreshInterval := telemetryCfg.RefreshConfig.RefreshInterval
if newRefreshInterval <= 0 {
logger.Error("invalid refresh interval duration", "refreshInterval", newRefreshInterval)
metrics.IncrCounter(internalMetricRefreshFailure, 1)
return nil
}
Expand All @@ -117,7 +118,7 @@ func (h *hcpProviderImpl) getUpdate(ctx context.Context) *dynamicConfig {
Filters: telemetryCfg.MetricsConfig.Filters,
Endpoint: telemetryCfg.MetricsConfig.Endpoint,
Labels: telemetryCfg.MetricsConfig.Labels,
RefreshInterval: telemetryCfg.RefreshConfig.RefreshInterval,
RefreshInterval: newRefreshInterval,
}

// Acquire write lock to update new configuration.
Expand Down
121 changes: 41 additions & 80 deletions agent/hcp/telemetry_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package hcp

import (
"context"
"errors"
"fmt"
"net/url"
"regexp"
Expand All @@ -21,8 +20,8 @@ import (

const (
testRefreshInterval = 100 * time.Millisecond
testRaceSampleCount = 5000
testSinkServiceName = "test.telemetry_config_provider"
testRaceSampleCount = 5000
)

var (
Expand Down Expand Up @@ -223,41 +222,8 @@ func TestTelemetryConfigProviderGetUpdate(t *testing.T) {
}
}

// mockClientRace returns new configuration everytime checkUpdate is called
// by creating unique labels using its counter. This allows us to induce
// race conditions with the changing dynamic config in the provider.
type mockClientRace struct {
counter int
defaultEndpoint *url.URL
defaultFilters *regexp.Regexp
}

func (mc *mockClientRace) FetchBootstrap(ctx context.Context) (*client.BootstrapConfig, error) {
return nil, nil
}
func (mc *mockClientRace) PushServerStatus(ctx context.Context, status *client.ServerStatus) error {
return nil
}
func (mc *mockClientRace) DiscoverServers(ctx context.Context) ([]string, error) {
return nil, nil
}
func (mc *mockClientRace) FetchTelemetryConfig(ctx context.Context) (*client.TelemetryConfig, error) {
mc.counter++
return &client.TelemetryConfig{
MetricsConfig: &client.MetricsConfig{
Endpoint: mc.defaultEndpoint,
Filters: mc.defaultFilters,
// Generate unique labels.
Labels: map[string]string{fmt.Sprintf("label_%d", mc.counter): fmt.Sprintf("value_%d", mc.counter)},
},
RefreshConfig: &client.RefreshConfig{
RefreshInterval: testRefreshInterval,
},
}, nil
}

func TestTelemetryConfigProvider_Race(t *testing.T) {
dynamicCfg, err := testDynamicCfg(&testConfig{
cfg, err := testTelemetryCfg(&testConfig{
endpoint: "http://test.com/v1/metrics",
filters: "test",
labels: map[string]string{
Expand All @@ -270,58 +236,53 @@ func TestTelemetryConfigProvider_Race(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

provider := &hcpProviderImpl{
hcpClient: &mockClientRace{
defaultEndpoint: dynamicCfg.Endpoint,
defaultFilters: dynamicCfg.Filters,
},
cfg: dynamicCfg,
}

go provider.run(ctx, dynamicCfg.RefreshInterval)
mockClient := client.NewMockClient(t)
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(cfg, nil)

// Start the provider goroutine
// Every refresh interval, config will be modified.
provider, err := NewHCPProvider(ctx, mockClient, cfg)
require.NoError(t, err)

wg := &sync.WaitGroup{}

labelErrCh := make(chan error, testRaceSampleCount)
labelErr := errors.New("expected labels to have one entry")
// Start 5000 goroutines that try to access label configuration.
kickOff(wg, labelErrCh, provider, func(provider *hcpProviderImpl) bool {
return len(provider.GetLabels()) == 1
}, labelErr)

expectedEndpoint := dynamicCfg.Endpoint.String()
endpointErr := fmt.Errorf("expected endpoint to be %s", expectedEndpoint)
endpointErrCh := make(chan error, testRaceSampleCount)
// Start 5000 goroutines that try to access endpoint configuration.
kickOff(wg, endpointErrCh, provider, func(provider *hcpProviderImpl) bool {
return provider.GetEndpoint().String() == expectedEndpoint
}, endpointErr)

expectedFilters := dynamicCfg.Filters.String()
filtersErr := fmt.Errorf("expected filters to be %s", expectedFilters)
filtersErrCh := make(chan error, testRaceSampleCount)
// Start 5000 goroutines that try to access filter configuration.
kickOff(wg, filtersErrCh, provider, func(provider *hcpProviderImpl) bool {
return provider.GetFilters().String() == expectedFilters
}, filtersErr)

wg.Wait()

require.Empty(t, labelErrCh)
require.Empty(t, endpointErrCh)
require.Empty(t, filtersErrCh)
// Every refresh interval, try to query config using Get* methods inducing a race condition.
timer := time.NewTimer(testRefreshInterval)
defer timer.Stop()
for {
select {
case <-timer.C:
wg := &sync.WaitGroup{}
// Start goroutines that try to access label configuration.
kickOff(wg, testRaceSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetLabels(), cfg.MetricsConfig.Labels)
})

// Start goroutines that try to access endpoint configuration.
kickOff(wg, testRaceSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetFilters(), cfg.MetricsConfig.Filters)
})

// Start goroutines that try to access filter configuration.
kickOff(wg, testRaceSampleCount, provider, func(provider *hcpProviderImpl) {
require.Equal(t, provider.GetEndpoint(), cfg.MetricsConfig.Endpoint)
})

wg.Wait()
// Stop after 10 refresh intervals.
case <-time.After(10 * testRefreshInterval):
return
case <-ctx.Done():
require.Fail(t, "Context cancelled before test finishes")
return
}
}
}

func kickOff(wg *sync.WaitGroup, errCh chan error, provider *hcpProviderImpl, check func(cfgProvider *hcpProviderImpl) bool, err error) {
for i := 0; i < 5000; i++ {
func kickOff(wg *sync.WaitGroup, count int, provider *hcpProviderImpl, check func(cfgProvider *hcpProviderImpl)) {
for i := 0; i < count; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if !check(provider) {
errCh <- err
}
check(provider)
}()
}
}
Expand Down

0 comments on commit c23798c

Please sign in to comment.