perf: re-enable late materialization on full scans #2290

Merged
150 changes: 133 additions & 17 deletions rust/lance/src/dataset/scanner.rs
@@ -139,7 +139,7 @@ pub struct Scanner {
pub(crate) filter: Option<Expr>,

/// The batch size controls the maximum number of rows to return for each read.
batch_size: usize,
batch_size: Option<usize>,

/// Number of batches to prefetch
batch_readahead: usize,
@@ -190,19 +190,13 @@ impl Scanner {
pub fn new(dataset: Arc<Dataset>) -> Self {
let projection = dataset.schema().clone();

// Default batch size to be large enough so that an i32 column can be
// read in a single range request. For the object store default of
// 64KB, this is 16K rows. For local file systems, the default block size
// is just 4K, which would mean only 1K rows, which might be a little small.
// So we use a default minimum of 8K rows.
let batch_size = std::cmp::max(dataset.object_store().block_size() / 4, DEFAULT_BATCH_SIZE);
Self {
dataset,
phyical_columns: projection,
requested_output_expr: None,
prefilter: false,
filter: None,
batch_size,
batch_size: None,
batch_readahead: DEFAULT_BATCH_READAHEAD,
fragment_readahead: DEFAULT_FRAGMENT_READAHEAD,
limit: None,
@@ -231,6 +225,20 @@ impl Scanner {
self
}

fn get_batch_size(&self) -> usize {
// Default batch size to be large enough so that an i32 column can be
// read in a single range request. For the object store default of
// 64KB, this is 16K rows. For local file systems, the default block size
// is just 4K, which would mean only 1K rows, which might be a little small.
// So we use a default minimum of 8K rows.
self.batch_size.unwrap_or_else(|| {
std::cmp::max(
self.dataset.object_store().block_size() / 4,
DEFAULT_BATCH_SIZE,
)
})
}

fn ensure_not_fragment_scan(&self) -> Result<()> {
if self.is_fragment_scan() {
Err(Error::IO {
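
For reference (not part of the diff), a minimal standalone sketch of how the deferred default resolves, assuming DEFAULT_BATCH_SIZE is the 8K-row minimum (8192) described in the comment above:

// Sketch only: mirrors the clamping rule in get_batch_size().
fn resolved_batch_size(explicit: Option<usize>, block_size: usize) -> usize {
    const DEFAULT_BATCH_SIZE: usize = 8192; // assumed value, per the comment above
    explicit.unwrap_or_else(|| std::cmp::max(block_size / 4, DEFAULT_BATCH_SIZE))
}

#[test]
fn default_batch_size_resolution() {
    assert_eq!(resolved_batch_size(None, 64 * 1024), 16_384); // object store: 64 KiB blocks
    assert_eq!(resolved_batch_size(None, 4 * 1024), 8_192); // local FS: 4 KiB blocks, clamped up
    assert_eq!(resolved_batch_size(Some(1024), 64 * 1024), 1_024); // explicit size always wins
}
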
@@ -352,7 +360,7 @@ impl Scanner {

/// Set the batch size.
pub fn batch_size(&mut self, batch_size: usize) -> &mut Self {
self.batch_size = batch_size;
self.batch_size = Some(batch_size);
self
}

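A usage sketch (hypothetical, but matching the API exercised in the tests below): an explicit batch size overrides the block-size-derived default, and, per the planner change further down, a scan with an explicit batch size no longer takes the stats-pushdown path, which is now chosen only when batch_size is left unset:

// `dataset` is an Arc<Dataset>, as in the tests below.
let mut scanner = dataset.scan();
scanner.batch_size(4096); // stored as Some(4096); get_batch_size() returns it unchanged
let batch = scanner.try_into_batch().await?; // readers are built with 4096 rows per batch
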
@@ -807,8 +815,6 @@ impl Scanner {

let planner = Planner::new(Arc::new(self.dataset.schema().into()));

// NOTE: we only support nodes that have one partition. So any nodes that
// produce multiple partitions need to be repartitioned to 1.
let mut filter_plan = if let Some(filter) = self.filter.as_ref() {
let index_info = self.dataset.scalar_index_info().await?;
let filter_plan =
@@ -869,13 +875,21 @@ impl Scanner {
self.scalar_indexed_scan(&filter_schema, index_query)
.await?
}
(None, Some(_)) if self.use_stats && self.batch_size == DEFAULT_BATCH_SIZE => {
(None, Some(_)) if self.use_stats && self.batch_size.is_none() => {
self.pushdown_scan(false, filter_plan.refine_expr.take().unwrap())?
}
(None, _) => {
// The source is a full scan of the table
let with_row_id = filter_plan.has_refine() || self.with_row_id;
self.scan(with_row_id, false, self.phyical_columns.clone().into())
let schema = if filter_plan.has_refine() {
// If there is a filter then only load the filter columns in the
// initial scan. We will `take` the remaining columns later
let columns = filter_plan.refine_columns();
Arc::new(self.dataset.schema().project(&columns)?)
} else {
Arc::new(self.phyical_columns.clone())
};
self.scan(with_row_id, false, schema)
}
}
};
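
The (None, _) arm above is the late-materialization change itself: when a filter needs a refine step, the initial LanceScan now projects only the filter columns, and the remaining columns are fetched with a later take. A usage sketch with the same shape as the new test below (not_indexed is just an illustrative column name):

let mut scanner = dataset.scan();
scanner
    .use_stats(false) // skip stats pushdown so the plain filtered-scan path is used
    .filter("not_indexed = 50")?; // only `not_indexed` is read by the initial scan
let batch = scanner.try_into_batch().await?; // other columns are taken for matching rows only
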
@@ -1293,7 +1307,7 @@ impl Scanner {
self.dataset.clone(),
fragments,
projection,
self.batch_size,
self.get_batch_size(),
self.batch_readahead,
self.fragment_readahead,
with_row_id,
@@ -1647,6 +1661,7 @@ mod test {
use half::f16;
use lance_datagen::{array, gen, BatchCount, Dimension, RowCount};
use lance_index::IndexType;
use lance_io::object_store::ObjectStoreParams;
use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
use tempfile::{tempdir, TempDir};

@@ -1658,6 +1673,7 @@
use crate::dataset::WriteParams;
use crate::index::scalar::ScalarIndexParams;
use crate::index::vector::VectorIndexParams;
use crate::utils::test::IoTrackingStore;

#[tokio::test]
async fn test_batch_size() {
@@ -3418,6 +3434,106 @@ mod test {
Ok(())
}

#[tokio::test]
async fn test_late_materialization() {
// Create a large dataset with a scalar indexed column and a sorted but not scalar
// indexed column
let data = gen()
.col(
"vector",
array::rand_vec::<Float32Type>(Dimension::from(32)),
)
.col("indexed", array::step::<Int32Type>())
.col("not_indexed", array::step::<Int32Type>())
.into_reader_rows(RowCount::from(1000), BatchCount::from(20));

let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper();
let mut dataset = Dataset::write(
data,
"memory://test",
Some(WriteParams {
store_params: Some(ObjectStoreParams {
object_store_wrapper: Some(io_stats_wrapper),
..Default::default()
}),
..Default::default()
}),
)
.await
.unwrap();
dataset
.create_index(
&["indexed"],
IndexType::Scalar,
None,
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();

let get_bytes = || io_stats.lock().unwrap().read_bytes;

// First run a full scan to get a baseline
let start_bytes = get_bytes();
dataset.scan().try_into_batch().await.unwrap();
let full_scan_bytes = get_bytes() - start_bytes;

// Next do a scan without pushdown; we should still see a benefit from late materialization
let start_bytes = get_bytes();
dataset
.scan()
.use_stats(false)
.filter("not_indexed = 50")
.unwrap()
.try_into_batch()
.await
.unwrap();
let filtered_scan_bytes = get_bytes() - start_bytes;

assert!(filtered_scan_bytes < full_scan_bytes);

// Now do a scan with pushdown; the benefit should be even greater
let start_bytes = get_bytes();
dataset
.scan()
.filter("not_indexed = 50")
.unwrap()
.try_into_batch()
.await
.unwrap();
let pushdown_scan_bytes = get_bytes() - start_bytes;

assert!(pushdown_scan_bytes < filtered_scan_bytes);

// Now do a scalar index scan. This should be better than a full scan,
// but since we have to load the index it might be more expensive than
// the late-materialization / pushdown scans
let start_bytes = get_bytes();
dataset
.scan()
.filter("indexed = 50")
.unwrap()
.try_into_batch()
.await
.unwrap();
let index_scan_bytes = get_bytes() - start_bytes;
assert!(index_scan_bytes < full_scan_bytes);

// A second scalar index scan should be cheaper than the first
// since we should have the index in cache
let start_bytes = get_bytes();
dataset
.scan()
.filter("indexed = 50")
.unwrap()
.try_into_batch()
.await
.unwrap();
let second_index_scan_bytes = get_bytes() - start_bytes;
assert!(second_index_scan_bytes < index_scan_bytes);
}

#[tokio::test]
async fn test_project_nested() -> Result<()> {
let struct_i_field = ArrowField::new("i", DataType::Int32, true);
@@ -3504,9 +3620,9 @@ mod test {
.filter("i > 10 and i < 20")
},
"Projection: fields=[s]
FilterExec: i@2 > 10 AND i@2 < 20
Take: columns=\"s, _rowid, i\"
LanceScan: uri..., projection=[s], row_id=true, ordered=true",
Take: columns=\"i, _rowid, s\"
FilterExec: i@0 > 10 AND i@0 < 20
LanceScan: uri..., projection=[i], row_id=true, ordered=true",
)
.await?;

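The expected plan change shows why the filtered scan in the new test reads fewer bytes: LanceScan now projects only the filter column i, FilterExec prunes rows, and only then does Take fetch s (plus _rowid) for the surviving rows. A sketch for printing such a plan yourself, assuming the scanner exposes project and explain_plan(verbose) helpers (assumptions for illustration; the test itself asserts against the rendered plan through its own helper):

// Hypothetical: inspect the physical plan of a filtered projection.
let mut scanner = dataset.scan();
scanner.project(&["s"])?.filter("i > 10 and i < 20")?;
println!("{}", scanner.explain_plan(true).await?);
// Expect: Take above FilterExec above a LanceScan that projects only [i].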