Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: only read the data file's fields from the page table and not the whole dataset's fields #2095

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions python/python/tests/test_schema_evolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ def test_add_columns_exprs(tmp_path):
assert dataset.to_table() == pa.table({"a": range(100), "b": range(1, 101)})


def test_add_many_columns(tmp_path: Path):
    # Regression test: adding many columns makes the dataset schema much wider
    # than any single data file; reading back must only consult the data file's
    # own fields in the page table, not the whole dataset schema.
    table = pa.table([pa.array([1, 2, 3])], names=["0"])
    dataset = lance.write_dataset(table, tmp_path)
    # Add 999 derived columns, each defined as a copy of column "0".
    # (dict comprehension instead of dict([...]) — avoids the throwaway list)
    dataset.add_columns({str(i): "0" for i in range(1, 1000)})
    dataset = lance.dataset(tmp_path)
    assert dataset.to_table().num_rows == 3


def test_query_after_merge(tmp_path):
# https://github.com/lancedb/lance/issues/1905
tab = pa.table({
Expand Down
112 changes: 97 additions & 15 deletions rust/lance-file/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl FileReader {
schema: Schema,
fragment_id: u32,
field_id_offset: u32,
num_fields: u32,
session: Option<&FileMetadataCache>,
) -> Result<Self> {
let object_reader = object_store.open(path).await?;
Expand All @@ -130,18 +131,21 @@ impl FileReader {
schema,
fragment_id,
field_id_offset,
num_fields,
session,
)
.await
}

#[allow(clippy::too_many_arguments)]
pub async fn try_new_from_reader(
path: &Path,
object_reader: Arc<dyn Reader>,
metadata: Option<Arc<Metadata>>,
schema: Schema,
fragment_id: u32,
field_id_offset: u32,
num_fields: u32,
session: Option<&FileMetadataCache>,
) -> Result<Self> {
let metadata = match metadata {
Expand All @@ -150,12 +154,11 @@ impl FileReader {
};

let page_table = async {
let num_fields = schema.max_field_id().unwrap_or_default() + 1 - field_id_offset as i32;
Self::load_from_cache(session, path, |_| async {
PageTable::load(
object_reader.as_ref(),
metadata.page_table_position,
num_fields,
num_fields as i32,
metadata.num_batches() as i32,
field_id_offset as i32,
)
Expand Down Expand Up @@ -255,7 +258,10 @@ impl FileReader {

/// Open one Lance data file for read.
pub async fn try_new(object_store: &ObjectStore, path: &Path, schema: Schema) -> Result<Self> {
Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, None).await
// If just reading a lance data file we assume the schema is the schema of the data file
let num_fields = schema.max_field_id().unwrap_or_default() + 1;
Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, num_fields as u32, None)
.await
}

/// Instruct the FileReader to return meta row id column.
Expand Down Expand Up @@ -979,10 +985,17 @@ mod tests {
file_writer.finish().await.unwrap();

let fragment = 123_u64;
let mut reader =
FileReader::try_new_with_fragment_id(&store, &path, schema, fragment as u32, 0, None)
.await
.unwrap();
let mut reader = FileReader::try_new_with_fragment_id(
&store,
&path,
schema,
fragment as u32,
0,
2,
None,
)
.await
.unwrap();
reader.with_row_id(true);

for b in 0..10 {
Expand Down Expand Up @@ -1114,10 +1127,17 @@ mod tests {
// delete even rows
let dv = DeletionVector::Bitmap(RoaringBitmap::from_iter((0..100).filter(|x| x % 2 == 0)));

let mut reader =
FileReader::try_new_with_fragment_id(&store, &path, schema, fragment as u32, 0, None)
.await
.unwrap();
let mut reader = FileReader::try_new_with_fragment_id(
&store,
&path,
schema,
fragment as u32,
0,
2,
None,
)
.await
.unwrap();
reader.with_row_id(true);

for b in 0..10 {
Expand Down Expand Up @@ -1171,10 +1191,17 @@ mod tests {
// delete even rows
let dv = DeletionVector::Bitmap(RoaringBitmap::from_iter((0..100).filter(|x| x % 2 == 0)));

let mut reader =
FileReader::try_new_with_fragment_id(&store, &path, schema, fragment as u32, 0, None)
.await
.unwrap();
let mut reader = FileReader::try_new_with_fragment_id(
&store,
&path,
schema,
fragment as u32,
0,
2,
None,
)
.await
.unwrap();
reader.with_row_id(false);

for b in 0..10 {
Expand Down Expand Up @@ -1764,4 +1791,59 @@ mod tests {
&BooleanArray::from(vec![false, false, true, false, true])
);
}

#[tokio::test]
async fn test_read_projection() {
// The dataset schema may be very large. The file reader should support reading
// a small projection of that schema (this just tests the field_offset / num_fields
// parameters)
let store = ObjectStore::memory();
let path = Path::from("/partial_read");

// Create a large schema
let mut fields = vec![];
for i in 0..100 {
fields.push(ArrowField::new(format!("f{}", i), DataType::Int32, false));
}
let arrow_schema = ArrowSchema::new(fields);
let schema = Schema::try_from(&arrow_schema).unwrap();

let partial_schema = schema.project(&["f50"]).unwrap();
let partial_arrow: ArrowSchema = (&partial_schema).into();

let mut file_writer = FileWriter::<NotSelfDescribing>::try_new(
&store,
&path,
partial_schema.clone(),
&Default::default(),
)
.await
.unwrap();

let array = Int32Array::from(vec![0; 15]);
let batch =
RecordBatch::try_new(Arc::new(partial_arrow), vec![Arc::new(array.clone())]).unwrap();
file_writer.write(&[batch.clone()]).await.unwrap();
file_writer.finish().await.unwrap();

let field_id = partial_schema.fields.first().unwrap().id;
let num_fields = 1;
let reader = FileReader::try_new_with_fragment_id(
&store,
&path,
schema.clone(),
0,
field_id as u32,
num_fields,
None,
)
.await
.unwrap();
let actual = reader
.read_batch(0, ReadBatchParams::RangeFull, &partial_schema, None)
.await
.unwrap();

assert_eq!(actual, batch);
}
}
2 changes: 2 additions & 0 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,15 @@ impl SelfDescribingFileReader for FileReader {
let mut manifest: Manifest = read_struct(object_reader.as_ref(), manifest_position).await?;
populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?;
let schema = manifest.schema;
let num_fields = schema.max_field_id().unwrap_or_default() + 1;
Self::try_new_from_reader(
path,
object_reader.into(),
Some(metadata),
schema,
0,
0,
num_fields as u32,
cache,
)
.await
Expand Down
11 changes: 9 additions & 2 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ impl FileFragment {
let data_file_schema = data_file.schema(full_schema);
let schema_per_file = data_file_schema.intersection(projection)?;
let add_row_id = with_row_id && i == 0;
let num_fields = data_file.fields.len() as u32;
if add_row_id || !schema_per_file.fields.is_empty() {
let path = self.dataset.data_dir().child(data_file.path.as_str());
let field_id_offset = Self::get_field_id_offset(data_file);
Expand All @@ -212,6 +213,7 @@ impl FileFragment {
self.schema().clone(),
self.id() as u32,
field_id_offset,
num_fields,
Some(&self.dataset.session.file_metadata_cache),
)
.await?;
Expand Down Expand Up @@ -309,12 +311,14 @@ impl FileFragment {
// Just open any file. All of them should have same size.
let some_file = &self.metadata.files[0];
let path = self.dataset.data_dir().child(some_file.path.as_str());
let num_fields = some_file.fields.len() as u32;
let reader = FileReader::try_new_with_fragment_id(
&self.dataset.object_store,
&path,
self.schema().clone(),
self.id() as u32,
Self::get_field_id_offset(some_file),
num_fields,
Some(&self.dataset.session.file_metadata_cache),
)
.await?;
Expand All @@ -337,15 +341,17 @@ impl FileFragment {
.map(|data_file| {
let path = self.dataset.data_dir().child(data_file.path.as_str());
let field_id_offset = Self::get_field_id_offset(data_file);
(path, field_id_offset)
let num_fields = data_file.fields.len() as u32;
(path, field_id_offset, num_fields)
})
.map(|(path, field_id_offset)| async move {
.map(|(path, field_id_offset, num_fields)| async move {
let reader = FileReader::try_new_with_fragment_id(
&self.dataset.object_store,
&path,
self.schema().clone(),
self.id() as u32,
field_id_offset,
num_fields,
Some(&self.dataset.session.file_metadata_cache),
)
.await?;
Expand Down Expand Up @@ -1487,6 +1493,7 @@ mod tests {
schema.as_ref().try_into().unwrap(),
10,
0,
1,
None,
)
.await
Expand Down
3 changes: 3 additions & 0 deletions rust/lance/src/index/vector/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl HNSWIndex {
schema,
0,
0,
2,
None,
)
.await?;
Expand Down Expand Up @@ -192,6 +193,7 @@ impl VectorIndex for HNSWIndex {
schema,
0,
0,
2,
None,
)
.await?;
Expand Down Expand Up @@ -224,6 +226,7 @@ impl VectorIndex for HNSWIndex {
schema,
0,
0,
2,
None,
)
.await?;
Expand Down
Loading