diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 495afa1559..52d194125f 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -638,14 +638,6 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p span, ctx := tracing.StartSpan(ctx, "receive_fanout_forward") defer span.Finish() - // It is possible that hashring is ready in testReady() but unready now, - // so need to lock here. - h.mtx.RLock() - if h.hashring == nil { - h.mtx.RUnlock() - return errors.New("hashring is not ready") - } - var replicas []uint64 if r.replicated { replicas = []uint64{r.n} diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index affabc085c..5bdb2fef47 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -39,6 +39,7 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/require" "github.com/efficientgo/core/testutil" @@ -1664,3 +1665,79 @@ func TestHandlerEarlyStop(t *testing.T) { testutil.NotOk(t, err) testutil.Equals(t, "http: Server closed", err.Error()) } + +func TestHandlerFlippingHashrings(t *testing.T) { + h := NewHandler(log.NewLogfmtLogger(os.Stderr), &Options{}) + t.Cleanup(h.Close) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + h1, err := newSimpleHashring([]Endpoint{ + { + Address: "http://localhost:9090", + }, + }) + require.NoError(t, err) + h2, err := newSimpleHashring([]Endpoint{ + { + Address: "http://localhost:9091", + }, + }) + require.NoError(t, err) + + h.Hashring(h1) + + var wg sync.WaitGroup + + wg.Add(2) + go func() { + defer wg.Done() + + for { + select { + case <-time.After(50 * time.Millisecond): + case <-ctx.Done(): + return + } + + err := h.handleRequest(ctx, 0, "test", &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: labelpb.ZLabelsFromPromLabels(labels.FromStrings("foo", "bar")), + Samples: []prompb.Sample{ + { + Timestamp: time.Now().Unix(), + Value: 123, + }, + }, + }, + }, + }) + require.Error(t, err) + } + }() + go func() { + defer wg.Done() + var flipper bool + + for { + select { + case <-time.After(200 * time.Millisecond): + case <-ctx.Done(): + return + } + + if flipper { + h.Hashring(h2) + } else { + h.Hashring(h1) + } + flipper = !flipper + } + }() + + <-time.After(1 * time.Second) + cancel() + wg.Wait() +}