Streaming store-gateway Series() #3355
Conversation
Force-pushed from 6e8f461 to e763037.
Thanks for working on this! I didn't review every single line of code for correctness, because I want to focus on the overall design first.
I'm leaving here feedback that I reported on Slack too. Let's keep talking on Slack, because discussing through comments here is harder and slower.
My main feedback is that with this design the query will underperform. Let's say we query 100 blocks: we preload 1 batch from 1 block at a time, so everything is serialized. I would like to keep parallelization but be in control of it (e.g. of the max memory allocated).
Force-pushed from 8f90712 to 241e66e.
I went back to this to see what latency looked like during that period. We expected to see increased latency, but that wasn't the case. For context, the batch size was set to 10K, and my queries were touching around 550K series over 12h and were sharded 32 ways. This was a very ad-hoc experiment, and it's probably not as good as it seems. The plan for improvements is to
I'm now trying to make the code a bit easier to undergo the 3 changes above: defining more interfaces, breaking up the index and chunk readers.
593d0a4 is my proposal for the interfaces. This commit should help you imagine where the whole PR would go with concurrency, byte limits, elegant cleanups, and theoretically offloading to disk. It shows the different components (via interfaces) and their dependencies (via factories). The interfaces are sometimes intentionally brief and omit things like error handling, contexts, loggers, metrics, and stats. I believe these can be added later without disruption.
Force-pushed from b5d3ed5 to 4efee8a.
You're doing amazing work! I'm super excited about it. The design is much clearer than the initial one. I haven't reviewed every single bit (the PR is too big; we'll need to split it into smaller PRs to get proper reviews, and I will help you do it), but I looked at the overall design, which looks pretty solid to me.
Force-pushed from 25084f2 to 7091e1e.
The CHANGELOG has just been cut to prepare for the next Mimir release. Please rebase.
Force-pushed from 5c980ac to 3aada56.
Force-pushed from 2c85113 to 625521c.
This is a POC and is not meant to be merged yet.

Overview

Instead of loading all series (label sets and chunks) into memory before responding to a Series() RPC, we can batch them and load X at a time. This gives more predictability over the memory utilization of the store-gateway. The tradeoff is one trip to the index cache and the bucket for each batch, which will affect the overall latency of requests.

How to use

This change disables batched series loading by default and adds two flags to control it: whether it's enabled, via `-blocks-storage.bucket-store.batched-series-loading=false`, and how many series go into each batch, via `-blocks-storage.bucket-store.batch-series-size=65536`.

Limiting

Ideally we want to put a limit on the number of bytes that we load in each batch instead of the number of series. For now, limiting the number of series should still give us some resilience against "big" requests, while still being vulnerable to a flurry of many requests.

Testing

I've changed all tests within pkg/storegateway to use this new loading strategy. This should give confidence that it produces correct results. Further work should improve testing around resource utilization (i.e. that batches are indeed freed one after the other) and should test both batched and non-batched strategies.

This commit has TODOs, which should be addressed before merging.
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>

Commits:
- Fix which context we use for openBlockSeriesChunkRefsSetsIterator
- Streaming store-gateway: Split up blockSeriesChunkRefsSetIterator (#3641)
- Merge main into dimitar/store-gateway-async-series (#3642)
- Streaming store-gateway: Move seriesHasher (#3643)
- Move test limiter
- streaming store-gateway: move and rename loadingBatchSet (#3650). Renames `loadingBatchSet` to `loadingSeriesChunksSetIterator` and moves it to `series_chunks.go`.
- streaming store-gateway: add preloading for series without chunks (#3649). This PR:
  - adds preloading to the seriesSet without chunks
  - changes the implementation of `preloadingSeriesChunkSetIterator` to be generic, so we can reuse it for preloading `seriesChunkRefsSet`s too
  - moves some tests of `preloadingSeriesChunkSetIterator` from `batch_series_test.go` to `series_chunks_test.go`
  - moves `newSeriesSetWithChunks` to `series_chunks.go`
  Question/note to reviewers: should this iterator be in a new file? I think putting it in preload.go and tests in preload_test.go sounds ok, since it now applies to both series_chunks.go and series_refs.go.
- streaming store-gateway: Polish batchSetsForBlocks (#3651)
  - rename to batchedSeriesSetForBlocks
  - remove unused cleanups
  - move to bucket.go
  - some formatting
  - also remove cleanups from synchronousSeriesSet
  - rename batchedSeriesSetForBlocks to streamingSeriesSetForBlocks
- Merge main into dimitar/store-gateway-async-series (#3655)
402d122
to
98063ee
Compare
…3656) Use a simpler test setup. Locally this reduces the time it takes to run the test from 8.417s to 0.351s. This also allows us to make better assertions on the chunk refs that we load from storage.
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
* store-gateway: bucketStore tests with and without streaming. This PR also removes the streaming implementation from:
  * `TestSeries_BlockWithMultipleChunks`, which I think is ok because we already have unit tests for multiple chunks
  * `TestLabelNamesAndValuesHints`: label values aren't using streaming yet anyway
  * `TestSeries_ErrorUnmarshallingRequestHints`: this should be independent of how series are fetched (streaming or not)
  pkg/storegateway now takes 109.121s to run on my machine, compared to the 94.677s it took before.
* Reduce batch size for streaming e2e tests
* Fix test case names
* Run benchmarks with 1K and 10K series
* Move cleanup operations
* Move setupStoreForHintsTest() inside the test run in TestSeries_RequestAndResponseHints

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
Signed-off-by: Marco Pracucci <marco@pracucci.com>
Co-authored-by: Marco Pracucci <marco@pracucci.com>
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
```go
begin := time.Now()
err := g.Wait()
```
This means that we effectively block until we fetch all postings for all blocks before we start loading and merging series. I think we can do better and fetch them asynchronously.
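The suggested improvement can be sketched with a simple pipeline. This is a hypothetical illustration (names made up, not from the PR): postings are fetched in a background goroutine and sent on a channel, so the downstream series loader can start on the first block's postings while later blocks are still being fetched, instead of blocking on `g.Wait()` until every block has finished.

```go
package main

import "fmt"

// fetchPostingsAsync fetches postings for each block in the background and
// streams them to the consumer as they become available.
func fetchPostingsAsync(blockIDs []string, fetch func(string) []int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		for _, id := range blockIDs {
			out <- fetch(id) // downstream can start loading series immediately
		}
	}()
	return out
}

func main() {
	postings := fetchPostingsAsync([]string{"b1", "b2"}, func(id string) []int {
		return []int{1, 2} // stand-in for real postings
	})
	total := 0
	for p := range postings {
		total += len(p)
	}
	fmt.Println("postings received:", total)
}
```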
Agreed offline to do this change after merging this PR.
Good job, LGTM! I reviewed all this code already, so no big surprises to me. I noticed a few last things I would suggest taking a look at.
I think this work deserves a CHANGELOG entry. Remember to add all PR numbers of previously merged PRs too.
Can you also double check we haven't left any `nolint:unused`?
```go
var (
	resMtx   sync.Mutex
	res      []storepb.SeriesSet
	cleanups []func()
)
```
Note to reviewers: removed because it was not used.
```go
if err == nil {
	return
}
code := codes.Aborted
```
The main difference of moving this error wrapping logic here is that it's now applied to all errors returned, while previously it was only applied to some of them. The problem is that I don't know which behaviour is the correct one, because I don't remember what this is used for. Do you? Can you check how the error is handled in the querier (where this function is called via gRPC)?
I think it kind of depends. I had to do this because some of the errors which were previously surfacing here:
mimir/pkg/storegateway/bucket.go, lines 1023 to 1029 at ad526f8:
```go
if err != nil {
	code := codes.Aborted
	if s, ok := status.FromError(errors.Cause(err)); ok {
		code = s.Code()
	}
	return nil, cleanup, status.Error(code, err.Error())
}
```
are now surfacing here
mimir/pkg/storegateway/bucket.go, lines 951 to 954 at 78ed13f:
```go
if seriesSet.Err() != nil {
	err = errors.Wrap(seriesSet.Err(), "expand series set")
	return
}
```
So with the collective changes we are removing this case where we return Unknown:
mimir/pkg/storegateway/bucket.go, lines 910 to 913 at ad526f8:
```go
if set.Err() != nil {
	err = status.Error(codes.Unknown, errors.Wrap(set.Err(), "expand series set").Error())
	return
}
```
I couldn't find a place which looks for the Unknown code.
To answer your question about the querier: in the querier we just ignore the errors unless it's EOF:
mimir/pkg/querier/blocks_store_queryable.go, lines 737 to 744 at 3592c25:
```go
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
	break
}
if err != nil {
	level.Warn(spanLog).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err)
	return nil
}
```
Thanks for the details. I also can't see a reason why this change could break any error handling, given the querier is not checking it and `io.EOF` comes from the client, not the server.
pkg/storegateway/bucket.go (outdated):
```go
var readers *chunkReaders
if !req.SkipChunks {
	readers = newChunkReaders(chunkr)
}
```
[nit] What if, for simplicity, we always call `newChunkReaders()` regardless of `req.SkipChunks`?
pkg/storegateway/bucket.go (outdated):
```go
mergeDuration := time.Since(begin)
mergeStats.mergeDuration += mergeDuration
s.metrics.seriesMergeDuration.Observe(mergeDuration.Seconds())
```
Has this changed intentionally? The logic is the same, but I have the feeling it changed unintentionally.
It was changed and then changed back, and in the meantime it got a new form. I will revert it.
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
3.2.1.... LGTM!
The store-gateway can now stream results back to the querier instead of buffering them. This is expected to greatly reduce peak memory consumption while keeping latency the same. You can enable this feature by setting `-blocks-storage.bucket-store.batch-series-size` to a value in the high thousands (5000-10000). This is still an experimental feature and is subject to API changes and instability. Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com> Signed-off-by: Marco Pracucci <marco@pracucci.com> Co-authored-by: Marco Pracucci <marco@pracucci.com>