Skip to content

Commit

Permalink
fix: postfilter should be applied in full if knn/fts even if scalar i…
Browse files Browse the repository at this point in the history
…ndex applies (#2931)

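If we are in a post-filtering situation (ann/knn/fts with prefilter disabled)
and there is a scalar index that can be applied to the filter, then the part of
the filter handled by the scalar index was being skipped, so the post-filter was
only partially applied. The full filter is now applied as the post-filter.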

This also fixes a bug where we would consider btree/bitmap indices when
trying to auto-pick the FTS column; only inverted indices are considered now.
  • Loading branch information
westonpace authored Sep 27, 2024
1 parent d97a93d commit baacc63
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 5 deletions.
31 changes: 31 additions & 0 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def gen_str(n, split="", char_set=string.ascii_letters + string.digits):
.append_column("price", pa.array(price))
.append_column("meta", pa.array(meta))
.append_column("doc", pa.array(doc, pa.large_string()))
.append_column("doc2", pa.array(doc, pa.string()))
.append_column("id", pa.array(range(nvec)))
)
return tbl
Expand Down Expand Up @@ -244,6 +245,36 @@ def test_filter_with_fts_index(dataset):
assert query == row.as_py()


def test_fts_with_postfilter(tmp_path):
    """Post-filtered FTS must return the same single row with or without a BTREE index on ``id``.

    The same query (full text search for "Frodo", filter ``id = 7``,
    ``prefilter=False``) is issued twice: once when only the inverted index on
    ``text`` exists, and once more after a BTREE index has been created on the
    filtered column.
    """
    table = pa.table({"text": ["Frodo the puppy"] * 100, "id": range(100)})
    ds = lance.write_dataset(table, tmp_path)
    ds.create_scalar_index("text", index_type="INVERTED", with_position=False)

    # Both queries are identical; only the set of available indices changes.
    query_kwargs = dict(full_text_query="Frodo", filter="id = 7", prefilter=False)

    # Only the inverted index exists: the filter has no scalar index to use.
    without_id_index = ds.to_table(**query_kwargs)
    assert without_id_index.num_rows == 1

    ds.create_scalar_index("id", index_type="BTREE")

    # A scalar index now covers the filter column; the result must not change.
    with_id_index = ds.to_table(**query_kwargs)
    assert with_id_index.num_rows == 1


def test_fts_with_other_str_scalar_index(dataset):
    """FTS without an explicit column must still find rows when ``doc2`` has a BTREE index.

    ``doc`` gets an inverted index and ``doc2`` gets a BTREE index; the query
    term is the first space-separated word of the first row's ``doc`` value.
    """
    dataset.create_scalar_index("doc", index_type="INVERTED", with_position=False)
    dataset.create_scalar_index("doc2", index_type="BTREE")

    # Take a term from the data itself to use as the search query.
    first_doc = dataset.take(indices=[0], columns=["doc"]).column(0)[0].as_py()
    first_word = first_doc.split(" ")[0]

    hits = dataset.to_table(full_text_query=first_word)
    assert hits.num_rows > 0


def test_bitmap_index(tmp_path: Path):
"""Test create bitmap index"""
tbl = pa.Table.from_arrays(
Expand Down
5 changes: 5 additions & 0 deletions rust/lance-index/src/scalar/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,11 @@ impl FilterPlan {
/// Returns `true` if this plan has a refine expression, an index query, or both.
pub fn has_any_filter(&self) -> bool {
    let has_refine = self.refine_expr.is_some();
    let has_index_query = self.index_query.is_some();
    has_refine || has_index_query
}

/// Rewrites this plan so that it no longer carries an index query.
///
/// The refine expression is replaced with a clone of `full_expr` and the
/// index query is cleared, so afterwards `refine_expr` alone holds the
/// plan's filter.
pub fn make_refine_only(&mut self) {
    // The two assignments touch different fields, so their order is irrelevant.
    self.refine_expr = self.full_expr.clone();
    self.index_query = None;
}
}

pub trait PlannerIndexExt {
Expand Down
28 changes: 23 additions & 5 deletions rust/lance/src/dataset/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use lance_datafusion::exec::{execute_plan, LanceExecutionOptions};
use lance_datafusion::projection::ProjectionPlan;
use lance_index::scalar::expression::PlannerIndexExt;
use lance_index::scalar::inverted::SCORE_COL;
use lance_index::scalar::FullTextSearchQuery;
use lance_index::scalar::{FullTextSearchQuery, ScalarIndexType};
use lance_index::vector::{Query, DIST_COL};
use lance_index::{scalar::expression::ScalarIndexExpr, DatasetIndexExt};
use lance_io::stream::RecordBatchStream;
Expand All @@ -53,6 +53,7 @@ use tracing::{info_span, instrument, Span};

use super::Dataset;
use crate::datatypes::Schema;
use crate::index::scalar::detect_scalar_index_type;
use crate::index::DatasetIndexInternalExt;
use crate::io::exec::fts::FtsExec;
use crate::io::exec::get_physical_optimizer;
Expand Down Expand Up @@ -1098,12 +1099,25 @@ impl Scanner {
filter_plan = FilterPlan::default();
source
} else {
// If we are postfiltering then we can't use scalar indices for the filter
// and will need to run the postfilter in memory
filter_plan.make_refine_only();
self.knn(&FilterPlan::default()).await?
}
}
(None, Some(query)) => {
// The source is a full text search
self.fts(&filter_plan, query).await?
// The source is an FTS search
if self.prefilter {
// If we are prefiltering then the fts node will take care of the filter
let source = self.fts(&filter_plan, query).await?;
filter_plan = FilterPlan::default();
source
} else {
// If we are postfiltering then we can't use scalar indices for the filter
// and will need to run the postfilter in memory
filter_plan.make_refine_only();
self.fts(&FilterPlan::default(), query).await?
}
}
(None, None) => {
let fragments = if let Some(fragments) = self.fragments.as_ref() {
Expand Down Expand Up @@ -1287,8 +1301,12 @@ impl Scanner {
let mut indexed_columns = Vec::new();
for column in string_columns {
let index = self.dataset.load_scalar_index_for_column(column).await?;
if index.is_some() {
indexed_columns.push(column.clone());
if let Some(index) = index {
let uuid = index.uuid.to_string();
let index_type = detect_scalar_index_type(&self.dataset, column, &uuid).await?;
if matches!(index_type, ScalarIndexType::Inverted) {
indexed_columns.push(column.clone());
}
}
}

Expand Down

0 comments on commit baacc63

Please sign in to comment.