diff --git a/go.mod b/go.mod index 2cf5179061c..89c20b74ff0 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20241105154643-a6b453a88040 + github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/json-iterator/go v1.1.12 diff --git a/go.sum b/go.sum index 50342ad04a4..4bc30aec022 100644 --- a/go.sum +++ b/go.sum @@ -1260,8 +1260,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20241021123319-be61d61f71e7 h1:lsM/QscEX+ZDIJm48ynQscH+msETyGYV6ug8L4f2DtM= github.com/grafana/alerting v0.0.0-20241021123319-be61d61f71e7/go.mod h1:QsnoKX/iYZxA4Cv+H+wC7uxutBD8qi8ZW5UJvD2TYmU= -github.com/grafana/dskit v0.0.0-20241105154643-a6b453a88040 h1:IR+UNYHqaU31t8/TArJk8K/GlDwOyxMpGNkWCXeZ28g= -github.com/grafana/dskit v0.0.0-20241105154643-a6b453a88040/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9 h1:Dx7+6aU/fhwD2vkMr0PUcyxGat1sjUssHAKQKaS7sDM= +github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8= diff --git a/pkg/distributor/ha_tracker_test.go b/pkg/distributor/ha_tracker_test.go index 84906174093..839ff6827c5 100644 --- a/pkg/distributor/ha_tracker_test.go +++ b/pkg/distributor/ha_tracker_test.go @@ -72,7 +72,8 @@ func checkReplicaTimestamp(t *testing.T, duration time.Duration, c *haTracker, u func TestHATrackerCacheSyncOnStart(t *testing.T) { const cluster = "c1" - const replica = "r1" + const replicaOne = "r1" + const replicaTwo = "r2" var c *haTracker var err error @@ -82,10 +83,10 @@ func TestHATrackerCacheSyncOnStart(t *testing.T) { kvStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) - mock := kv.PrefixClient(kvStore, "prefix") + mockCountingClient := kv.NewMockCountingClient(kvStore) c, err = newHATracker(HATrackerConfig{ EnableHATracker: true, - KVStore: kv.Config{Mock: mock}, + KVStore: kv.Config{Mock: mockCountingClient}, UpdateTimeout: time.Millisecond * 100, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Millisecond * 2, @@ -93,17 +94,24 @@ func TestHATrackerCacheSyncOnStart(t *testing.T) { require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) + // KV Store empty: The sync should try fetching the Keys only + // client.List: 1 + // client.Get: 0 + assert.Equal(t, 1, int(mockCountingClient.ListCalls.Load())) + assert.Equal(t, 0, int(mockCountingClient.GetCalls.Load())) + now = time.Now() - err = c.checkReplica(context.Background(), "user", cluster, replica, now) + err = c.checkReplica(context.Background(), "user", cluster, replicaOne, now) assert.NoError(t, err) err = services.StopAndAwaitTerminated(context.Background(), c) assert.NoError(t, err) - replicaTwo := "r2" + // Initializing a New Client to set calls to zero + mockCountingClient = kv.NewMockCountingClient(kvStore) c, err = newHATracker(HATrackerConfig{ EnableHATracker: true, - KVStore: kv.Config{Mock: mock}, + KVStore: kv.Config{Mock: mockCountingClient}, UpdateTimeout: time.Millisecond * 100, UpdateTimeoutJitterMax: 0, FailoverTimeout: time.Millisecond * 2, @@ -112,6 +120,12 @@ func TestHATrackerCacheSyncOnStart(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) t.Cleanup(func() { assert.NoError(t, services.StopAndAwaitTerminated(context.Background(), c)) }) + // KV Store has one entry: The sync should try fetching the Keys and updating the cache + // client.List: 1 + // client.Get: 1 + assert.Equal(t, 1, int(mockCountingClient.ListCalls.Load())) + assert.Equal(t, 1, int(mockCountingClient.GetCalls.Load())) + now = time.Now() err = c.checkReplica(context.Background(), "user", cluster, replicaTwo, now) assert.Error(t, err) diff --git a/vendor/github.com/grafana/dskit/kv/mock.go b/vendor/github.com/grafana/dskit/kv/mock.go index 59d7430676c..99c84e58d6e 100644 --- a/vendor/github.com/grafana/dskit/kv/mock.go +++ b/vendor/github.com/grafana/dskit/kv/mock.go @@ -5,6 +5,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "go.uber.org/atomic" ) // The mockClient does not anything. @@ -37,3 +38,63 @@ func (m mockClient) WatchKey(_ context.Context, _ string, _ func(interface{}) bo func (m mockClient) WatchPrefix(_ context.Context, _ string, _ func(string, interface{}) bool) { } + +// MockCountingClient is a wrapper around the Client interface that counts the number of times its functions are called. +// This is used for testing only. +type MockCountingClient struct { + client Client + + ListCalls *atomic.Uint32 + GetCalls *atomic.Uint32 + DeleteCalls *atomic.Uint32 + CASCalls *atomic.Uint32 + WatchKeyCalls *atomic.Uint32 + WatchPrefixCalls *atomic.Uint32 +} + +func NewMockCountingClient(client Client) *MockCountingClient { + return &MockCountingClient{ + client: client, + ListCalls: atomic.NewUint32(0), + GetCalls: atomic.NewUint32(0), + DeleteCalls: atomic.NewUint32(0), + CASCalls: atomic.NewUint32(0), + WatchKeyCalls: atomic.NewUint32(0), + WatchPrefixCalls: atomic.NewUint32(0), + } +} + +func (mc *MockCountingClient) List(ctx context.Context, prefix string) ([]string, error) { + mc.ListCalls.Inc() + + return mc.client.List(ctx, prefix) +} +func (mc *MockCountingClient) Get(ctx context.Context, key string) (interface{}, error) { + mc.GetCalls.Inc() + + return mc.client.Get(ctx, key) +} + +func (mc *MockCountingClient) Delete(ctx context.Context, key string) error { + mc.DeleteCalls.Inc() + + return mc.client.Delete(ctx, key) +} + +func (mc *MockCountingClient) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { + mc.CASCalls.Inc() + + return mc.client.CAS(ctx, key, f) +} + +func (mc *MockCountingClient) WatchKey(ctx context.Context, key string, f func(interface{}) bool) { + mc.WatchKeyCalls.Inc() + + mc.client.WatchKey(ctx, key, f) +} + +func (mc *MockCountingClient) WatchPrefix(ctx context.Context, key string, f func(string, interface{}) bool) { + mc.WatchPrefixCalls.Inc() + + mc.client.WatchPrefix(ctx, key, f) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 8c97b03833a..7334be2c04d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -617,7 +617,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20241105154643-a6b453a88040 +# github.com/grafana/dskit v0.0.0-20241115082728-f2a7eb3aa0e9 ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast