From 70ae573f1f77248e4dc1e448c86850c49864092f Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 2 May 2024 13:02:13 -0700 Subject: [PATCH 1/5] Make sure late materialization runs on full scans --- rust/lance/src/dataset/scanner.rs | 148 +++++++++++++-- rust/lance/src/utils/test.rs | 304 +++++++++++++----------------- 2 files changed, 266 insertions(+), 186 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 29b34eec95..9ae8e7b5ac 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -139,7 +139,7 @@ pub struct Scanner { pub(crate) filter: Option, /// The batch size controls the maximum size of rows to return for each read. - batch_size: usize, + batch_size: Option, /// Number of batches to prefetch batch_readahead: usize, @@ -190,19 +190,13 @@ impl Scanner { pub fn new(dataset: Arc) -> Self { let projection = dataset.schema().clone(); - // Default batch size to be large enough so that a i32 column can be - // read in a single range request. For the object store default of - // 64KB, this is 16K rows. For local file systems, the default block size - // is just 4K, which would mean only 1K rows, which might be a little small. - // So we use a default minimum of 8K rows. - let batch_size = std::cmp::max(dataset.object_store().block_size() / 4, DEFAULT_BATCH_SIZE); Self { dataset, phyical_columns: projection, requested_output_expr: None, prefilter: false, filter: None, - batch_size, + batch_size: None, batch_readahead: DEFAULT_BATCH_READAHEAD, fragment_readahead: DEFAULT_FRAGMENT_READAHEAD, limit: None, @@ -231,6 +225,20 @@ impl Scanner { self } + fn get_batch_size(&self) -> usize { + // Default batch size to be large enough so that a i32 column can be + // read in a single range request. For the object store default of + // 64KB, this is 16K rows. For local file systems, the default block size + // is just 4K, which would mean only 1K rows, which might be a little small. + // So we use a default minimum of 8K rows. + self.batch_size.unwrap_or_else(|| { + std::cmp::max( + self.dataset.object_store().block_size() / 4, + DEFAULT_BATCH_SIZE, + ) + }) + } + fn ensure_not_fragment_scan(&self) -> Result<()> { if self.is_fragment_scan() { Err(Error::IO { @@ -352,7 +360,7 @@ impl Scanner { /// Set the batch size. pub fn batch_size(&mut self, batch_size: usize) -> &mut Self { - self.batch_size = batch_size; + self.batch_size = Some(batch_size); self } @@ -807,8 +815,6 @@ impl Scanner { let planner = Planner::new(Arc::new(self.dataset.schema().into())); - // NOTE: we only support node that have one partition. So any nodes that - // produce multiple need to be repartitioned to 1. let mut filter_plan = if let Some(filter) = self.filter.as_ref() { let index_info = self.dataset.scalar_index_info().await?; let filter_plan = @@ -869,13 +875,21 @@ impl Scanner { self.scalar_indexed_scan(&filter_schema, index_query) .await? } - (None, Some(_)) if self.use_stats && self.batch_size == DEFAULT_BATCH_SIZE => { + (None, Some(_)) if self.use_stats && self.batch_size.is_none() => { self.pushdown_scan(false, filter_plan.refine_expr.take().unwrap())? } (None, _) => { // The source is a full scan of the table let with_row_id = filter_plan.has_refine() || self.with_row_id; - self.scan(with_row_id, false, self.phyical_columns.clone().into()) + let schema = if filter_plan.has_refine() { + // If there is a filter then only load the filter columns in the + // initial scan. 
We will `take` the remaining columns later + let columns = filter_plan.refine_columns(); + Arc::new(self.dataset.schema().project(&columns)?) + } else { + Arc::new(self.phyical_columns.clone()) + }; + self.scan(with_row_id, false, schema) } } }; @@ -1293,7 +1307,7 @@ impl Scanner { self.dataset.clone(), fragments, projection, - self.batch_size, + self.get_batch_size(), self.batch_readahead, self.fragment_readahead, with_row_id, @@ -1647,8 +1661,9 @@ mod test { use half::f16; use lance_datagen::{array, gen, BatchCount, Dimension, RowCount}; use lance_index::IndexType; + use lance_io::object_store::ObjectStoreParams; use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector}; - use tempfile::{tempdir, TempDir}; + use tempfile::tempdir; use super::*; use crate::arrow::*; @@ -1658,6 +1673,7 @@ mod test { use crate::dataset::WriteParams; use crate::index::scalar::ScalarIndexParams; use crate::index::vector::VectorIndexParams; + use crate::utils::test::IoTrackingStore; #[tokio::test] async fn test_batch_size() { @@ -2911,7 +2927,6 @@ mod test { } struct ScalarIndexTestFixture { - _test_dir: TempDir, dataset: Dataset, sample_query: Arc, delete_query: Arc, @@ -3055,7 +3070,6 @@ mod test { dataset.restore().await.unwrap(); Self { - _test_dir: test_dir, dataset, sample_query, delete_query, @@ -3418,6 +3432,106 @@ mod test { Ok(()) } + #[tokio::test] + async fn test_late_materialization() { + // Create a large dataset with a scalar indexed column and a sorted but not scalar + // indexed column + let data = gen() + .col( + "vector", + array::rand_vec::(Dimension::from(32)), + ) + .col("indexed", array::step::()) + .col("not_indexed", array::step::()) + .into_reader_rows(RowCount::from(1000), BatchCount::from(20)); + + let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper(); + let mut dataset = Dataset::write( + data, + "memory://test", + Some(WriteParams { + store_params: Some(ObjectStoreParams { + object_store_wrapper: Some(io_stats_wrapper), + ..Default::default() + }), + ..Default::default() + }), + ) + .await + .unwrap(); + dataset + .create_index( + &["indexed"], + IndexType::Scalar, + None, + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + let get_bytes = || io_stats.lock().unwrap().read_bytes; + + // First run a full scan to get a baseline + let start_bytes = get_bytes(); + dataset.scan().try_into_batch().await.unwrap(); + let full_scan_bytes = get_bytes() - start_bytes; + + // Next do a scan without pushdown, we should still see a benefit from late materialization + let start_bytes = get_bytes(); + dataset + .scan() + .use_stats(false) + .filter("not_indexed = 50") + .unwrap() + .try_into_batch() + .await + .unwrap(); + let filtered_scan_bytes = get_bytes() - start_bytes; + + assert!(filtered_scan_bytes < full_scan_bytes); + + // Now do a scan with pushdown, the benefit should be even greater + let start_bytes = get_bytes(); + dataset + .scan() + .filter("not_indexed = 50") + .unwrap() + .try_into_batch() + .await + .unwrap(); + let pushdown_scan_bytes = get_bytes() - start_bytes; + + assert!(pushdown_scan_bytes < filtered_scan_bytes); + + // Now do a scalar index scan, this should be better than a + // full scan but since we have to load the index might be more + // expensive than late / pushdown scan + let start_bytes = get_bytes(); + dataset + .scan() + .filter("indexed = 50") + .unwrap() + .try_into_batch() + .await + .unwrap(); + let index_scan_bytes = get_bytes() - start_bytes; + assert!(index_scan_bytes < full_scan_bytes); + + // A 
second scalar index scan should be cheaper than the first + // since we should have the index in cache + let start_bytes = get_bytes(); + dataset + .scan() + .filter("indexed = 50") + .unwrap() + .try_into_batch() + .await + .unwrap(); + let second_index_scan_bytes = get_bytes() - start_bytes; + assert!(second_index_scan_bytes < index_scan_bytes); + } + #[tokio::test] async fn test_project_nested() -> Result<()> { let struct_i_field = ArrowField::new("i", DataType::Int32, true); diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 4552ea8567..48bead9b92 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -1,15 +1,26 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use std::sync::Arc; +use std::fmt::{Display, Formatter}; +use std::ops::Range; +use std::sync::{Arc, Mutex}; use arrow_array::{RecordBatch, RecordBatchIterator}; use arrow_schema::Schema as ArrowSchema; +use bytes::Bytes; +use futures::stream::BoxStream; use lance_arrow::RecordBatchExt; use lance_core::datatypes::Schema; +use lance_io::object_store::WrappingObjectStore; use lance_table::format::Fragment; +use object_store::path::Path; +use object_store::{ + GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult, + Result as OSResult, +}; use rand::prelude::SliceRandom; use rand::{Rng, SeedableRng}; +use tokio::io::AsyncWrite; use crate::dataset::fragment::write::FragmentCreateBuilder; use crate::dataset::transaction::Operation; @@ -230,183 +241,138 @@ fn field_structure(fragment: &Fragment) -> Vec> { .collect::>() } -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use super::*; - use arrow_array::{ArrayRef, BooleanArray, Float64Array, Int32Array, StringArray, StructArray}; - use arrow_schema::{DataType, Field as ArrowField, Fields as ArrowFields}; - - #[test] - fn test_make_schema() { - let arrow_schema = Arc::new(ArrowSchema::new(vec![ - ArrowField::new("a", DataType::Int32, false), - ArrowField::new( - "b", - DataType::Struct( - vec![ - ArrowField::new("f1", DataType::Utf8, true), - ArrowField::new("f2", DataType::Boolean, false), - ] - .into(), - ), - true, - ), - ArrowField::new("c", DataType::Float64, false), - ])); - let data = vec![RecordBatch::new_empty(arrow_schema.clone())]; - - let generator = TestDatasetGenerator::new(data); - let schema = generator.make_schema(&mut rand::thread_rng()); - - let roundtripped_schema = ArrowSchema::from(&schema); - assert_eq!(&roundtripped_schema, arrow_schema.as_ref()); +#[derive(Debug, Default)] +pub struct IoStats { + pub read_iops: u64, + pub read_bytes: u64, +} - let field_ids = schema.fields_pre_order().map(|f| f.id).collect::>(); - let mut sorted_ids = field_ids.clone(); - sorted_ids.sort_unstable(); - assert_ne!(field_ids, sorted_ids); - - let mut num_holes = 0; - for w in sorted_ids.windows(2) { - let prev = w[0]; - let next = w[1]; - if next - prev > 1 { - num_holes += 1; - } +impl Display for IoStats { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:#?}", self) + } +} + +#[derive(Debug)] +pub struct IoTrackingStore { + target: Arc, + stats: Arc>, +} + +impl Display for IoTrackingStore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:#?}", self) + } +} + +#[derive(Debug)] +struct StatsHolder(Arc>); + +impl WrappingObjectStore for StatsHolder { + fn wrap(&self, target: Arc) -> Arc { + Arc::new(IoTrackingStore { + target, + stats: self.0.clone(), + }) + } +} + +impl 
IoTrackingStore { + pub fn new_wrapper() -> (Arc, Arc>) { + let stats = Arc::new(Mutex::new(IoStats::default())); + (Arc::new(StatsHolder(stats.clone())), stats) + } + + fn record_read(&self, num_bytes: u64) { + let mut stats = self.stats.lock().unwrap(); + stats.read_iops += 1; + stats.read_bytes += num_bytes as u64; + } +} + +#[async_trait::async_trait] +impl ObjectStore for IoTrackingStore { + async fn put(&self, location: &Path, bytes: Bytes) -> OSResult { + self.target.put(location, bytes).await + } + + async fn put_opts( + &self, + location: &Path, + bytes: Bytes, + opts: PutOptions, + ) -> OSResult { + self.target.put_opts(location, bytes, opts).await + } + + async fn put_multipart( + &self, + location: &Path, + ) -> OSResult<(MultipartId, Box)> { + self.target.put_multipart(location).await + } + + async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> OSResult<()> { + self.target.abort_multipart(location, multipart_id).await + } + + async fn get_opts(&self, location: &Path, options: GetOptions) -> OSResult { + let result = self.target.get_opts(location, options).await; + if let Ok(result) = &result { + let num_bytes = result.range.end - result.range.start; + self.record_read(num_bytes as u64); } - assert!(num_holes > 0, "Expected at least one hole in the field ids"); + result } - #[tokio::test] - async fn test_make_fragment() { - let tmp_dir = tempfile::tempdir().unwrap(); - - let struct_fields: ArrowFields = vec![ - ArrowField::new("f1", DataType::Utf8, true), - ArrowField::new("f2", DataType::Boolean, false), - ] - .into(); - let schema = Arc::new(ArrowSchema::new(vec![ - ArrowField::new("a", DataType::Int32, false), - ArrowField::new("b", DataType::Struct(struct_fields.clone()), true), - ArrowField::new("c", DataType::Float64, false), - ])); - let data = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - Arc::new(StructArray::new( - struct_fields, - vec![ - Arc::new(StringArray::from(vec!["foo", "bar", "baz"])) as ArrayRef, - Arc::new(BooleanArray::from(vec![true, false, true])), - ], - None, - )), - Arc::new(Float64Array::from(vec![1.1, 2.2, 3.3])), - ], - ) - .unwrap(); - - let generator = TestDatasetGenerator::new(vec![data.clone()]); - let mut rng = rand::thread_rng(); - for _ in 1..50 { - let schema = generator.make_schema(&mut rng); - let fragment = generator - .make_fragment( - tmp_dir.path().to_str().unwrap(), - &data, - &schema, - &mut rng, - 2, - ) - .await; - - assert!(fragment.files.len() > 1, "Expected multiple files"); - - let mut field_ids_frags = fragment - .files - .iter() - .flat_map(|file| file.fields.iter()) - .cloned() - .collect::>(); - let mut field_ids = schema.fields_pre_order().map(|f| f.id).collect::>(); - assert_ne!(field_ids_frags, field_ids); - field_ids_frags.sort_unstable(); - field_ids.sort_unstable(); - assert_eq!(field_ids_frags, field_ids); + async fn get_range(&self, location: &Path, range: Range) -> OSResult { + let result = self.target.get_range(location, range).await; + if let Ok(result) = &result { + self.record_read(result.len() as u64); } + result } - #[tokio::test] - async fn test_make_hostile() { - let tmp_dir = tempfile::tempdir().unwrap(); - - let schema = Arc::new(ArrowSchema::new(vec![ - ArrowField::new("a", DataType::Int32, false), - ArrowField::new("b", DataType::Int32, false), - ArrowField::new("c", DataType::Float64, false), - ])); - let data = vec![ - RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![1, 2, 3])), - 
Arc::new(Int32Array::from(vec![10, 20, 30])), - Arc::new(Float64Array::from(vec![1.1, 2.2, 3.3])), - ], - ) - .unwrap(), - RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Int32Array::from(vec![4, 5, 6])), - Arc::new(Int32Array::from(vec![40, 50, 60])), - Arc::new(Float64Array::from(vec![4.4, 5.5, 6.6])), - ], - ) - .unwrap(), - ]; - - let seed = 42; - let generator = TestDatasetGenerator::new(data.clone()).seed(seed); - - let path = tmp_dir.path().join("ds1"); - let dataset = generator.make_hostile(path.to_str().unwrap()).await; - - let path2 = tmp_dir.path().join("ds2"); - let dataset2 = generator.make_hostile(path2.to_str().unwrap()).await; - - // Given the same seed, should produce the same layout. - assert_eq!(dataset.schema(), dataset2.schema()); - let field_structure_1 = get_field_structure(&dataset); - let field_structure_2 = get_field_structure(&dataset2); - assert_eq!(field_structure_1, field_structure_2); - - // Make sure we handle different numbers of columns - for num_cols in 1..4 { - let projection = (0..num_cols).collect::>(); - let data = data - .iter() - .map(|rb| rb.project(&projection).unwrap()) - .collect::>(); + async fn get_ranges(&self, location: &Path, ranges: &[Range]) -> OSResult> { + let result = self.target.get_ranges(location, ranges).await; + if let Ok(result) = &result { + self.record_read(result.iter().map(|b| b.len() as u64).sum()); + } + result + } - let generator = TestDatasetGenerator::new(data.clone()); - // Sample a few - for i in 1..20 { - let path = tmp_dir.path().join(format!("test_ds_{}_{}", num_cols, i)); - let dataset = generator.make_hostile(path.to_str().unwrap()).await; + async fn head(&self, location: &Path) -> OSResult { + self.target.head(location).await + } - let field_structure = get_field_structure(&dataset); + async fn delete(&self, location: &Path) -> OSResult<()> { + self.target.delete(location).await + } - // The two fragments should have different layout. - assert_eq!(field_structure.len(), 2); - if num_cols > 1 { - assert_ne!(field_structure[0], field_structure[1]); - } - } - } + fn delete_stream<'a>( + &'a self, + locations: BoxStream<'a, OSResult>, + ) -> BoxStream<'a, OSResult> { + self.target.delete_stream(locations) + } + + fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, OSResult> { + self.target.list(prefix) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> OSResult { + self.target.list_with_delimiter(prefix).await + } + + async fn copy(&self, from: &Path, to: &Path) -> OSResult<()> { + self.target.copy(from, to).await + } + + async fn rename(&self, from: &Path, to: &Path) -> OSResult<()> { + self.target.rename(from, to).await + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> OSResult<()> { + self.target.copy_if_not_exists(from, to).await } } From dea7b33de09e6c9de72ca49d6c923603e46566e8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 2 May 2024 13:46:04 -0700 Subject: [PATCH 2/5] Address compiler warnings --- rust/lance/src/utils/test.rs | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 48bead9b92..5f36722803 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -48,6 +48,7 @@ impl TestDatasetGenerator { /// Set the seed for the random number generator. /// /// If not set, a random seed will be generated on each call to [`Self::make_hostile`]. 
+ #[allow(dead_code)] pub fn seed(mut self, seed: u64) -> Self { self.seed = Some(seed); self @@ -225,14 +226,6 @@ impl TestDatasetGenerator { } } -fn get_field_structure(dataset: &Dataset) -> Vec>> { - dataset - .get_fragments() - .into_iter() - .map(|frag| field_structure(frag.metadata())) - .collect::>() -} - fn field_structure(fragment: &Fragment) -> Vec> { fragment .files @@ -286,7 +279,7 @@ impl IoTrackingStore { fn record_read(&self, num_bytes: u64) { let mut stats = self.stats.lock().unwrap(); stats.read_iops += 1; - stats.read_bytes += num_bytes as u64; + stats.read_bytes += num_bytes; } } From d77293d686c95d4f1d11aabe149b0e90d43e4c6d Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 2 May 2024 16:18:02 -0700 Subject: [PATCH 3/5] Don't remove tests? Not sure what happened there --- rust/lance/src/utils/test.rs | 189 +++++++++++++++++++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index 5f36722803..bab55041b0 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -226,6 +226,14 @@ impl TestDatasetGenerator { } } +fn get_field_structure(dataset: &Dataset) -> Vec>> { + dataset + .get_fragments() + .into_iter() + .map(|frag| field_structure(frag.metadata())) + .collect::>() +} + fn field_structure(fragment: &Fragment) -> Vec> { fragment .files @@ -369,3 +377,184 @@ impl ObjectStore for IoTrackingStore { self.target.copy_if_not_exists(from, to).await } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use super::*; + use arrow_array::{ArrayRef, BooleanArray, Float64Array, Int32Array, StringArray, StructArray}; + use arrow_schema::{DataType, Field as ArrowField, Fields as ArrowFields}; + + #[test] + fn test_make_schema() { + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new( + "b", + DataType::Struct( + vec![ + ArrowField::new("f1", DataType::Utf8, true), + ArrowField::new("f2", DataType::Boolean, false), + ] + .into(), + ), + true, + ), + ArrowField::new("c", DataType::Float64, false), + ])); + let data = vec![RecordBatch::new_empty(arrow_schema.clone())]; + + let generator = TestDatasetGenerator::new(data); + let schema = generator.make_schema(&mut rand::thread_rng()); + + let roundtripped_schema = ArrowSchema::from(&schema); + assert_eq!(&roundtripped_schema, arrow_schema.as_ref()); + + let field_ids = schema.fields_pre_order().map(|f| f.id).collect::>(); + let mut sorted_ids = field_ids.clone(); + sorted_ids.sort_unstable(); + assert_ne!(field_ids, sorted_ids); + + let mut num_holes = 0; + for w in sorted_ids.windows(2) { + let prev = w[0]; + let next = w[1]; + if next - prev > 1 { + num_holes += 1; + } + } + assert!(num_holes > 0, "Expected at least one hole in the field ids"); + } + + #[tokio::test] + async fn test_make_fragment() { + let tmp_dir = tempfile::tempdir().unwrap(); + + let struct_fields: ArrowFields = vec![ + ArrowField::new("f1", DataType::Utf8, true), + ArrowField::new("f2", DataType::Boolean, false), + ] + .into(); + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new("b", DataType::Struct(struct_fields.clone()), true), + ArrowField::new("c", DataType::Float64, false), + ])); + let data = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StructArray::new( + struct_fields, + vec![ + Arc::new(StringArray::from(vec!["foo", "bar", "baz"])) as ArrayRef, + 
Arc::new(BooleanArray::from(vec![true, false, true])), + ], + None, + )), + Arc::new(Float64Array::from(vec![1.1, 2.2, 3.3])), + ], + ) + .unwrap(); + + let generator = TestDatasetGenerator::new(vec![data.clone()]); + let mut rng = rand::thread_rng(); + for _ in 1..50 { + let schema = generator.make_schema(&mut rng); + let fragment = generator + .make_fragment( + tmp_dir.path().to_str().unwrap(), + &data, + &schema, + &mut rng, + 2, + ) + .await; + + assert!(fragment.files.len() > 1, "Expected multiple files"); + + let mut field_ids_frags = fragment + .files + .iter() + .flat_map(|file| file.fields.iter()) + .cloned() + .collect::>(); + let mut field_ids = schema.fields_pre_order().map(|f| f.id).collect::>(); + assert_ne!(field_ids_frags, field_ids); + field_ids_frags.sort_unstable(); + field_ids.sort_unstable(); + assert_eq!(field_ids_frags, field_ids); + } + } + + #[tokio::test] + async fn test_make_hostile() { + let tmp_dir = tempfile::tempdir().unwrap(); + + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("a", DataType::Int32, false), + ArrowField::new("b", DataType::Int32, false), + ArrowField::new("c", DataType::Float64, false), + ])); + let data = vec![ + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![10, 20, 30])), + Arc::new(Float64Array::from(vec![1.1, 2.2, 3.3])), + ], + ) + .unwrap(), + RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(Int32Array::from(vec![40, 50, 60])), + Arc::new(Float64Array::from(vec![4.4, 5.5, 6.6])), + ], + ) + .unwrap(), + ]; + + let seed = 42; + let generator = TestDatasetGenerator::new(data.clone()).seed(seed); + + let path = tmp_dir.path().join("ds1"); + let dataset = generator.make_hostile(path.to_str().unwrap()).await; + + let path2 = tmp_dir.path().join("ds2"); + let dataset2 = generator.make_hostile(path2.to_str().unwrap()).await; + + // Given the same seed, should produce the same layout. + assert_eq!(dataset.schema(), dataset2.schema()); + let field_structure_1 = get_field_structure(&dataset); + let field_structure_2 = get_field_structure(&dataset2); + assert_eq!(field_structure_1, field_structure_2); + + // Make sure we handle different numbers of columns + for num_cols in 1..4 { + let projection = (0..num_cols).collect::>(); + let data = data + .iter() + .map(|rb| rb.project(&projection).unwrap()) + .collect::>(); + + let generator = TestDatasetGenerator::new(data.clone()); + // Sample a few + for i in 1..20 { + let path = tmp_dir.path().join(format!("test_ds_{}_{}", num_cols, i)); + let dataset = generator.make_hostile(path.to_str().unwrap()).await; + + let field_structure = get_field_structure(&dataset); + + // The two fragments should have different layout. 
+ assert_eq!(field_structure.len(), 2); + if num_cols > 1 { + assert_ne!(field_structure[0], field_structure[1]); + } + } + } + } +} From 1fe5b328cb10052ecbf599044107b7c149abc1d1 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 2 May 2024 17:15:18 -0700 Subject: [PATCH 4/5] Remove changes to wrong unit tests --- rust/lance/src/dataset/scanner.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 9ae8e7b5ac..9817117f01 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -1663,7 +1663,7 @@ mod test { use lance_index::IndexType; use lance_io::object_store::ObjectStoreParams; use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector}; - use tempfile::tempdir; + use tempfile::{tempdir, TempDir}; use super::*; use crate::arrow::*; @@ -2927,6 +2927,7 @@ mod test { } struct ScalarIndexTestFixture { + _test_dir: TempDir, dataset: Dataset, sample_query: Arc, delete_query: Arc, @@ -3070,6 +3071,7 @@ mod test { dataset.restore().await.unwrap(); Self { + _test_dir: test_dir, dataset, sample_query, delete_query, From 65bc2cd56b151eb155a4f6fb12de433d00446c29 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 2 May 2024 19:03:20 -0700 Subject: [PATCH 5/5] Fix test_plans --- rust/lance/src/dataset/scanner.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 9817117f01..1d92c6dd55 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -3620,9 +3620,9 @@ mod test { .filter("i > 10 and i < 20") }, "Projection: fields=[s] - FilterExec: i@2 > 10 AND i@2 < 20 - Take: columns=\"s, _rowid, i\" - LanceScan: uri..., projection=[s], row_id=true, ordered=true", + Take: columns=\"i, _rowid, s\" + FilterExec: i@0 > 10 AND i@0 < 20 + LanceScan: uri..., projection=[i], row_id=true, ordered=true", ) .await?;
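
A minimal sketch of the batch-size default that patch 1 moves out of `Scanner::new` and into `get_batch_size`, assuming `DEFAULT_BATCH_SIZE` is 8192 (the "default minimum of 8K rows" the comment describes); `default_batch_size` is a hypothetical free function for illustration only:

    // Hypothetical standalone version of Scanner::get_batch_size's fallback.
    const DEFAULT_BATCH_SIZE: usize = 8192; // assumed; "default minimum of 8K rows"

    fn default_batch_size(block_size: usize) -> usize {
        // One block holds block_size / 4 values of a 4-byte (i32) column.
        std::cmp::max(block_size / 4, DEFAULT_BATCH_SIZE)
    }

    fn main() {
        assert_eq!(default_batch_size(64 * 1024), 16 * 1024); // object store: 64KB blocks
        assert_eq!(default_batch_size(4 * 1024), 8 * 1024); // local fs: 4KB blocks, clamped up
    }

Deferring the computation this way (storing `Option<usize>` instead of an eagerly computed `usize`) is what lets the planner tell "user never set a batch size" apart from "user explicitly set the default value", which the pushdown-scan branch's `self.batch_size.is_none()` check relies on.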
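
A minimal sketch of how the `IoTrackingStore` added to `utils/test.rs` is meant to be wired into a test, mirroring the pattern in `test_late_materialization` above; the test name, column name, URI, and row counts here are illustrative assumptions:

    #[tokio::test]
    async fn sketch_io_tracking() {
        use arrow_array::types::Int32Type;
        use lance_datagen::{array, gen, BatchCount, RowCount};
        use lance_io::object_store::ObjectStoreParams;
        // Within rust/lance:
        // use crate::{dataset::{Dataset, WriteParams}, utils::test::IoTrackingStore};

        let data = gen()
            .col("i", array::step::<Int32Type>())
            .into_reader_rows(RowCount::from(1000), BatchCount::from(4));

        // new_wrapper() hands back the store wrapper plus a shared stats handle.
        let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper();
        let dataset = Dataset::write(
            data,
            "memory://sketch",
            Some(WriteParams {
                store_params: Some(ObjectStoreParams {
                    object_store_wrapper: Some(io_stats_wrapper),
                    ..Default::default()
                }),
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        // Every read that goes through the wrapped store bumps the counters.
        dataset.scan().try_into_batch().await.unwrap();
        let stats = io_stats.lock().unwrap();
        assert!(stats.read_iops > 0);
        assert!(stats.read_bytes > 0);
    }

This is the mechanism behind the byte-count assertions in `test_late_materialization`: with a refine filter, the scan now loads only the filter columns and `take`s the remaining columns for matching rows (the plan change verified in patch 5), so the filtered scan's `read_bytes` comes in below the full scan's.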