diff --git a/pkg/storegateway/bucket_e2e_test.go b/pkg/storegateway/bucket_e2e_test.go index 2dec613c8af..0f21227a93c 100644 --- a/pkg/storegateway/bucket_e2e_test.go +++ b/pkg/storegateway/bucket_e2e_test.go @@ -136,6 +136,7 @@ type prepareStoreConfig struct { seriesLimiterFactory SeriesLimiterFactory series []labels.Labels indexCache indexcache.IndexCache + bucketStoreOpts []BucketStoreOption } func (c *prepareStoreConfig) apply(opts ...prepareStoreConfigOption) *prepareStoreConfig { @@ -173,6 +174,12 @@ func withManyParts() prepareStoreConfigOption { } } +func withBucketStoreOptions(opts ...BucketStoreOption) prepareStoreConfigOption { + return func(config *prepareStoreConfig) { + config.bucketStoreOpts = append(config.bucketStoreOpts, opts...) // append, don't overwrite, so repeated uses of this option compose + } +} + func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareStoreConfig) *storeSuite { extLset := labels.FromStrings("ext1", "value1") @@ -188,6 +195,9 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), cfg.tempDir, nil, []block.MetadataFilter{}) assert.NoError(t, err) + // Have our options in the beginning so tests can override logger and index cache if they need to + storeOpts := append([]BucketStoreOption{WithLogger(s.logger), WithIndexCache(s.cache)}, cfg.bucketStoreOpts...) 
+ store, err := NewBucketStore( "tenant", objstore.WithNoopInstr(bkt), @@ -203,9 +213,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS time.Minute, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), - WithLogger(s.logger), - WithIndexCache(s.cache), - WithStreamingSeriesPerBatch(65536), + storeOpts..., ) assert.NoError(t, err) t.Cleanup(func() { @@ -786,11 +794,24 @@ func foreachStore(t *testing.T, runTest func(t *testing.T, newSuite suiteFactory t.Run("filesystem", func(t *testing.T) { t.Parallel() - dir := t.TempDir() + b, err := filesystem.NewBucket(t.TempDir()) + assert.NoError(t, err) + factory := func(opts ...prepareStoreConfigOption) *storeSuite { + return prepareStoreWithTestBlocks(t, b, defaultPrepareStoreConfig(t).apply(opts...)) + } + runTest(t, factory) + }) + + t.Run("streaming", func(t *testing.T) { + t.Parallel() - b, err := filesystem.NewBucket(dir) + b, err := filesystem.NewBucket(t.TempDir()) assert.NoError(t, err) factory := func(opts ...prepareStoreConfigOption) *storeSuite { + // We want to force each Series() call to use more than one batch to catch some edge cases. + // This should make the implementation slightly slower, although test time + // should be dominated by the setup. + opts = append(opts, withBucketStoreOptions(WithStreamingSeriesPerBatch(10))) return prepareStoreWithTestBlocks(t, b, defaultPrepareStoreConfig(t).apply(opts...)) } runTest(t, factory) diff --git a/pkg/storegateway/bucket_test.go b/pkg/storegateway/bucket_test.go index 7cf68cd4b5d..2cbb568a538 100644 --- a/pkg/storegateway/bucket_test.go +++ b/pkg/storegateway/bucket_test.go @@ -1038,77 +1038,88 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 1e9) // 1GB. 
assert.NoError(t, err) - st, err := NewBucketStore( - "test", - ibkt, - f, - tmpDir, - NewChunksLimiterFactory(0), - NewSeriesLimiterFactory(0), - newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), - 1, - mimir_tsdb.DefaultPostingOffsetInMemorySampling, - indexheader.BinaryReaderConfig{}, - false, - 0, - hashcache.NewSeriesHashCache(1024*1024), - NewBucketStoreMetrics(nil), - WithLogger(logger), - WithChunkPool(chunkPool), - WithStreamingSeriesPerBatch(65536), - ) - assert.NoError(t, err) + runTestWithStore := func(t test.TB, st *BucketStore) { + if !t.IsBenchmark() { + st.chunkPool = &mockedPool{parent: st.chunkPool} + } - if !t.IsBenchmark() { - st.chunkPool = &mockedPool{parent: st.chunkPool} - } + assert.NoError(t, st.SyncBlocks(context.Background())) - assert.NoError(t, st.SyncBlocks(context.Background())) + var bCases []*seriesCase + for _, p := range requestedRatios { + expectedSamples := int(p * float64(totalSeries*samplesPerSeries)) + if expectedSamples == 0 { + expectedSamples = 1 + } + seriesCut := int(p * float64(numOfBlocks*seriesPerBlock)) + if seriesCut == 0 { + seriesCut = 1 + } else if seriesCut == 1 { + seriesCut = expectedSamples / samplesPerSeriesPerBlock + } - var bCases []*seriesCase - for _, p := range requestedRatios { - expectedSamples := int(p * float64(totalSeries*samplesPerSeries)) - if expectedSamples == 0 { - expectedSamples = 1 - } - seriesCut := int(p * float64(numOfBlocks*seriesPerBlock)) - if seriesCut == 0 { - seriesCut = 1 - } else if seriesCut == 1 { - seriesCut = expectedSamples / samplesPerSeriesPerBlock + bCases = append(bCases, &seriesCase{ + Name: fmt.Sprintf("%dof%d", expectedSamples, totalSeries*samplesPerSeries), + Req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: int64(expectedSamples) - 1, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + SkipChunks: skipChunk, + }, + ExpectedHints: hintspb.SeriesResponseHints{ + QueriedBlocks: 
expectedQueriesBlocks, + }, + // This does not cut chunks properly, but those are assured against for non benchmarks only, where we use 100% case only. + ExpectedSeries: series[:seriesCut], + }) } + runTestServerSeries(t, st, bCases...) - bCases = append(bCases, &seriesCase{ - Name: fmt.Sprintf("%dof%d", expectedSamples, totalSeries*samplesPerSeries), - Req: &storepb.SeriesRequest{ - MinTime: 0, - MaxTime: int64(expectedSamples) - 1, - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, - }, - SkipChunks: skipChunk, - }, - ExpectedHints: hintspb.SeriesResponseHints{ - QueriedBlocks: expectedQueriesBlocks, - }, - // This does not cut chunks properly, but those are assured against for non benchmarks only, where we use 100% case only. - ExpectedSeries: series[:seriesCut], - }) - } - runTestServerSeries(t, st, bCases...) + if !t.IsBenchmark() { + if !skipChunk { + // TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate. + assert.Equal(t, 0, int(st.chunkPool.(*mockedPool).balance.Load())) + st.chunkPool.(*mockedPool).gets.Store(0) + } - if !t.IsBenchmark() { - if !skipChunk { - // TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate. - assert.Equal(t, 0, int(st.chunkPool.(*mockedPool).balance.Load())) - st.chunkPool.(*mockedPool).gets.Store(0) + for _, b := range st.blocks { + // NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series. + assert.Equal(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches)) + } } + } - for _, b := range st.blocks { - // NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series. 
- assert.Equal(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches)) - } + for testName, bucketStoreOpts := range map[string][]BucketStoreOption{ + "with default options": {WithLogger(logger), WithChunkPool(chunkPool)}, + "with series streaming (1K per batch)": {WithLogger(logger), WithChunkPool(chunkPool), WithStreamingSeriesPerBatch(1000)}, + "with series streaming (10K per batch)": {WithLogger(logger), WithChunkPool(chunkPool), WithStreamingSeriesPerBatch(10000)}, + } { + st, err := NewBucketStore( + "test", + ibkt, + f, + tmpDir, + NewChunksLimiterFactory(0), + NewSeriesLimiterFactory(0), + newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil), + 1, + mimir_tsdb.DefaultPostingOffsetInMemorySampling, + indexheader.BinaryReaderConfig{}, + false, + 0, + hashcache.NewSeriesHashCache(1024*1024), + NewBucketStoreMetrics(nil), + bucketStoreOpts..., + ) + assert.NoError(t, err) + + t.Run(testName, func(t test.TB) { + runTestWithStore(t, st) + }) } + } type mockedPool struct { @@ -1314,65 +1325,74 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { } func TestSeries_RequestAndResponseHints(t *testing.T) { - tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t) - defer close() - - testCases := []*seriesCase{ - { - Name: "querying a range containing 1 block should return 1 block in the response hints", - Req: &storepb.SeriesRequest{ - MinTime: 0, - MaxTime: 1, - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + newTestCases := func(seriesSet1 []*storepb.Series, seriesSet2 []*storepb.Series, block1 ulid.ULID, block2 ulid.ULID) []*seriesCase { + return []*seriesCase{ + { + Name: "querying a range containing 1 block should return 1 block in the response hints", + Req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 1, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, }, - }, - ExpectedSeries: seriesSet1, - 
ExpectedHints: hintspb.SeriesResponseHints{ - QueriedBlocks: []hintspb.Block{ - {Id: block1.String()}, + ExpectedSeries: seriesSet1, + ExpectedHints: hintspb.SeriesResponseHints{ + QueriedBlocks: []hintspb.Block{ + {Id: block1.String()}, + }, }, - }, - }, { - Name: "querying a range containing multiple blocks should return multiple blocks in the response hints", - Req: &storepb.SeriesRequest{ - MinTime: 0, - MaxTime: 3, - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, { + Name: "querying a range containing multiple blocks should return multiple blocks in the response hints", + Req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, }, - }, - ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...), - ExpectedHints: hintspb.SeriesResponseHints{ - QueriedBlocks: []hintspb.Block{ - {Id: block1.String()}, - {Id: block2.String()}, + ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...), + ExpectedHints: hintspb.SeriesResponseHints{ + QueriedBlocks: []hintspb.Block{ + {Id: block1.String()}, + {Id: block2.String()}, + }, }, - }, - }, { - Name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block", - Req: &storepb.SeriesRequest{ - MinTime: 0, - MaxTime: 3, - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, { + Name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block", + Req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + Hints: mustMarshalAny(&hintspb.SeriesRequestHints{ + BlockMatchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: block.BlockIDLabel, Value: 
block1.String()}, + }, + }), }, - Hints: mustMarshalAny(&hintspb.SeriesRequestHints{ - BlockMatchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: block.BlockIDLabel, Value: block1.String()}, + ExpectedSeries: seriesSet1, + ExpectedHints: hintspb.SeriesResponseHints{ + QueriedBlocks: []hintspb.Block{ + {Id: block1.String()}, }, - }), - }, - ExpectedSeries: seriesSet1, - ExpectedHints: hintspb.SeriesResponseHints{ - QueriedBlocks: []hintspb.Block{ - {Id: block1.String()}, }, }, - }, + } } - runTestServerSeries(tb, store, testCases...) + t.Run("with default options", func(t *testing.T) { + tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t) + tb.Cleanup(close) + runTestServerSeries(tb, store, newTestCases(seriesSet1, seriesSet2, block1, block2)...) + }) + + t.Run("with series streaming", func(t *testing.T) { + tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t, WithStreamingSeriesPerBatch(5000)) + tb.Cleanup(close) + runTestServerSeries(tb, store, newTestCases(seriesSet1, seriesSet2, block1, block2)...) 
+ }) } func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { @@ -1412,7 +1432,6 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { NewBucketStoreMetrics(nil), WithLogger(logger), WithIndexCache(indexCache), - WithStreamingSeriesPerBatch(65536), ) assert.NoError(t, err) defer func() { assert.NoError(t, store.RemoveBlocksAndClose()) }() @@ -1503,7 +1522,6 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) { NewBucketStoreMetrics(nil), WithLogger(logger), WithIndexCache(indexCache), - WithStreamingSeriesPerBatch(65536), ) assert.NoError(t, err) assert.NoError(t, store.SyncBlocks(context.Background())) @@ -1594,7 +1612,7 @@ func createBlockWithOneSeriesWithStep(t test.TB, dir string, lbls labels.Labels, return createBlockFromHead(t, dir, h) } -func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Series, []*storepb.Series, ulid.ULID, ulid.ULID, func()) { +func setupStoreForHintsTest(t *testing.T, opts ...BucketStoreOption) (test.TB, *BucketStore, []*storepb.Series, []*storepb.Series, ulid.ULID, ulid.ULID, func()) { tb := test.NewTB(t) closers := []func(){} @@ -1652,6 +1670,7 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(logger, nil, indexcache.InMemoryIndexCacheConfig{}) assert.NoError(tb, err) + opts = append([]BucketStoreOption{WithLogger(logger), WithIndexCache(indexCache)}, opts...) store, err := NewBucketStore( "tenant", instrBkt, @@ -1667,9 +1686,7 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser 0, hashcache.NewSeriesHashCache(1024*1024), NewBucketStoreMetrics(nil), - WithLogger(logger), - WithIndexCache(indexCache), - WithStreamingSeriesPerBatch(65536), + opts..., ) assert.NoError(tb, err) assert.NoError(tb, store.SyncBlocks(context.Background()))