Skip to content

Commit

Permalink
Parameterize most tests to run against both the v1 and the v2 writers…
Browse files Browse the repository at this point in the history
…. Add support for fixed size binary to the v2 format. Correctly connect take_rows to the v2 format in the fragment API.
  • Loading branch information
westonpace committed May 19, 2024
1 parent 6c1f01b commit 040de9f
Show file tree
Hide file tree
Showing 12 changed files with 642 additions and 173 deletions.
46 changes: 46 additions & 0 deletions rust/lance-datagen/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,47 @@ impl<T: ArrowPrimitiveType + Send + Sync> ArrayGenerator for RandomBytesGenerato
}
}

/// An [`ArrayGenerator`] that produces `FixedSizeBinary` arrays filled with random bytes.
///
/// A dedicated generator is needed because there is no `ArrowPrimitiveType` for
/// FixedSizeBinary, so the generator that is generic over `ArrowPrimitiveType` cannot be reused.
///
/// NOTE(review): the previous comment called that generator `RandomBinaryGenerator`, but the
/// one bounded by `ArrowPrimitiveType` appears to be `RandomBytesGenerator<T>` — confirm.
pub struct RandomFixedSizeBinaryGenerator {
/// Set by `new` to `DataType::FixedSizeBinary(dimension)`; stored so `data_type()` can
/// hand out a reference.
data_type: DataType,
/// Width, in bytes, of every generated value.
dimension: i32,
}

impl RandomFixedSizeBinaryGenerator {
    /// Builds a generator whose values are each `dimension` bytes wide.
    ///
    /// The matching `DataType::FixedSizeBinary(dimension)` is computed once here and stored
    /// alongside the width.
    fn new(dimension: i32) -> Self {
        let data_type = DataType::FixedSizeBinary(dimension);
        Self {
            data_type,
            dimension,
        }
    }
}

impl ArrayGenerator for RandomFixedSizeBinaryGenerator {
fn generate(
&mut self,
length: RowCount,
rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
) -> Result<Arc<dyn arrow_array::Array>, ArrowError> {
let num_bytes = length.0 * self.dimension as u64;
let mut bytes = vec![0; num_bytes as usize];
rng.fill_bytes(&mut bytes);
Ok(Arc::new(FixedSizeBinaryArray::new(
self.dimension,
Buffer::from(bytes),
None,
)))
}

fn data_type(&self) -> &DataType {
&self.data_type
}

fn element_size_bytes(&self) -> Option<ByteCount> {
Some(ByteCount::from(self.dimension as u64))
}
}

pub struct RandomBinaryGenerator {
bytes_per_element: ByteCount,
scale_to_utf8: bool,
Expand Down Expand Up @@ -1416,6 +1457,10 @@ pub mod array {
Box::new(RandomBytesGenerator::<T>::new(data_type))
}

/// Create a generator of random fixed-size binary values
///
/// Each generated value is `dimension` bytes wide.
pub fn rand_fsb(dimension: i32) -> Box<dyn ArrayGenerator> {
    let generator = RandomFixedSizeBinaryGenerator::new(dimension);
    Box::new(generator)
}

/// Create a generator of randomly sampled date32 values
///
/// Instead of sampling the entire range, all values will be drawn from the last year as this
Expand Down Expand Up @@ -1610,6 +1655,7 @@ pub mod array {
rand_type(child.data_type()),
Dimension::from(*dimension as u32),
),
DataType::FixedSizeBinary(dimension) => rand_fsb(*dimension),
DataType::List(child) => rand_list(child.data_type()),
DataType::Duration(unit) => match unit {
TimeUnit::Second => rand::<DurationSecondType>(),
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl DecodeBatchScheduler {
} else {
match data_type {
// DataType::is_primitive doesn't consider these primitive but we do
DataType::Boolean | DataType::Null => true,
DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
_ => false,
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ impl BatchEncoder {
| DataType::UInt32
| DataType::UInt64
| DataType::UInt8
| DataType::FixedSizeBinary(_)
| DataType::FixedSizeList(_, _) => {
let my_col_idx = *col_idx;
*col_idx += 1;
Expand Down
13 changes: 12 additions & 1 deletion rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use arrow_array::{
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
},
ArrayRef, BooleanArray, FixedSizeListArray, PrimitiveArray,
ArrayRef, BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, PrimitiveArray,
};
use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
Expand Down Expand Up @@ -356,6 +356,17 @@ impl PrimitiveFieldDecodeTask {
DataType::UInt8 => Ok(Self::new_primitive_array::<UInt8Type>(
buffers, num_rows, data_type,
)),
DataType::FixedSizeBinary(dimension) => {
let mut buffers_iter = buffers.into_iter();
let fsb_validity = buffers_iter.next().unwrap();
let fsb_nulls = Self::bytes_to_validity(fsb_validity, num_rows);

let fsb_values = buffers_iter.next().unwrap();
let fsb_values = Buffer::from_bytes(fsb_values.freeze().into());
Ok(Arc::new(FixedSizeBinaryArray::new(
*dimension, fsb_values, fsb_nulls,
)))
}
DataType::FixedSizeList(items, dimension) => {
let mut buffers_iter = buffers.into_iter();
let fsl_validity = buffers_iter.next().unwrap();
Expand Down
11 changes: 6 additions & 5 deletions rust/lance-encoding/src/encodings/physical/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,17 @@ pub struct ValueEncoder {

impl ValueEncoder {
pub fn try_new(data_type: &DataType) -> Result<Self> {
if data_type.is_primitive() {
if *data_type == DataType::Boolean {
Ok(Self {
buffer_encoder: Box::<FlatBufferEncoder>::default(),
buffer_encoder: Box::<BitmapBufferEncoder>::default(),
})
} else if *data_type == DataType::Boolean {
} else if data_type.is_fixed_stride() {
Ok(Self {
buffer_encoder: Box::<BitmapBufferEncoder>::default(),
buffer_encoder: Box::<FlatBufferEncoder>::default(),
})
} else {
Err(Error::invalid_input(
format!("Cannot use value encoded to encode {}", data_type),
format!("Cannot use ValueEncoder to encode {}", data_type),
location!(),
))
}
Expand Down Expand Up @@ -189,6 +189,7 @@ pub(crate) mod tests {
use crate::testing::check_round_trip_encoding_random;

const PRIMITIVE_TYPES: &[DataType] = &[
DataType::FixedSizeBinary(2),
DataType::Date32,
DataType::Date64,
DataType::Int8,
Expand Down
22 changes: 22 additions & 0 deletions rust/lance-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,28 @@ pub enum ReadBatchParams {
Indices(UInt32Array),
}

impl std::fmt::Display for ReadBatchParams {
    /// Renders the variant name followed by its bounds or indices in parentheses.
    ///
    /// `Indices` prints its values separated by commas with no spaces and no trailing
    /// comma; an empty index array prints as `Indices()`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::RangeFull => write!(f, "RangeFull"),
            Self::Range(range) => write!(f, "Range({}..{})", range.start, range.end),
            Self::RangeFrom(range) => write!(f, "RangeFrom({})", range.start),
            Self::RangeTo(range) => write!(f, "RangeTo({})", range.end),
            Self::Indices(indices) => {
                let rendered: Vec<String> = indices
                    .values()
                    .iter()
                    .map(|value| value.to_string())
                    .collect();
                let joined = rendered.join(",");
                write!(f, "Indices({})", joined)
            }
        }
    }
}

impl Default for ReadBatchParams {
fn default() -> Self {
// Default of ReadBatchParams is reading the full batch.
Expand Down
Loading

0 comments on commit 040de9f

Please sign in to comment.