store-gateway: bucketStore tests with and without streaming #3658

Merged
31 changes: 26 additions & 5 deletions pkg/storegateway/bucket_e2e_test.go
@@ -136,6 +136,7 @@ type prepareStoreConfig struct {
seriesLimiterFactory SeriesLimiterFactory
series []labels.Labels
indexCache indexcache.IndexCache
bucketStoreOpts []BucketStoreOption
}

func (c *prepareStoreConfig) apply(opts ...prepareStoreConfigOption) *prepareStoreConfig {
@@ -173,6 +174,12 @@ func withManyParts() prepareStoreConfigOption {
}
}

func withBucketStoreOptions(opts ...BucketStoreOption) prepareStoreConfigOption {
return func(config *prepareStoreConfig) {
config.bucketStoreOpts = opts
}
}

func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareStoreConfig) *storeSuite {
extLset := labels.FromStrings("ext1", "value1")

@@ -188,6 +195,9 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS
metaFetcher, err := block.NewMetaFetcher(s.logger, 20, objstore.WithNoopInstr(bkt), cfg.tempDir, nil, []block.MetadataFilter{})
assert.NoError(t, err)

// Put our options at the beginning so tests can override the logger and index cache if they need to.
storeOpts := append([]BucketStoreOption{WithLogger(s.logger), WithIndexCache(s.cache)}, cfg.bucketStoreOpts...)

store, err := NewBucketStore(
"tenant",
objstore.WithNoopInstr(bkt),
@@ -203,9 +213,7 @@ func prepareStoreWithTestBlocks(t testing.TB, bkt objstore.Bucket, cfg *prepareS
time.Minute,
hashcache.NewSeriesHashCache(1024*1024),
NewBucketStoreMetrics(nil),
WithLogger(s.logger),
WithIndexCache(s.cache),
WithStreamingSeriesPerBatch(65536),
storeOpts...,
)
assert.NoError(t, err)
t.Cleanup(func() {
@@ -786,11 +794,24 @@ func foreachStore(t *testing.T, runTest func(t *testing.T, newSuite suiteFactory
t.Run("filesystem", func(t *testing.T) {
t.Parallel()

dir := t.TempDir()
b, err := filesystem.NewBucket(t.TempDir())
assert.NoError(t, err)
factory := func(opts ...prepareStoreConfigOption) *storeSuite {
return prepareStoreWithTestBlocks(t, b, defaultPrepareStoreConfig(t).apply(opts...))
}
runTest(t, factory)
})

t.Run("streaming", func(t *testing.T) {
t.Parallel()

b, err := filesystem.NewBucket(dir)
b, err := filesystem.NewBucket(t.TempDir())
assert.NoError(t, err)
factory := func(opts ...prepareStoreConfigOption) *storeSuite {
// We want to force each Series() call to use more than one batch to catch some edge cases.
// This should make the implementation slightly slower, although test time
// should be dominated by the setup.
opts = append(opts, withBucketStoreOptions(WithStreamingSeriesPerBatch(10)))
return prepareStoreWithTestBlocks(t, b, defaultPrepareStoreConfig(t).apply(opts...))
}
runTest(t, factory)
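Aside on the pattern used above: `prepareStoreWithTestBlocks` now prepends its defaults (`WithLogger`, `WithIndexCache`) before `cfg.bucketStoreOpts`, relying on the usual functional-options property that later options overwrite earlier ones. Below is a minimal, self-contained sketch of that behavior; `store`, `Option`, `withBatch`, and `newStore` are illustrative stand-ins, not the real Mimir types:

```go
package main

import "fmt"

// store stands in for BucketStore; Option mirrors BucketStoreOption (illustrative only).
type store struct {
	streamingBatchSize int
}

type Option func(*store)

// withBatch stands in for WithStreamingSeriesPerBatch.
func withBatch(n int) Option {
	return func(s *store) { s.streamingBatchSize = n }
}

func newStore(opts ...Option) *store {
	s := &store{streamingBatchSize: 65536} // built-in default
	for _, o := range opts {
		o(s) // applied in order: later options overwrite earlier ones
	}
	return s
}

func main() {
	// Defaults go first, caller-supplied options last, so the caller wins.
	opts := append([]Option{withBatch(65536)}, withBatch(10))
	fmt.Println(newStore(opts...).streamingBatchSize) // prints 10
}
```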
251 changes: 134 additions & 117 deletions pkg/storegateway/bucket_test.go
@@ -1038,77 +1038,88 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 1e9) // 1GB.
assert.NoError(t, err)

st, err := NewBucketStore(
"test",
ibkt,
f,
tmpDir,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
1,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
false,
0,
hashcache.NewSeriesHashCache(1024*1024),
NewBucketStoreMetrics(nil),
WithLogger(logger),
WithChunkPool(chunkPool),
WithStreamingSeriesPerBatch(65536),
)
assert.NoError(t, err)
runTestWithStore := func(t test.TB, st *BucketStore) {
if !t.IsBenchmark() {
st.chunkPool = &mockedPool{parent: st.chunkPool}
}

if !t.IsBenchmark() {
st.chunkPool = &mockedPool{parent: st.chunkPool}
}
assert.NoError(t, st.SyncBlocks(context.Background()))

assert.NoError(t, st.SyncBlocks(context.Background()))
var bCases []*seriesCase
for _, p := range requestedRatios {
expectedSamples := int(p * float64(totalSeries*samplesPerSeries))
if expectedSamples == 0 {
expectedSamples = 1
}
seriesCut := int(p * float64(numOfBlocks*seriesPerBlock))
if seriesCut == 0 {
seriesCut = 1
} else if seriesCut == 1 {
seriesCut = expectedSamples / samplesPerSeriesPerBlock
}

var bCases []*seriesCase
for _, p := range requestedRatios {
expectedSamples := int(p * float64(totalSeries*samplesPerSeries))
if expectedSamples == 0 {
expectedSamples = 1
}
seriesCut := int(p * float64(numOfBlocks*seriesPerBlock))
if seriesCut == 0 {
seriesCut = 1
} else if seriesCut == 1 {
seriesCut = expectedSamples / samplesPerSeriesPerBlock
bCases = append(bCases, &seriesCase{
Name: fmt.Sprintf("%dof%d", expectedSamples, totalSeries*samplesPerSeries),
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: int64(expectedSamples) - 1,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
SkipChunks: skipChunk,
},
ExpectedHints: hintspb.SeriesResponseHints{
QueriedBlocks: expectedQueriesBlocks,
},
// This does not cut chunks properly, but chunks are only asserted against in non-benchmark runs, where only the 100% case is used.
ExpectedSeries: series[:seriesCut],
})
}
runTestServerSeries(t, st, bCases...)

bCases = append(bCases, &seriesCase{
Name: fmt.Sprintf("%dof%d", expectedSamples, totalSeries*samplesPerSeries),
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: int64(expectedSamples) - 1,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
SkipChunks: skipChunk,
},
ExpectedHints: hintspb.SeriesResponseHints{
QueriedBlocks: expectedQueriesBlocks,
},
// This does not cut chunks properly, but chunks are only asserted against in non-benchmark runs, where only the 100% case is used.
ExpectedSeries: series[:seriesCut],
})
}
runTestServerSeries(t, st, bCases...)
if !t.IsBenchmark() {
if !skipChunk {
// TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate.
assert.Equal(t, 0, int(st.chunkPool.(*mockedPool).balance.Load()))
st.chunkPool.(*mockedPool).gets.Store(0)
}

if !t.IsBenchmark() {
if !skipChunk {
// TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate.
assert.Equal(t, 0, int(st.chunkPool.(*mockedPool).balance.Load()))
st.chunkPool.(*mockedPool).gets.Store(0)
for _, b := range st.blocks {
// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of makes sense: long series.
assert.Equal(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches))
}
}
}

for _, b := range st.blocks {
// NOTE(bwplotka): It is 4 x 1.0 for 100mln samples. Kind of makes sense: long series.
assert.Equal(t, 0.0, promtest.ToFloat64(b.metrics.seriesRefetches))
}
for testName, bucketStoreOpts := range map[string][]BucketStoreOption{
"with default options": {WithLogger(logger), WithChunkPool(chunkPool)},
"with series streaming (1K per batch)": {WithLogger(logger), WithChunkPool(chunkPool), WithStreamingSeriesPerBatch(1000)},
"with series streaming (10K per batch)": {WithLogger(logger), WithChunkPool(chunkPool), WithStreamingSeriesPerBatch(10000)},
} {
st, err := NewBucketStore(
"test",
ibkt,
f,
tmpDir,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
1,
mimir_tsdb.DefaultPostingOffsetInMemorySampling,
indexheader.BinaryReaderConfig{},
false,
0,
hashcache.NewSeriesHashCache(1024*1024),
NewBucketStoreMetrics(nil),
bucketStoreOpts...,
)
assert.NoError(t, err)

t.Run(testName, func(t test.TB) {
runTestWithStore(t, st)
})
}

}
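The refactor above extracts the shared benchmark body into `runTestWithStore` and drives it from a map of option sets, so every variant runs identical assertions. Here is the same table-driven shape in isolation; `config` and `runWith` are placeholders rather than the real helpers:

```go
package example

import "testing"

type config struct{ streamingBatch int }

// runWith holds the shared assertions; each variant reuses it unchanged.
func runWith(t *testing.T, cfg config) {
	if cfg.streamingBatch <= 0 {
		t.Fatalf("invalid streaming batch size %d", cfg.streamingBatch)
	}
	// ... build a store from cfg and run the shared series assertions ...
}

func TestStoreVariants(t *testing.T) {
	for name, cfg := range map[string]config{
		"with default options":           {streamingBatch: 65536},
		"with series streaming (1K)":     {streamingBatch: 1000},
		"with series streaming (10K)":    {streamingBatch: 10000},
	} {
		cfg := cfg // capture the range variable for the subtest closure (pre-Go 1.22 semantics)
		t.Run(name, func(t *testing.T) { runWith(t, cfg) })
	}
}
```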

type mockedPool struct {
@@ -1314,65 +1325,74 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
}

func TestSeries_RequestAndResponseHints(t *testing.T) {
tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t)
defer close()

testCases := []*seriesCase{
{
Name: "querying a range containing 1 block should return 1 block in the response hints",
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: 1,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
newTestCases := func(seriesSet1 []*storepb.Series, seriesSet2 []*storepb.Series, block1 ulid.ULID, block2 ulid.ULID) []*seriesCase {
return []*seriesCase{
{
Name: "querying a range containing 1 block should return 1 block in the response hints",
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: 1,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
},
},
ExpectedSeries: seriesSet1,
ExpectedHints: hintspb.SeriesResponseHints{
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
ExpectedSeries: seriesSet1,
ExpectedHints: hintspb.SeriesResponseHints{
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
},
},
},
}, {
Name: "querying a range containing multiple blocks should return multiple blocks in the response hints",
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: 3,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
}, {
Name: "querying a range containing multiple blocks should return multiple blocks in the response hints",
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: 3,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
},
},
ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
ExpectedHints: hintspb.SeriesResponseHints{
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
{Id: block2.String()},
ExpectedSeries: append(append([]*storepb.Series{}, seriesSet1...), seriesSet2...),
ExpectedHints: hintspb.SeriesResponseHints{
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
{Id: block2.String()},
},
},
},
}, {
Name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block",
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: 3,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
}, {
Name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block",
Req: &storepb.SeriesRequest{
MinTime: 0,
MaxTime: 3,
Matchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
},
Hints: mustMarshalAny(&hintspb.SeriesRequestHints{
BlockMatchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: block.BlockIDLabel, Value: block1.String()},
},
}),
},
Hints: mustMarshalAny(&hintspb.SeriesRequestHints{
BlockMatchers: []storepb.LabelMatcher{
{Type: storepb.LabelMatcher_EQ, Name: block.BlockIDLabel, Value: block1.String()},
ExpectedSeries: seriesSet1,
ExpectedHints: hintspb.SeriesResponseHints{
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
},
}),
},
ExpectedSeries: seriesSet1,
ExpectedHints: hintspb.SeriesResponseHints{
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
},
},
},
}
}

runTestServerSeries(tb, store, testCases...)
t.Run("with default options", func(t *testing.T) {
tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t)
tb.Cleanup(close)
runTestServerSeries(tb, store, newTestCases(seriesSet1, seriesSet2, block1, block2)...)
})

t.Run("with series streaming", func(t *testing.T) {
tb, store, seriesSet1, seriesSet2, block1, block2, close := setupStoreForHintsTest(t, WithStreamingSeriesPerBatch(5000))
tb.Cleanup(close)
runTestServerSeries(tb, store, newTestCases(seriesSet1, seriesSet2, block1, block2)...)
})
}
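Why the cases moved into a `newTestCases` closure: each subtest calls `setupStoreForHintsTest` itself, so block ULIDs and series fixtures are fresh per run, and the expectations must be rebuilt from them instead of being shared. A stripped-down sketch of that shape, with `fixture`, `setup`, and `newCases` as hypothetical stand-ins:

```go
package example

import "testing"

type fixture struct{ blockID string }

type testCase struct{ wantBlockID string }

// setup stands in for setupStoreForHintsTest: every call yields fresh IDs.
func setup(t *testing.T) fixture {
	t.Helper()
	return fixture{blockID: t.Name()} // stand-in for a freshly generated ULID
}

// newCases rebuilds expectations from the current fixture, like newTestCases above.
func newCases(f fixture) []testCase {
	return []testCase{{wantBlockID: f.blockID}}
}

func TestPerVariantFixtures(t *testing.T) {
	for _, variant := range []string{"with default options", "with series streaming"} {
		t.Run(variant, func(t *testing.T) {
			f := setup(t) // fresh fixture per subtest
			for _, c := range newCases(f) {
				if c.wantBlockID != f.blockID {
					t.Fatalf("got %q, want %q", c.wantBlockID, f.blockID)
				}
			}
		})
	}
}
```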

func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
@@ -1412,7 +1432,6 @@ func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
NewBucketStoreMetrics(nil),
WithLogger(logger),
WithIndexCache(indexCache),
WithStreamingSeriesPerBatch(65536),
)
assert.NoError(t, err)
defer func() { assert.NoError(t, store.RemoveBlocksAndClose()) }()
@@ -1503,7 +1522,6 @@ func TestSeries_BlockWithMultipleChunks(t *testing.T) {
NewBucketStoreMetrics(nil),
WithLogger(logger),
WithIndexCache(indexCache),
WithStreamingSeriesPerBatch(65536),
)
assert.NoError(t, err)
assert.NoError(t, store.SyncBlocks(context.Background()))
@@ -1594,7 +1612,7 @@ func createBlockWithOneSeriesWithStep(t test.TB, dir string, lbls labels.Labels,
return createBlockFromHead(t, dir, h)
}

func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Series, []*storepb.Series, ulid.ULID, ulid.ULID, func()) {
func setupStoreForHintsTest(t *testing.T, opts ...BucketStoreOption) (test.TB, *BucketStore, []*storepb.Series, []*storepb.Series, ulid.ULID, ulid.ULID, func()) {
tb := test.NewTB(t)

closers := []func(){}
@@ -1652,6 +1670,7 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser
indexCache, err := indexcache.NewInMemoryIndexCacheWithConfig(logger, nil, indexcache.InMemoryIndexCacheConfig{})
assert.NoError(tb, err)

opts = append([]BucketStoreOption{WithLogger(logger), WithIndexCache(indexCache)}, opts...)
store, err := NewBucketStore(
"tenant",
instrBkt,
@@ -1667,9 +1686,7 @@ func setupStoreForHintsTest(t *testing.T) (test.TB, *BucketStore, []*storepb.Ser
0,
hashcache.NewSeriesHashCache(1024*1024),
NewBucketStoreMetrics(nil),
WithLogger(logger),
WithIndexCache(indexCache),
WithStreamingSeriesPerBatch(65536),
opts...,
)
assert.NoError(tb, err)
assert.NoError(tb, store.SyncBlocks(context.Background()))