Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add fragment take / fixed-size-binary support to v2 format #2354

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions rust/lance-datagen/src/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,47 @@ impl<T: ArrowPrimitiveType + Send + Sync> ArrayGenerator for RandomBytesGenerato
}
}

// This is pretty much the same thing as RandomBinaryGenerator but we can't use that
// because there is no ArrowPrimitiveType for FixedSizeBinary
pub struct RandomFixedSizeBinaryGenerator {
data_type: DataType,
size: i32,
}

impl RandomFixedSizeBinaryGenerator {
fn new(size: i32) -> Self {
Self {
size,
data_type: DataType::FixedSizeBinary(size),
}
}
}

impl ArrayGenerator for RandomFixedSizeBinaryGenerator {
fn generate(
&mut self,
length: RowCount,
rng: &mut rand_xoshiro::Xoshiro256PlusPlus,
) -> Result<Arc<dyn arrow_array::Array>, ArrowError> {
let num_bytes = length.0 * self.size as u64;
let mut bytes = vec![0; num_bytes as usize];
rng.fill_bytes(&mut bytes);
Ok(Arc::new(FixedSizeBinaryArray::new(
self.size,
Buffer::from(bytes),
None,
)))
}

fn data_type(&self) -> &DataType {
&self.data_type
}

fn element_size_bytes(&self) -> Option<ByteCount> {
Some(ByteCount::from(self.size as u64))
}
}

pub struct RandomBinaryGenerator {
bytes_per_element: ByteCount,
scale_to_utf8: bool,
Expand Down Expand Up @@ -1416,6 +1457,10 @@ pub mod array {
Box::new(RandomBytesGenerator::<T>::new(data_type))
}

pub fn rand_fsb(size: i32) -> Box<dyn ArrayGenerator> {
Box::new(RandomFixedSizeBinaryGenerator::new(size))
}

/// Create a generator of randomly sampled date32 values
///
/// Instead of sampling the entire range, all values will be drawn from the last year as this
Expand Down Expand Up @@ -1610,6 +1655,7 @@ pub mod array {
rand_type(child.data_type()),
Dimension::from(*dimension as u32),
),
DataType::FixedSizeBinary(size) => rand_fsb(*size),
DataType::List(child) => rand_list(child.data_type()),
DataType::Duration(unit) => match unit {
TimeUnit::Second => rand::<DurationSecondType>(),
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl DecodeBatchScheduler {
} else {
match data_type {
// DataType::is_primitive doesn't consider these primitive but we do
DataType::Boolean | DataType::Null => true,
DataType::Boolean | DataType::Null | DataType::FixedSizeBinary(_) => true,
DataType::FixedSizeList(inner, _) => Self::is_primitive(inner.data_type()),
_ => false,
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ impl BatchEncoder {
| DataType::UInt32
| DataType::UInt64
| DataType::UInt8
| DataType::FixedSizeBinary(_)
| DataType::FixedSizeList(_, _) => {
let my_col_idx = *col_idx;
*col_idx += 1;
Expand Down
13 changes: 12 additions & 1 deletion rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use arrow_array::{
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
},
ArrayRef, BooleanArray, FixedSizeListArray, PrimitiveArray,
ArrayRef, BooleanArray, FixedSizeBinaryArray, FixedSizeListArray, PrimitiveArray,
};
use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, ScalarBuffer};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
Expand Down Expand Up @@ -356,6 +356,17 @@ impl PrimitiveFieldDecodeTask {
DataType::UInt8 => Ok(Self::new_primitive_array::<UInt8Type>(
buffers, num_rows, data_type,
)),
DataType::FixedSizeBinary(dimension) => {
let mut buffers_iter = buffers.into_iter();
let fsb_validity = buffers_iter.next().unwrap();
let fsb_nulls = Self::bytes_to_validity(fsb_validity, num_rows);

let fsb_values = buffers_iter.next().unwrap();
let fsb_values = Buffer::from_bytes(fsb_values.freeze().into());
Ok(Arc::new(FixedSizeBinaryArray::new(
*dimension, fsb_values, fsb_nulls,
)))
}
DataType::FixedSizeList(items, dimension) => {
let mut buffers_iter = buffers.into_iter();
let fsl_validity = buffers_iter.next().unwrap();
Expand Down
11 changes: 6 additions & 5 deletions rust/lance-encoding/src/encodings/physical/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,17 @@ pub struct ValueEncoder {

impl ValueEncoder {
pub fn try_new(data_type: &DataType) -> Result<Self> {
if data_type.is_primitive() {
if *data_type == DataType::Boolean {
Ok(Self {
buffer_encoder: Box::<FlatBufferEncoder>::default(),
buffer_encoder: Box::<BitmapBufferEncoder>::default(),
})
} else if *data_type == DataType::Boolean {
} else if data_type.is_fixed_stride() {
Ok(Self {
buffer_encoder: Box::<BitmapBufferEncoder>::default(),
buffer_encoder: Box::<FlatBufferEncoder>::default(),
})
} else {
Err(Error::invalid_input(
format!("Cannot use value encoded to encode {}", data_type),
format!("Cannot use ValueEncoder to encode {}", data_type),
location!(),
))
}
Expand Down Expand Up @@ -189,6 +189,7 @@ pub(crate) mod tests {
use crate::testing::check_round_trip_encoding_random;

const PRIMITIVE_TYPES: &[DataType] = &[
DataType::FixedSizeBinary(2),
DataType::Date32,
DataType::Date64,
DataType::Int8,
Expand Down
22 changes: 22 additions & 0 deletions rust/lance-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,28 @@ pub enum ReadBatchParams {
Indices(UInt32Array),
}

impl std::fmt::Display for ReadBatchParams {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
Self::RangeFull => write!(f, "RangeFull"),
Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
Self::Indices(indices) => {
let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
acc.push_str(&v.to_string());
acc.push(',');
acc
});
if !indices_str.is_empty() {
indices_str.pop();
}
write!(f, "Indices({})", indices_str)
}
}
}
}

impl Default for ReadBatchParams {
fn default() -> Self {
// Default of ReadBatchParams is reading the full batch.
Expand Down
Loading
Loading