store-gateway: bucketStore tests with and without streaming #3658

Merged
Changes from 1 commit
pkg/storegateway/bucket_e2e_test.go (28 changes: 23 additions & 5 deletions)
@@ -136,6 +136,7 @@ type prepareStoreConfig struct {
 	seriesLimiterFactory SeriesLimiterFactory
 	series               []labels.Labels
 	indexCache           indexcache.IndexCache
+	bucketStoreOpts      []BucketStoreOption
 }
 
 func (c *prepareStoreConfig) apply(opts ...prepareStoreConfigOption) *prepareStoreConfig {
@@ -173,6 +174,12 @@ func withManyParts() prepareStoreConfigOption {
 	}
 }
 
+func withBucketStoreOptions(opts ...BucketStoreOption) prepareStoreConfigOption {
+	return func(config *prepareStoreConfig) {
+		config.bucketStoreOpts = opts
+	}
+}
+
 func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareStoreConfig) *storeSuite {
 	extLset := labels.FromStrings("ext1", "value1")
 
@@ -188,6 +195,9 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS
 	metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), cfg.tempDir, nil, []block.MetadataFilter{})
 	assert.NoError(t, err)
 
+	// Have our options in the beginning so tests can override logger and index cache if they need to.
+	storeOpts := append([]BucketStoreOption{WithLogger(s.logger), WithIndexCache(s.cache)}, cfg.bucketStoreOpts...)
+
 	store, err := NewBucketStore(
 		"tenant",
 		objstore.WithNoopInstr(bkt),
@@ -203,9 +213,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS
 		time.Minute,
 		hashcache.NewSeriesHashCache(1024*1024),
 		NewBucketStoreMetrics(nil),
-		WithLogger(s.logger),
-		WithIndexCache(s.cache),
-		WithStreamingSeriesPerBatch(65536),
+		storeOpts...,
 	)
 	assert.NoError(t, err)
 	t.Cleanup(func() {
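
The "options in the beginning" comment in the hunk above relies on an assumed property of the functional-options pattern: options are applied in the order they are passed, so an option appended after the defaults wins. A minimal standalone sketch of that behavior; the `Option`, `withBatchSize`, and `newStore` names are hypothetical illustrations, not Mimir's actual API:

```go
package main

import "fmt"

type config struct{ batchSize int }

// Option mutates a config; newStore applies options in the order given.
type Option func(*config)

func withBatchSize(n int) Option {
	return func(c *config) { c.batchSize = n }
}

func newStore(opts ...Option) config {
	c := config{}
	for _, o := range opts {
		o(&c) // later options overwrite what earlier ones set
	}
	return c
}

func main() {
	// Defaults first, caller-supplied options appended after them, the same
	// trick as storeOpts in the diff above: the caller's option wins.
	opts := append([]Option{withBatchSize(65536)}, withBatchSize(1000))
	fmt.Println(newStore(opts...).batchSize) // prints 1000
}
```
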
@@ -786,11 +794,21 @@ func foreachStore(t *testing.T, runTest func(t *testing.T, newSuite suiteFactory
 	t.Run("filesystem", func(t *testing.T) {
 		t.Parallel()
 
-		dir := t.TempDir()
-		b, err := filesystem.NewBucket(dir)
+		b, err := filesystem.NewBucket(t.TempDir())
 		assert.NoError(t, err)
 		factory := func(opts ...prepareStoreConfigOption) *storeSuite {
 			return prepareStoreWithTestBlocks(t, b, defaultPrepareStoreConfig(t).apply(opts...))
 		}
 		runTest(t, factory)
 	})
 
+	t.Run("streaming", func(t *testing.T) {
+		t.Parallel()
+
+		b, err := filesystem.NewBucket(t.TempDir())
+		assert.NoError(t, err)
+		factory := func(opts ...prepareStoreConfigOption) *storeSuite {
+			opts = append(opts, withBucketStoreOptions(WithStreamingSeriesPerBatch(1000)))
[Review thread on the WithStreamingSeriesPerBatch(1000) line above]

Collaborator: Each Series() call in the tests should use multiple batches. Is that the case if we use such a large batch? What if we drastically reduce it to 10 and leave a comment on why?

Contributor (author): Makes sense to me, I did the change in d0d52a8.

+			return prepareStoreWithTestBlocks(t, b, defaultPrepareStoreConfig(t).apply(opts...))
+		}
+		runTest(t, factory)
+	})
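
On the thread above: with a batch of 1000 series, a suite that creates fewer series than that completes in a single batch, so the streaming path is not meaningfully exercised. Below is a sketch of the agreed direction; the actual fix landed in commit d0d52a8 and may differ from this:

```go
t.Run("streaming", func(t *testing.T) {
	t.Parallel()

	b, err := filesystem.NewBucket(t.TempDir())
	assert.NoError(t, err)
	factory := func(opts ...prepareStoreConfigOption) *storeSuite {
		// Deliberately tiny batch so that nearly every Series() call spans
		// multiple batches; with a large batch a single batch could hold
		// all series and the test would collapse into the regular path.
		opts = append(opts, withBucketStoreOptions(WithStreamingSeriesPerBatch(10)))
		return prepareStoreWithTestBlocks(t, b, defaultPrepareStoreConfig(t).apply(opts...))
	}
	runTest(t, factory)
})
```
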
pkg/storegateway/bucket_test.go (252 changes: 135 additions & 117 deletions)
@@ -1038,77 +1038,87 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
 	chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 1e9) // 1GB.
 	assert.NoError(t, err)
 
-	st, err := NewBucketStore(
-		"test",
-		ibkt,
-		f,
-		tmpDir,
-		NewChunksLimiterFactory(0),
-		NewSeriesLimiterFactory(0),
-		newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
-		1,
-		mimir_tsdb.DefaultPostingOffsetInMemorySampling,
-		indexheader.BinaryReaderConfig{},
-		false,
-		0,
-		hashcache.NewSeriesHashCache(1024*1024),
-		NewBucketStoreMetrics(nil),
-		WithLogger(logger),
-		WithChunkPool(chunkPool),
-		WithStreamingSeriesPerBatch(65536),
-	)
-	assert.NoError(t, err)
-
-	if !t.IsBenchmark() {
-		st.chunkPool = &mockedPool{parent: st.chunkPool}
-	}
-
-	assert.NoError(t, st.SyncBlocks(context.Background()))
-
-	var bCases []*seriesCase
-	for _, p := range requestedRatios {
-		expectedSamples := int(p * float64(totalSeries*samplesPerSeries))
-		if expectedSamples == 0 {
-			expectedSamples = 1
-		}
-		seriesCut := int(p * float64(numOfBlocks*seriesPerBlock))
-		if seriesCut == 0 {
-			seriesCut = 1
-		} else if seriesCut == 1 {
-			seriesCut = expectedSamples / samplesPerSeriesPerBlock
-		}
-
-		bCases = append(bCases, &seriesCase{
-			Name: fmt.Sprintf("%dof%d", expectedSamples, totalSeries*samplesPerSeries),
-			Req: &storepb.SeriesRequest{
-				MinTime: 0,
-				MaxTime: int64(expectedSamples) - 1,
-				Matchers: []storepb.LabelMatcher{
-					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
-				},
-				SkipChunks: skipChunk,
-			},
-			ExpectedHints: hintspb.SeriesResponseHints{
-				QueriedBlocks: expectedQueriesBlocks,
-			},
-			// This does not cut chunks properly, but those are assured against for non benchmarks only, where we use 100% case only.
-			ExpectedSeries: series[:seriesCut],
-		})
-	}
-	runTestServerSeries(t, st, bCases...)
-
-	if !t.IsBenchmark() {
-		if !skipChunk {
-			// TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate.
-			assert.Equal(t, 0, int(st.chunkPool.(*mockedPool).balance.Load()))
-			st.chunkPool.(*mockedPool).gets.Store(0)
-		}
-
-		for _, b := range st.blocks {
-			// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series.
-			assert.Equal(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches))
-		}
-	}
+	runTestWithStore := func(t test.TB, st *BucketStore) {
+		if !t.IsBenchmark() {
+			st.chunkPool = &mockedPool{parent: st.chunkPool}
+		}
+
+		assert.NoError(t, st.SyncBlocks(context.Background()))
+
+		var bCases []*seriesCase
+		for _, p := range requestedRatios {
+			expectedSamples := int(p * float64(totalSeries*samplesPerSeries))
+			if expectedSamples == 0 {
+				expectedSamples = 1
+			}
+			seriesCut := int(p * float64(numOfBlocks*seriesPerBlock))
+			if seriesCut == 0 {
+				seriesCut = 1
+			} else if seriesCut == 1 {
+				seriesCut = expectedSamples / samplesPerSeriesPerBlock
+			}
+
+			bCases = append(bCases, &seriesCase{
+				Name: fmt.Sprintf("%dof%d", expectedSamples, totalSeries*samplesPerSeries),
+				Req: &storepb.SeriesRequest{
+					MinTime: 0,
+					MaxTime: int64(expectedSamples) - 1,
+					Matchers: []storepb.LabelMatcher{
+						{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
+					},
+					SkipChunks: skipChunk,
+				},
+				ExpectedHints: hintspb.SeriesResponseHints{
+					QueriedBlocks: expectedQueriesBlocks,
+				},
+				// This does not cut chunks properly, but those are assured against for non benchmarks only, where we use 100% case only.
+				ExpectedSeries: series[:seriesCut],
+			})
+		}
+		runTestServerSeries(t, st, bCases...)
+
+		if !t.IsBenchmark() {
+			if !skipChunk {
+				// TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate.
+				assert.Equal(t, 0, int(st.chunkPool.(*mockedPool).balance.Load()))
+				st.chunkPool.(*mockedPool).gets.Store(0)
+			}
+
+			for _, b := range st.blocks {
+				// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of make sense: long series.
+				assert.Equal(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches))
+			}
+		}
+	}
+
+	for testName, bucketStoreOpts := range map[string][]BucketStoreOption{
+		"run with default options": {WithLogger(logger), WithChunkPool(chunkPool)},
+		"with series streaming":    {WithLogger(logger), WithChunkPool(chunkPool), WithStreamingSeriesPerBatch(5000)},
[Review thread on the "with series streaming" entry above]

Collaborator: Same comment here. How many series do we have in this test? You may also consider having 2 test cases, one with small batches and one with big batches.

Contributor (author): This runs with many different setups for series - from 1 series to 1M series and different numbers of samples per series. I made two variations - one with 1K series and one with 10K series. I think those two are more realistic. WDYT?


+	} {
+		st, err := NewBucketStore(
+			"test",
+			ibkt,
+			f,
+			tmpDir,
+			NewChunksLimiterFactory(0),
+			NewSeriesLimiterFactory(0),
+			newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
+			1,
+			mimir_tsdb.DefaultPostingOffsetInMemorySampling,
+			indexheader.BinaryReaderConfig{},
+			false,
+			0,
+			hashcache.NewSeriesHashCache(1024*1024),
+			NewBucketStoreMetrics(nil),
+			bucketStoreOpts...,
+		)
+		assert.NoError(t, err)
+
+		t.Run(testName, func(t test.TB) {
+			runTestWithStore(t, st)
+		})
+	}
 
 }
 
 type mockedPool struct {
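
The author's reply in the thread above mentions two streaming variants, while this commit shows a single 5000-series batch. A hedged sketch of how the options map could express the two variants; the test names and batch sizes are illustrative, and `newStoreForBench` is a hypothetical helper standing in for the `NewBucketStore` call from the hunk above:

```go
for testName, bucketStoreOpts := range map[string][]BucketStoreOption{
	"run with default options":              {WithLogger(logger), WithChunkPool(chunkPool)},
	"with series streaming (1K per batch)":  {WithLogger(logger), WithChunkPool(chunkPool), WithStreamingSeriesPerBatch(1000)},
	"with series streaming (10K per batch)": {WithLogger(logger), WithChunkPool(chunkPool), WithStreamingSeriesPerBatch(10000)},
} {
	// Hypothetical helper wrapping the NewBucketStore call shown above.
	st := newStoreForBench(t, bucketStoreOpts...)
	t.Run(testName, func(t test.TB) {
		runTestWithStore(t, st)
	})
}
```
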
@@ -1314,65 +1324,76 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 }
 
 func TestSeries_RequestAndResponseHints(t *testing.T) {
-	tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t)
-	defer close()
-
-	testCases := []*seriesCase{
-		{
-			Name: "querying a range containing 1 block should return 1 block in the response hints",
-			Req: &storepb.SeriesRequest{
-				MinTime: 0,
-				MaxTime: 1,
-				Matchers: []storepb.LabelMatcher{
-					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
-				},
-			},
-			ExpectedSeries: seriesSet1,
-			ExpectedHints: hintspb.SeriesResponseHints{
-				QueriedBlocks: []hintspb.Block{
-					{Id: block1.String()},
-				},
-			},
-		}, {
-			Name: "querying a range containing multiple blocks should return multiple blocks in the response hints",
-			Req: &storepb.SeriesRequest{
-				MinTime: 0,
-				MaxTime: 3,
-				Matchers: []storepb.LabelMatcher{
-					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
-				},
-			},
-			ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
-			ExpectedHints: hintspb.SeriesResponseHints{
-				QueriedBlocks: []hintspb.Block{
-					{Id: block1.String()},
-					{Id: block2.String()},
-				},
-			},
-		}, {
-			Name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block",
-			Req: &storepb.SeriesRequest{
-				MinTime: 0,
-				MaxTime: 3,
-				Matchers: []storepb.LabelMatcher{
-					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
-				},
-				Hints: mustMarshalAny(&hintspb.SeriesRequestHints{
-					BlockMatchers: []storepb.LabelMatcher{
-						{Type: storepb.LabelMatcher_EQ, Name: block.BlockIDLabel, Value: block1.String()},
-					},
-				}),
-			},
-			ExpectedSeries: seriesSet1,
-			ExpectedHints: hintspb.SeriesResponseHints{
-				QueriedBlocks: []hintspb.Block{
-					{Id: block1.String()},
-				},
-			},
-		},
-	}
-
-	runTestServerSeries(tb, store, testCases...)
+	newTestCases := func(seriesSet1 []*storepb.Series, seriesSet2 []*storepb.Series, block1 ulid.ULID, block2 ulid.ULID) []*seriesCase {
+		return []*seriesCase{
+			{
+				Name: "querying a range containing 1 block should return 1 block in the response hints",
+				Req: &storepb.SeriesRequest{
+					MinTime: 0,
+					MaxTime: 1,
+					Matchers: []storepb.LabelMatcher{
+						{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
+					},
+				},
+				ExpectedSeries: seriesSet1,
+				ExpectedHints: hintspb.SeriesResponseHints{
+					QueriedBlocks: []hintspb.Block{
+						{Id: block1.String()},
+					},
+				},
+			}, {
+				Name: "querying a range containing multiple blocks should return multiple blocks in the response hints",
+				Req: &storepb.SeriesRequest{
+					MinTime: 0,
+					MaxTime: 3,
+					Matchers: []storepb.LabelMatcher{
+						{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
+					},
+				},
+				ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
+				ExpectedHints: hintspb.SeriesResponseHints{
+					QueriedBlocks: []hintspb.Block{
+						{Id: block1.String()},
+						{Id: block2.String()},
+					},
+				},
+			}, {
+				Name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block",
+				Req: &storepb.SeriesRequest{
+					MinTime: 0,
+					MaxTime: 3,
+					Matchers: []storepb.LabelMatcher{
+						{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
+					},
+					Hints: mustMarshalAny(&hintspb.SeriesRequestHints{
+						BlockMatchers: []storepb.LabelMatcher{
+							{Type: storepb.LabelMatcher_EQ, Name: block.BlockIDLabel, Value: block1.String()},
+						},
+					}),
+				},
+				ExpectedSeries: seriesSet1,
+				ExpectedHints: hintspb.SeriesResponseHints{
+					QueriedBlocks: []hintspb.Block{
+						{Id: block1.String()},
+					},
+				},
+			},
+		}
+	}
+
+	tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t)
+	tb.Cleanup(close)
+
+	tb.Run("with regular implementation", func(tb test.TB) {
+		runTestServerSeries(tb, store, newTestCases(seriesSet1, seriesSet2, block1, block2)...)
+	})
+
+	tb, store, seriesSet1, seriesSet2, block1, block2, close = setupStoreForHintsTest(t, WithStreamingSeriesPerBatch(5000))
+	tb.Cleanup(close)
+
+	tb.Run("with streaming implementation", func(tb test.TB) {
+		runTestServerSeries(tb, store, newTestCases(seriesSet1, seriesSet2, block1, block2)...)
+	})
 }
 
 func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
@@ -1412,7 +1433,6 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
 		NewBucketStoreMetrics(nil),
 		WithLogger(logger),
 		WithIndexCache(indexCache),
-		WithStreamingSeriesPerBatch(65536),
 	)
 	assert.NoError(t, err)
 	defer func() { assert.NoError(t, store.RemoveBlocksAndClose()) }()
@@ -1503,7 +1523,6 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
 		NewBucketStoreMetrics(nil),
 		WithLogger(logger),
 		WithIndexCache(indexCache),
-		WithStreamingSeriesPerBatch(65536),
 	)
 	assert.NoError(t, err)
 	assert.NoError(t, store.SyncBlocks(context.Background()))
@@ -1594,7 +1613,7 @@ func createBlockWithOneSeriesWithStep(t test.TB, dir string, lbls labels.Labels,
 	return createBlockFromHead(t, dir, h)
 }
 
-func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Series, []*storepb.Series, ulid.ULID, ulid.ULID, func()) {
+func setupStoreForHintsTest(t *testing.T, opts ...BucketStoreOption) (test.TB, *BucketStore, []*storepb.Series, []*storepb.Series, ulid.ULID, ulid.ULID, func()) {
 	tb := test.NewTB(t)
 
 	closers := []func(){}
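
Why this signature change is backward compatible: a trailing variadic parameter matches zero arguments, so the pre-existing `setupStoreForHintsTest(t)` call keeps compiling while new callers can layer streaming options on top of the defaults. The two call shapes used earlier in this PR, reproduced here as a usage note:

```go
// Regular (non-streaming) store: zero options, identical to the old call.
tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t)
tb.Cleanup(close)

// Streaming store: the extra option is forwarded to NewBucketStore after
// the defaults (logger, index cache), so it simply augments them.
tb, store, seriesSet1, seriesSet2, block1, block2, close = setupStoreForHintsTest(t, WithStreamingSeriesPerBatch(5000))
tb.Cleanup(close)
```
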
@@ -1652,6 +1671,7 @@ func setupStoreForHintsTest(t *testing.T, opts ...BucketStoreOption) (test.TB, *BucketStore, []*storepb.Ser
 	indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(logger, nil, indexcache.InMemoryIndexCacheConfig{})
 	assert.NoError(tb, err)
 
+	opts = append([]BucketStoreOption{WithLogger(logger), WithIndexCache(indexCache)}, opts...)
 	store, err := NewBucketStore(
 		"tenant",
 		instrBkt,
@@ -1667,9 +1687,7 @@ func setupStoreForHintsTest(t *testing.T, opts ...BucketStoreOption) (test.TB, *BucketStore, []*storepb.Ser
 		0,
 		hashcache.NewSeriesHashCache(1024*1024),
 		NewBucketStoreMetrics(nil),
-		WithLogger(logger),
-		WithIndexCache(indexCache),
-		WithStreamingSeriesPerBatch(65536),
+		opts...,
 	)
 	assert.NoError(tb, err)
 	assert.NoError(tb, store.SyncBlocks(context.Background()))