Improve Lucene's I/O concurrency #13179

Closed
7 of 9 tasks
jpountz opened this issue Mar 13, 2024 · 12 comments

Comments

@jpountz
Contributor

jpountz commented Mar 13, 2024

Description

Currently, Lucene's I/O concurrency is bound by the search concurrency. If IndexSearcher runs on N threads, then Lucene will never perform more than N I/Os concurrently. Unless you significantly overprovision your search thread pool, which is bad for other reasons, Lucene will bottleneck on I/O latency without even maxing out the IOPS of the host.

I don't think that Lucene should fully embrace asynchronousness in its APIs, or query evaluation would become overly complicated. But I still expect that we have a lot of room for improvement to allow each search thread to perform multiple I/Os concurrently under the hood when needed.

Some examples:

  • When running a query on two terms, e.g. apache OR lucene, could the I/O lookups in the tim file (terms dictionary) be performed concurrently for both terms?
  • When running a query on two terms and start offsets in the doc file (postings) have been resolved, could we start loading the first bytes from these postings lists from disk concurrently?
  • When fetching the top N=100 stored documents that match a query, could we load bytes from the fdt file (stored fields) for all these documents concurrently?

This would require API changes in our Directory APIs, and some low-level IndexReader APIs (TermsEnum, StoredFieldsReader?).

@jpountz jpountz added this to the 10.0.0 milestone Mar 20, 2024
jpountz added a commit to jpountz/lucene that referenced this issue May 2, 2024
This adds `IndexInput#prefetch`, which is an optional operation that instructs
the `IndexInput` to start fetching bytes from storage in the background. These
bytes will be picked up by follow-up calls to the `IndexInput#readXXX` methods.
In the future, this will help Lucene move from a maximum of one I/O operation
per search thread to one I/O operation per search thread per `IndexInput`.
Typically, when running a query on two terms, the I/O into the terms dictionary
is sequential today. In the future, we would ideally do these I/Os in parallel
using this new API. Note that this will require API changes to some classes
including `TermsEnum`.

I settled on this API because it's simple and wouldn't require making all
Lucene APIs asynchronous to take advantage of extra I/O concurrency, which I
worry would make the query evaluation logic too complicated.

Currently, only `NIOFSDirectory` implements this new API. I played with
`MMapDirectory` as well and found an approach that worked better in the
benchmark I've been playing with, but I'm not sure it makes sense to implement
this API on this directory as it either requires adding an explicit buffer on
`MMapDirectory`, or forcing data to be loaded into the page cache even though
the OS may have decided that it's not a good idea due to too few cache hits.

This change will require follow-ups to start using this new API when working
with terms dictionaries, postings, etc.

Relates apache#13179
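
The calling pattern described in this commit message can be sketched as follows. This is a minimal illustration, not Lucene's implementation: `PrefetchableInput`, `RecordingInput` and `lookupBoth` are hypothetical names, and the only idea taken from the text is that of a non-blocking prefetch hint followed by regular reads. The sketch performs no real I/O; it only records the order of calls to show that both hints are issued before the first potentially blocking read.

```java
import java.util.ArrayList;
import java.util.List;

class PrefetchPatternSketch {

  /** Hypothetical stand-in for an input that accepts an optional prefetch hint. */
  interface PrefetchableInput {
    void prefetch(long offset, long length); // hint only, expected not to block

    byte readByteAt(long offset); // may block on I/O in a real implementation
  }

  /** Records calls so their ordering can be inspected; performs no real I/O. */
  static final class RecordingInput implements PrefetchableInput {
    private final String name;
    private final List<String> log;

    RecordingInput(String name, List<String> log) {
      this.name = name;
      this.log = log;
    }

    @Override
    public void prefetch(long offset, long length) {
      log.add("prefetch " + name + "@" + offset);
    }

    @Override
    public byte readByteAt(long offset) {
      log.add("read " + name + "@" + offset);
      return 0;
    }
  }

  /** Issues both hints first, then reads, so the two I/Os could overlap. */
  static void lookupBoth(PrefetchableInput a, long offsetA, PrefetchableInput b, long offsetB) {
    a.prefetch(offsetA, 1);
    b.prefetch(offsetB, 1);
    a.readByteAt(offsetA);
    b.readByteAt(offsetB);
  }

  /** Runs the pattern for two terms at arbitrary, made-up offsets. */
  static List<String> demo() {
    List<String> log = new ArrayList<>();
    lookupBoth(new RecordingInput("apache", log), 128L, new RecordingInput("lucene", log), 4096L);
    return log;
  }

  public static void main(String[] args) {
    demo().forEach(System.out::println);
  }
}
```

With a purely sequential pattern the log would alternate read after read; here the second hint is already in flight while the first read waits.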
jpountz added a commit that referenced this issue May 10, 2024
This adds `IndexInput#prefetch`, which is an optional operation that instructs
the `IndexInput` to start fetching bytes from storage in the background. These
bytes will be picked up by follow-up calls to the `IndexInput#readXXX` methods.
In the future, this will help Lucene move from a maximum of one I/O operation
per search thread to one I/O operation per search thread per `IndexInput`.
Typically, when running a query on two terms, the I/O into the terms dictionary
is sequential today. In the future, we would ideally do these I/Os in parallel
using this new API. Note that this will require API changes to some classes
including `TermsEnum`.

I settled on this API because it's simple and wouldn't require making all
Lucene APIs asynchronous to take advantage of extra I/O concurrency, which I
worry would make the query evaluation logic too complicated.

This change will require follow-ups to start using this new API when working
with terms dictionaries, postings, etc.

Relates #13179

Co-authored-by: Uwe Schindler <uschindler@apache.org>
@sohami
Contributor

sohami commented May 24, 2024

@jpountz This looks interesting. Along similar lines, in OpenSearch we are working on building a warm index which will not have all the data available locally all the time and will download data on demand from a remote store at search time. To improve search performance, I am thinking about a mechanism to prefetch blocks of data from the remote store to local disk before they are accessed.

In addition, aggregations in OpenSearch can also benefit from this. The aggregation collector performs doc value lookups on the fields of the matched documents. This happens sequentially, document by document, as driven by collectors. This can be changed to first collect all the matching documents, then prefetch the blocks for the matched documents, followed by the actual collection in AggregationCollector (similar to FacetsCollector in Lucene). At first look, it seems that to achieve this, the different DocValuesIterator implementations can provide an optional prefetch API which takes a list of docIds and internally triggers the prefetch for each of them using the IndexInput (which already supports prefetch now). I would like to get your feedback on this. Facets or any other usage of doc values can also use this mechanism to trigger a prefetch of the doc values of matching docs before getting the actual values.

@jpountz
Contributor Author

jpountz commented May 24, 2024

Thanks for looking at this!

This can be changed to first collect all the matching documents and then perform prefetch of the blocks for matched documents followed by actual collection

This should work, though I'm wary of making it the new way that collectors need to interact with doc values if they want to be able to take advantage of prefetching. E.g. we also have collectors for top hits sorted by field, where collecting all hits ahead of time would kill the benefits of dynamic pruning. I wonder if there are approaches that don't require collecting all matches up-front? Access to doc values is forward-only, so prefetching the first page only and then relying on some form of read ahead would hopefully do what we need?

@msfroh
Contributor

msfroh commented May 29, 2024

This should work, though I'm wary of making it the new way that collectors need to interact with doc values if they want to be able to take advantage of prefetching. E.g. we also have collectors for top hits sorted by field, where collecting all hits ahead of time would kill the benefits of dynamic pruning. I wonder if there are approaches that don't require collecting all matches up-front? Access to doc values is forward-only, so prefetching the first page only and then relying on some form of read ahead would hopefully do what we need?

Couldn't we do both with the suggested prefetch operation on DocValuesIterator? Just prefetch as much or as little as needed for the particular use-case.

For the general top doc collector / bulk scorer, the doc value prefetch could look "just ahead", prefetching as it goes (maybe we buffer the next few doc IDs from the first-phase scorer and prefetch those?). Am I correct in understanding that prefetching an already-fetched page is (at least approximately) a no-op?

If we want to collect all the doc IDs during the collect phase (as Lucene's FacetsCollector does) and then prefetch them all at once to compute facet counts, that works too.

@sohami
Contributor

sohami commented May 29, 2024

This should work, though I'm wary of making it the new way that collectors need to interact with doc values if they want to be able to take advantage of prefetching

To add to the above, keeping it an optional API will also let each collector type decide whether to make the prefetch call based on its use case. Not calling the API keeps the existing behavior.

@jpountz
Contributor Author

jpountz commented May 29, 2024

Am I correct in understanding that prefetching an already-fetched page is (at least approximately) a no-op?

We tried to make it cheap (see e.g. the logic to disable calling madvise after a while if the data seems to fully fit in the page cache anyway), but so is reading a doc-value that fits in the page cache via RandomAccessInput, so I'm expecting us to see a performance regression in nightlies if we add calls to prefetch() to (Numeric|SortedNumeric|Sorted|SortedSet|Binary)DocValues#advance.

To avoid this per-doc overhead, I imagine that we would need to add some prefetch() API on (Numeric|SortedNumeric|Sorted|SortedSet|Binary)DocValues like @sohami suggests and require it to be called at a higher level when this can be more easily amortized across many docs, e.g. by making BulkScorer score ranges of X doc IDs at once and only calling prefetch once per range.
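
The amortization idea in the paragraph above can be sketched as follows. This is a hedged illustration only: `PrefetchableValues`, `CountingValues` and `scoreInWindows` are hypothetical names, not Lucene APIs, and the values are synthetic. The point is that the hint is issued once per window of doc IDs rather than once per document.

```java
class WindowedPrefetchSketch {

  /** Hypothetical doc-values-like source that accepts a range prefetch hint. */
  interface PrefetchableValues {
    void prefetch(int fromDocInclusive, int toDocExclusive);

    long valueOf(int doc);
  }

  /** Counts how many hints were issued; values are synthetic (doc * 2). */
  static final class CountingValues implements PrefetchableValues {
    int prefetchCalls = 0;

    @Override
    public void prefetch(int fromDocInclusive, int toDocExclusive) {
      prefetchCalls++;
    }

    @Override
    public long valueOf(int doc) {
      return doc * 2L;
    }
  }

  /** Visits docs [0, maxDoc) window by window, issuing one hint per window. */
  static long scoreInWindows(PrefetchableValues values, int maxDoc, int windowSize) {
    long sum = 0;
    for (int start = 0; start < maxDoc; start += windowSize) {
      int end = Math.min(maxDoc, start + windowSize);
      values.prefetch(start, end); // one hint amortized over up to windowSize docs
      for (int doc = start; doc < end; doc++) {
        sum += values.valueOf(doc);
      }
    }
    return sum;
  }

  public static void main(String[] args) {
    CountingValues values = new CountingValues();
    long sum = scoreInWindows(values, 10_000, 4096);
    System.out.println("sum=" + sum + " prefetchCalls=" + values.prefetchCalls);
  }
}
```

For 10,000 docs and a window of 4,096, the hint is issued 3 times instead of 10,000 times, which is the kind of per-doc overhead reduction the comment is after.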

I'm still debating with myself whether this would be valuable enough vs. just giving a hint to the IndexInput that it would be a good idea to read ahead because it's being used by postings or doc values that have a forward-only access pattern.

maybe we buffer the next few doc IDs from the first-phase scorer and prefetch those

FWIW this would break a few things, e.g. we have collectors that only compute the score when needed (e.g. when sorting by field then score). But if we need to buffer docs up-front, then we don't know at this point in time if scores are going to be needed or not, so we need to score more docs. Maybe it's still the right trade-off, I'm mostly pointing out that this would be a bigger trade-off than what we've done for prefetching until now.

If we want to collect all the doc IDs during the collect phase (as Lucene's FacetsCollector does) and then prefetch them all at once to compute facet counts, that works too.

Maybe such an approach would be ok for application code that can make assumptions about how much page cache it has, but I'm expecting Lucene code to avoid ever prefetching many MBs at once, because this increases chances that the first bytes that got prefetched got paged out before we could use them. This is one reason why I like the approach of just giving a hint to the IndexInput that it should perform read-ahead, IndexInput impls that read from fast storage can read ahead relatively little, in the order of a few pages, while IndexInput impls that read from slower storage like the warm index use-case that @sohami describes above could fetch MBs from slow remote storage and cache it on a local disk or something like that to reduce interactions with the slow remote storage.

If one of you would like to take a stab at an approach to prefetching doc values, I'd be happy to look at a PR.

@sohami
Contributor

sohami commented May 30, 2024

To avoid this per-doc overhead, I imagine that we would need to add some prefetch() API on (Numeric|SortedNumeric|Sorted|SortedSet|Binary)DocValues like @sohami suggests and require it to be called at a higher level when this can be more easily amortized across many docs, e.g. by making BulkScorer score ranges of X doc IDs at once and only calling prefetch once per range.

Yes, I was thinking that this prefetch API could take in a bitset of docIds.

Maybe such an approach would be ok for application code that can make assumptions about how much page cache it has, but I'm expecting Lucene code to avoid ever prefetching many MBs at once, because this increases chances that the first bytes that got prefetched got paged out before we could use them. This is one reason why I like the approach of just giving a hint to the IndexInput that it should perform read-ahead, IndexInput impls that read from fast storage can read ahead relatively little, in the order of a few pages, while IndexInput impls that read from slower storage like the warm index use-case that @sohami describes above could fetch MBs from slow remote storage and cache it on a local disk or something like that to reduce interactions with the slow remote storage.

If I understand correctly, the read ahead mechanism in IndexInput will be useful if matching docs fall within the read ahead size. Otherwise those will be wasted pages cached or downloaded in the warm index use-case and prefetch will not be useful. Instead, I was thinking that if we have a bulk prefetch API in the IndexInput layer too (along with the one in DocValues) which takes, say, a list of offsets and maybe lengths as well, then each IndexInput can internally decide whether to limit the prefetch or perform it for each of the provided inputs. One mechanism to limit the prefetch would be to work out from the different offsets how many pages need to be prefetched, and cap the number of distinct pages at some threshold that the IndexInput implementation decides on.
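
One way to picture the page-capping mechanism suggested above is the sketch below. It is an assumption-laden illustration, not an existing API: `pagesToPrefetch`, the page size and the cap are all hypothetical, and keeping the lowest-numbered pages first is a design choice made here because the access pattern under discussion is forward-only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class BoundedBulkPrefetchSketch {

  /**
   * Maps byte offsets to page indices, de-duplicates them, and returns at most
   * maxPages page indices in ascending order.
   */
  static List<Long> pagesToPrefetch(long[] offsets, int pageSize, int maxPages) {
    TreeSet<Long> distinctPages = new TreeSet<>();
    for (long offset : offsets) {
      distinctPages.add(offset / pageSize);
    }
    List<Long> result = new ArrayList<>();
    for (long page : distinctPages) {
      if (result.size() == maxPages) {
        break; // threshold reached, remaining pages are left to later calls
      }
      result.add(page);
    }
    return result;
  }

  public static void main(String[] args) {
    long[] offsets = {0, 100, 5000, 4097, 20000, 9000};
    // Offsets fall on pages 0, 0, 1, 1, 4 and 2 with 4096-byte pages.
    System.out.println(pagesToPrefetch(offsets, 4096, 3));
  }
}
```

Six offsets collapse to four distinct pages here, and the cap of three drops the farthest one.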

If one of you would like to take a stab at an approach to prefetching doc values, I'd be happy to look at a PR.

Sure, I can take a stab for say NumericDocValues and in context of facets to start with. I will have to explore how facets work in Lucene, so it may take me some time. Let me know if that sounds good to you as a starting PoC.

@jpountz
Contributor Author

jpountz commented May 30, 2024

If I understand correctly, the read ahead mechanism in IndexInput will be useful if matching docs fall within the read ahead size. Otherwise those will be wasted pages cached or downloaded in the warm index use-case and prefetch will not be useful.

This is correct. For the record, this wastage may sound disappointing, but it also helps with making I/O more concurrent. For instance, say you have a conjunction on two clauses: "a AND b" (which could be postings, but also doc-value-based iterators, e.g. via a FieldExistsQuery). First we advance a, then we advance b to the next doc that is on or after the doc that a is on. If we don't want to prefetch data without evidence that it's actually going to be needed, then we have no way of doing the I/O for a and b in parallel, since we need to finish the I/O for a before having a chance to know what to prefetch for b.

Sure, I can take a stab for say NumericDocValues and in context of facets to start with.

This sounds fine, we need to start somewhere. FWIW the main consumers of the NumericDocValues API that we should care about in my opinion are NumericComparator, SortedNumericDocValuesRangeQuery, LeafSimScorer and the lucene/facet module. Ideally we'd come up with an approach that works well for all these consumers.

jpountz added a commit to jpountz/lucene that referenced this issue Jun 4, 2024
…ess pattern.

This introduces a new API that allows directories to optimize access to
`IndexInput`s that have a forward-only access pattern by reading ahead of the
current position. It would be applicable to:
 - Postings lists,
 - Doc values,
 - Norms,
 - Points.

Relates apache#13179
@jpountz
Contributor Author

jpountz commented Jun 4, 2024

@sohami I gave a try at a possible approach at #13450 in case you're curious.

@sohami
Contributor

sohami commented Jun 4, 2024

@sohami I gave a try at a possible approach at #13450 in case you're curious.

@jpountz Thanks for sharing this. Originally I was thinking of the prefetch optimization only in the collect phase, but I am trying to understand whether it can be used on the iterator side of things as well. To understand this better, I am looking into the SortedNumericDocValuesRangeQuery test to follow the flow when different iterators are involved.

So far my general understanding is that all the scoring and collection of docs via collectors happens in DefaultBulkScorer#score. The lead scorerIterator there can be a standalone iterator, a wrapper over multiple iterators, or an approximation iterator when the TwoPhaseIterator is non-null. These are then passed down to scoreAll or scoreRange (ignoring the competitiveIterator for now). In either scoreAll or scoreRange, we iterate over the lead scorerIterator to get the matching docs and then check whether each doc matches the TwoPhaseIterator to make it eligible for collection via collectors. So I see the following flows/cases: a) when only the lead scorerIterator is present, b) when both the lead scorerIterator and a TwoPhaseIterator are present, c) the collect phase, which happens over the docs that scorers have found.

Based on this understanding, here is what I am thinking; I would love your feedback:

  1. For case (a), when only a single iterator is involved, the readAhead mechanism can be useful, considering that a single iterator will not know what the next match is until it goes to the next doc.

  2. For case (b), we can potentially do a combination of readAhead and prefetch. We can use readAhead on the lead iterator and then buffer some of the matching docs from this lead iterator. Then before evaluating if these docs matches TwoPhaseIterator or not, we can perform prefetch on these buffered docs (via some prepareMatches mechanism on TwoPhaseIterator). Here we know exactly which docs will be used for evaluating matches on the TwoPhaseIterator, so we should be able to prefetch data for those docs. I would like to understand more about your earlier feedback on this, as my understanding is that collection comes afterwards.

maybe we buffer the next few doc IDs from the first-phase scorer and prefetch those

FWIW this would break a few things, e.g. we have collectors that only compute the score when needed (e.g. when sorting by field then score). But if we need to buffer docs up-front, then we don't know at this point in time if scores are going to be needed or not, so we need to score more docs. Maybe it's still the right trade-off, I'm mostly pointing out that this would be a bigger trade-off than what we've done for prefetching until now.

  3. For case (c), before calling the collect phase on collectors, we can first buffer up the matching docs, then ask collectors to trigger an optional prefetch of the docs which will be passed to them for collection. These docs are the ones produced by scorers, with or without a TwoPhaseIterator in the mix.

I think for scenarios like 2 and 3 above where we know exact doc matches, performing prefetch could be useful vs readAhead.

@jpountz
Contributor Author

jpountz commented Jun 5, 2024

Then before evaluating if these docs matches TwoPhaseIterator or not, we can perform prefetch on these buffered docs (via some prepareMatches mechanism on TwoPhaseIterator).

This can be done, but I'd note that this would be a significant change to our APIs since TwoPhaseIterator only supports verifying the current document that the approximation is on. It is not possible to buffer matching documents from the approximation, to then check them with the TwoPhaseIterator. This is similar to the point I was making in a previous comment about buffering documents in collectors, Scorer#score only supports scoring the current document that the scorer is positioned on, it is not possible to buffer several documents and then evaluate their scores in TopScoreDocCollector (without API changes).

via some prepareMatches mechanism on TwoPhaseIterator

FWIW one thing that is on my mind is that both postings and doc values take in the order of 1 or 2 bytes per document. So even a query that matches 0.1% of docs, evenly distributed in the doc ID space, would still end up fetching all pages in practice. So a very smart prefetching may only perform better than naive prefetching in the following cases:

  • Queries that are extremely sparse.
  • Queries whose matches are highly clustered in the doc ID space, because of index sorting, recursive graph bisection or early termination.

But then I'd still expect some naive readahead logic to perform ok in such cases. For the extremely sparse case, it would fetch up to X times too many pages where X is the number of pages that get read ahead. For reasonable values of X, this should be ok.
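
The claim above that a query matching 0.1% of docs still touches every page can be checked with a little arithmetic. The sketch below is only an illustration under stated assumptions: 10 million docs, 2 bytes per doc, 4,096-byte pages, and matches spaced perfectly evenly; `pagesTouched` and `totalPages` are hypothetical helper names.

```java
class PageCoverageSketch {

  /** Counts distinct pages touched when every step-th doc matches. */
  static long pagesTouched(long docCount, int bytesPerDoc, int pageSize, long step) {
    long touched = 0;
    long lastPage = -1;
    for (long doc = 0; doc < docCount; doc += step) {
      long page = (doc * bytesPerDoc) / pageSize;
      if (page != lastPage) { // pages are visited in ascending order
        touched++;
        lastPage = page;
      }
    }
    return touched;
  }

  /** Total number of pages needed to hold all docs. */
  static long totalPages(long docCount, int bytesPerDoc, int pageSize) {
    return (docCount * bytesPerDoc + pageSize - 1) / pageSize;
  }

  public static void main(String[] args) {
    long docs = 10_000_000L;
    System.out.println("total pages: " + totalPages(docs, 2, 4096));
    System.out.println("0.1% of docs match: " + pagesTouched(docs, 2, 4096, 1_000L));
    System.out.println("0.001% of docs match: " + pagesTouched(docs, 2, 4096, 100_000L));
  }
}
```

With one match every 1,000 docs, consecutive matches are 2,000 bytes apart, which is less than a page, so all 4,883 pages are touched. Only when matches become much sparser (one every 100,000 docs touches 100 pages) does exact prefetching have room to beat naive read-ahead.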

The other thing that is on my mind is that this sort of approach allows us to do it completely at the OS level, which gives additional efficiency.

@sohami
Contributor

sohami commented Jun 6, 2024

Then before evaluating if these docs matches TwoPhaseIterator or not, we can perform prefetch on these buffered docs (via some prepareMatches mechanism on TwoPhaseIterator).

This can be done, but I'd note that this would be a significant change to our APIs since TwoPhaseIterator only supports verifying the current document that the approximation is on. It is not possible to buffer matching documents from the approximation, to then check them with the TwoPhaseIterator. This is similar to the point I was making in a previous comment about buffering documents in collectors, Scorer#score only supports scoring the current document that the scorer is positioned on, it is not possible to buffer several documents and then evaluate their scores in TopScoreDocCollector (without API changes).

Thanks for explaining this to me. It seems this would mean changing the iteration and scoring behavior to work on a range of docs instead of one doc at a time (which is the current behavior in Lucene). It would probably work fine for collectors that do not require any scoring, but that is not the general use case, and it would be limited to exact prefetch in collectors only.

via some prepareMatches mechanism on TwoPhaseIterator

FWIW one thing that is on my mind is that both postings and doc values take in the order of 1 or 2 bytes per document. So even a query that matches 0.1% of docs, evenly distributed in the doc ID space, would still end up fetching all pages in practice. So a very smart prefetching may only perform better than naive prefetching in the following cases:

  • Queries that are extremely sparse.
  • Queries whose matches are highly clustered in the doc ID space, because of index sorting, recursive graph bisection or early termination.

But then I'd still expect some naive readahead logic to perform ok in such cases. For the extremely sparse case, it would fetch up to X times too many pages where X is the number of pages that get read ahead. For reasonable values of X, this should be ok.

Agreed. I had a similar thought about readahead but was trying to see if there are ways to avoid X. But as you said, in the general case it will probably end up fetching all the pages anyway.

For read-ahead in the DocValues case, I am thinking that when the doc value is fetched for the current doc, we can probably provide the hint to the IndexInput there to perform read-ahead. This can be useful for IndexInputs that interact with a remote store. However, in the default case, I think the OS will take care of read-ahead on a read from a specific offset, so it could be a no-op there. The same could be done whenever a seek happens on an IndexInput that knows it should follow a sequential access pattern. So I guess your latest PR provides that hint, and we probably don't need a separate readAhead API.

@javanna javanna removed this from the 10.0.0 milestone Oct 1, 2024
shatejas pushed a commit to shatejas/lucene that referenced this issue Oct 13, 2024
@jpountz
Contributor Author

jpountz commented Oct 17, 2024

There can always be follow-up improvements, but I think it's time to close this issue. Most usage of Lucene is now able to perform multiple concurrent I/O operations from the same search thread; this has been released in Lucene 10.

@jpountz jpountz closed this as completed Oct 17, 2024
shatejas pushed a commit to shatejas/lucene that referenced this issue Nov 17, 2024