Fix various bugs in page size / batch size / validity handling

westonpace committed Oct 18, 2024
1 parent 31342f9 commit ac98d0b

Showing 11 changed files with 1,258 additions and 1,051 deletions.
5 changes: 5 additions & 0 deletions protos/encodings.proto
@@ -298,8 +298,13 @@ message MiniBlockLayout {
   ArrayEncoding value_compression = 3;
 }
 
+message AllNullLayout {
+
+}
+
 message PageLayout {
   oneof layout {
     MiniBlockLayout mini_block_layout = 1;
+    AllNullLayout all_null_layout = 2;
   }
 }
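
The new AllNullLayout gives a page a way to say "every row here is null" without storing any value buffers. A minimal sketch of how a reader might branch on the new variant, assuming prost-generated Rust types for encodings.proto (the module path and the helper are illustrative, not from this commit):

    use encodings::{page_layout::Layout, PageLayout};

    /// True when the page was written with the new all-null layout, in which
    /// case a decoder can synthesize a null block from the row count alone.
    fn page_is_all_null(layout: &PageLayout) -> bool {
        matches!(layout.layout, Some(Layout::AllNullLayout(_)))
    }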
10 changes: 5 additions & 5 deletions python/python/benchmarks/test_file.py
@@ -43,7 +43,7 @@ def read_all():
 
     result = benchmark.pedantic(read_all, rounds=1, iterations=1)
 
-    # assert result.num_rows == NUM_ROWS
+    assert result.num_rows == NUM_ROWS
 
 
 @pytest.mark.parametrize(
@@ -80,7 +80,7 @@ def read_all():
 
     result = benchmark.pedantic(read_all, rounds=1, iterations=1)
 
-    # assert result.num_rows == NUM_ROWS
+    assert result.num_rows == NUM_ROWS
 
 
 @pytest.mark.benchmark(group="scan_single_column")
@@ -129,7 +129,7 @@ def read_all():
 
     result = benchmark.pedantic(read_all, rounds=1, iterations=1)
 
-    # assert result.num_rows == NUM_ROWS
+    assert result.num_rows == NUM_ROWS
 
 
 @pytest.mark.parametrize(
@@ -166,7 +166,7 @@ def sample():
 
     result = benchmark.pedantic(sample, rounds=30, iterations=1)
 
-    # assert result.num_rows == NUM_ROWS
+    assert result.num_rows == NUM_ROWS
 
 
 @pytest.mark.benchmark(group="sample_single_column")
@@ -217,4 +217,4 @@ def sample():
 
     result = benchmark.pedantic(sample, rounds=30, iterations=1)
 
-    # assert result.num_rows == NUM_ROWS
+    assert result.num_rows == NUM_ROWS
1 change: 0 additions & 1 deletion python/src/reader.rs
@@ -58,7 +58,6 @@ impl Iterator for LanceReader {
 
     fn next(&mut self) -> Option<Self::Item> {
         let stream = self.stream.clone();
-        let _span = tracing::debug_span!("LanceReader::next").entered();
         RT.spawn(None, async move {
             let mut stream = stream.lock().await;
             stream.next().await
1 change: 1 addition & 0 deletions rust/lance-core/src/datatypes.rs
@@ -36,6 +36,7 @@ lazy_static::lazy_static! {
     ]);
     pub static ref BLOB_DESC_FIELD: ArrowField =
         ArrowField::new("description", DataType::Struct(BLOB_DESC_FIELDS.clone()), false);
+    pub static ref BLOB_DESC_LANCE_FIELD: Field = Field::try_from(&*BLOB_DESC_FIELD).unwrap();
 }
 
 /// LogicalType is a string presentation of arrow type.
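
BLOB_DESC_LANCE_FIELD caches the fallible ArrowField-to-Field conversion in a lazy static, so call sites (such as the blob scheduler in decoder.rs below) can borrow it instead of redoing the conversion. A hedged sketch of the difference, using only the exports shown above:

    use lance_core::datatypes::{Field, BLOB_DESC_FIELD, BLOB_DESC_LANCE_FIELD};

    fn demo() {
        // Before: each call site converted (and unwrapped) the Arrow field itself.
        let per_call: Field = Field::try_from(&*BLOB_DESC_FIELD).unwrap();
        // After: the conversion runs once, on first touch of the lazy static.
        let shared: &Field = &BLOB_DESC_LANCE_FIELD;
        let _ = (per_call, shared);
    }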
36 changes: 26 additions & 10 deletions rust/lance-encoding/src/data.rs
@@ -308,14 +308,12 @@ impl FixedWidthDataBlockBuilder {
 }
 
 impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
-    fn append(&mut self, data_block: DataBlock, selection: &[Range<u64>]) {
-        let block = data_block.as_fixed_width().unwrap();
+    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
+        let block = data_block.as_fixed_width_ref().unwrap();
         assert_eq!(self.bits_per_value, block.bits_per_value);
-        for rng in selection {
-            let start = rng.start as usize * self.bytes_per_value as usize;
-            let end = rng.end as usize * self.bytes_per_value as usize;
-            self.values.extend_from_slice(&block.data[start..end]);
-        }
+        let start = selection.start as usize * self.bytes_per_value as usize;
+        let end = selection.end as usize * self.bytes_per_value as usize;
+        self.values.extend_from_slice(&block.data[start..end]);
     }
 
     fn finish(self: Box<Self>) -> DataBlock {
@@ -868,6 +866,17 @@ macro_rules! as_type {
     };
 }
 
+macro_rules! as_type_ref {
+    ($fn_name:ident, $inner:tt, $inner_type:ident) => {
+        pub fn $fn_name(&self) -> Option<&$inner_type> {
+            match self {
+                Self::$inner(inner) => Some(inner),
+                _ => None,
+            }
+        }
+    };
+}
+
 // Cast implementations
 impl DataBlock {
     as_type!(as_all_null, AllNull, AllNullDataBlock);
@@ -877,6 +886,13 @@ impl DataBlock {
     as_type!(as_variable_width, VariableWidth, VariableWidthBlock);
     as_type!(as_struct, Struct, StructDataBlock);
     as_type!(as_dictionary, Dictionary, DictionaryDataBlock);
+    as_type_ref!(as_all_null_ref, AllNull, AllNullDataBlock);
+    as_type_ref!(as_nullable_ref, Nullable, NullableDataBlock);
+    as_type_ref!(as_fixed_width_ref, FixedWidth, FixedWidthDataBlock);
+    as_type_ref!(as_fixed_size_list_ref, FixedSizeList, FixedSizeListBlock);
+    as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
+    as_type_ref!(as_struct_ref, Struct, StructDataBlock);
+    as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
 }
 
 // Methods to convert from Arrow -> DataBlock
@@ -1335,7 +1351,7 @@ impl From<ArrayRef> for DataBlock {
 }
 
 pub trait DataBlockBuilderImpl {
-    fn append(&mut self, data_block: DataBlock, selection: &[Range<u64>]);
+    fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
     fn finish(self: Box<Self>) -> DataBlock;
 }
 
@@ -1359,8 +1375,8 @@ impl DataBlockBuilder {
         self.builder.as_mut().unwrap().as_mut()
     }
 
-    pub fn append(&mut self, data_block: DataBlock, selection: &[Range<u64>]) {
-        self.get_builder(&data_block).append(data_block, selection);
+    pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
+        self.get_builder(data_block).append(data_block, selection);
     }
 
     pub fn finish(self) -> DataBlock {
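
Net effect of the data.rs changes: DataBlockBuilder::append now borrows the source block and gathers one contiguous range per call, and the new as_*_ref casts let builders inspect a block without consuming it. A hedged sketch of the resulting calling convention (the gather helper and the with_capacity_estimate constructor name are assumptions, not code from this commit):

    use std::ops::Range;

    // Gather several row ranges out of one source block. Because append only
    // borrows `src`, the same block can feed every iteration without cloning.
    fn gather(src: &DataBlock, ranges: &[Range<u64>], size_estimate: u64) -> DataBlock {
        let mut builder = DataBlockBuilder::with_capacity_estimate(size_estimate);
        for r in ranges {
            builder.append(src, r.clone());
        }
        builder.finish()
    }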
38 changes: 5 additions & 33 deletions rust/lance-encoding/src/decoder.rs
@@ -225,7 +225,7 @@ use futures::stream::{self, BoxStream};
 use futures::{FutureExt, StreamExt};
 use lance_arrow::DataTypeExt;
 use lance_core::cache::{CapacityMode, FileMetadataCache};
-use lance_core::datatypes::{Field, Schema, BLOB_DESC_FIELD};
+use lance_core::datatypes::{Field, Schema, BLOB_DESC_LANCE_FIELD};
 use log::{debug, trace, warn};
 use snafu::{location, Location};
 use tokio::sync::mpsc::error::SendError;
@@ -238,7 +238,7 @@ use crate::buffer::LanceBuffer;
 use crate::data::DataBlock;
 use crate::encoder::{values_column_encoding, EncodedBatch};
 use crate::encodings::logical::binary::BinaryFieldScheduler;
-use crate::encodings::logical::blob::{BlobFieldScheduler, DESC_FIELD_LANCE};
+use crate::encodings::logical::blob::BlobFieldScheduler;
 use crate::encodings::logical::list::{ListFieldScheduler, OffsetPageInfo};
 use crate::encodings::logical::primitive::{
     PrimitiveFieldScheduler, StructuralPrimitiveFieldScheduler,
@@ -756,21 +756,10 @@ impl CoreFieldDecoderStrategy {
         let column_info = column_infos.next().unwrap().clone();
         // Column is blob and user is asking for binary data
         if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
-            <<<<<<< HEAD
-            let desc_scheduler = self.create_primitive_scheduler(
-                BLOB_DESC_FIELD.data_type(),
-                chain.current_path(),
-                &blob_col,
-                buffers,
-            )?;
-            let blob_scheduler = Arc::new(BlobFieldScheduler::new(desc_scheduler));
-            return Ok((chain, Ok(blob_scheduler)));
-            =======
             let desc_scheduler =
-                self.create_primitive_scheduler(&DESC_FIELD_LANCE, &blob_col, buffers)?;
+                self.create_primitive_scheduler(&BLOB_DESC_LANCE_FIELD, &blob_col, buffers)?;
             let blob_scheduler = Box::new(BlobFieldScheduler::new(desc_scheduler.into()));
             return Ok(blob_scheduler);
-            >>>>>>> 8992f2e7 (Add 2.1 read path)
         }
         if let Some(page_info) = column_info.page_infos.first() {
             if matches!(
@@ -855,13 +844,7 @@ impl CoreFieldDecoderStrategy {
         // Column is blob and user is asking for descriptions
         if let Some(blob_col) = Self::unwrap_blob(column_info.as_ref()) {
             // Can use primitive scheduler here since descriptions are always packed struct
-            let desc_scheduler = self.create_primitive_scheduler(
-                &data_type,
-                chain.current_path(),
-                &blob_col,
-                buffers,
-            )?;
-            return Ok((chain, Ok(desc_scheduler)));
+            return self.create_primitive_scheduler(field, &blob_col, buffers);
         }
 
         if Self::check_packed_struct(column_info) {
@@ -961,17 +944,6 @@ impl DecodeBatchScheduler {
         };
         let arrow_schema = ArrowSchema::from(schema);
         let root_fields = arrow_schema.fields().clone();
-        <<<<<<< HEAD
-        let mut columns = Vec::with_capacity(column_infos.len() + 1);
-        columns.push(Arc::new(root_column(num_rows)));
-        columns.extend(column_infos.iter().cloned());
-        let adjusted_column_indices = [0_u32]
-            .into_iter()
-            .chain(column_indices.iter().map(|i| i.saturating_add(1)))
-            .collect::<Vec<_>>();
-        let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
-        =======
-        >>>>>>> 8992f2e7 (Add 2.1 read path)
         let root_type = DataType::Struct(root_fields.clone());
         let mut root_field = Field::try_from(&ArrowField::new("root", root_type, false))?;
         // root_field.children and schema.fields should be identical at this point but the latter
@@ -1005,7 +977,7 @@ impl DecodeBatchScheduler {
 
         let adjusted_column_indices = [0_u32]
             .into_iter()
-            .chain(column_indices.iter().map(|i| *i + 1))
+            .chain(column_indices.iter().map(|i| i.saturating_add(1)))
             .collect::<Vec<_>>();
         let mut column_iter = ColumnInfoIter::new(columns, &adjusted_column_indices);
         let root_scheduler = CoreFieldDecoderStrategy::default()
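
The last hunk is subtle: a synthetic root column is prepended, so every user column index shifts by one, and the shift now uses saturating_add instead of unchecked addition. A standalone sketch (not commit code) of why that matters for u32 indices:

    fn main() {
        let column_indices: Vec<u32> = vec![0, 5, u32::MAX];
        let adjusted: Vec<u32> = [0u32]
            .into_iter()
            .chain(column_indices.iter().map(|i| i.saturating_add(1)))
            .collect();
        // `*i + 1` would panic in debug builds (and wrap in release) at u32::MAX;
        // saturating_add clamps to u32::MAX instead.
        assert_eq!(adjusted, vec![0, 1, 6, u32::MAX]);
    }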