Commit

fix a bug in scheduling ranges for data types other than 32-bit width
broccoliSpicy committed Sep 24, 2024
1 parent f09cad7 commit fc89bf4
Showing 2 changed files with 91 additions and 220 deletions.
2 changes: 1 addition & 1 deletion rust/lance-encoding/benches/decoder.rs
@@ -66,7 +66,7 @@ fn bench_decode(c: &mut Criterion) {
     for data_type in PRIMITIVE_TYPES {
         let data = lance_datagen::gen()
             .anon_col(lance_datagen::array::rand_type(data_type))
-            .into_batch_rows(lance_datagen::RowCount::from(1024 * 1024))
+            .into_batch_rows(lance_datagen::RowCount::from(1024 * 1024 * 1024))
             .unwrap();
         let lance_schema =
             Arc::new(lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap());
309 changes: 90 additions & 219 deletions rust/lance-encoding/src/encodings/physical/bitpack_fastlanes.rs
@@ -186,8 +186,7 @@ pub fn compute_compressed_bit_width_for_non_neg(arrays: &[ArrayRef]) -> u64 {
             }
         }
         _ => {
-            // in dictionary encoding, they route it to here when array is utf8, don't know what we should do yet.
-            res = 8;
+            panic!("BitpackedForNonNegArrayEncoder only supports data types of UInt8, Int8, UInt16, Int16, UInt32, Int32, UInt64, Int64");
         }
     };
     res
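The catch-all arm above no longer silently falls back to an 8-bit width; unsupported types now panic. For the supported non-negative integer types, the width this function produces boils down to the bits needed for the largest value present. A minimal sketch of that idea (illustrative only, not Lance's implementation; `bits_needed` is a hypothetical name):

```rust
// Illustrative sketch, not Lance's implementation: for non-negative integers,
// the compressed bit width is driven by the largest value present.
fn bits_needed(values: &[u64]) -> u64 {
    let max = values.iter().copied().max().unwrap_or(0);
    // Bits required to represent `max`; keep at least 1 bit so an all-zero
    // array still gets a valid width.
    (u64::BITS as u64 - max.leading_zeros() as u64).max(1)
}

fn main() {
    assert_eq!(bits_needed(&[0, 1, 2, 3]), 2);
    assert_eq!(bits_needed(&[1u64 << 20]), 21);
    assert_eq!(bits_needed(&[0]), 1);
}
```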
@@ -478,237 +477,109 @@ impl PrimitivePageDecoder for BitpackedForNonNegPageDecoder {
     }
 }
 
+macro_rules! bitpacked_decode {
+    ($uncompressed_type:ty, $compressed_bit_width:expr, $data:expr, $bytes_idx_to_range_indices:expr, $num_rows:expr) => {{
+        let mut decompressed: Vec<$uncompressed_type> = Vec::with_capacity($num_rows as usize);
+        let packed_chunk_size_in_byte: usize =
+            (ELEMS_PER_CHUNK * $compressed_bit_width) as usize / 8;
+        let mut decompress_chunk_buf = vec![0 as $uncompressed_type; ELEMS_PER_CHUNK as usize];
+
+        for (i, bytes) in $data.iter().enumerate() {
+            let mut ranges_idx = 0;
+            let mut curr_range_start = $bytes_idx_to_range_indices[i][0].start;
+            let mut chunk_num = 0;
+
+            while chunk_num * packed_chunk_size_in_byte < bytes.len() {
+                // Copy for memory alignment
+                let chunk_in_u8: Vec<u8> = bytes[chunk_num * packed_chunk_size_in_byte..]
+                    [..packed_chunk_size_in_byte]
+                    .to_vec();
+                chunk_num += 1;
+                let chunk = cast_slice(&chunk_in_u8);
+                unsafe {
+                    BitPacking::unchecked_unpack(
+                        $compressed_bit_width as usize,
+                        chunk,
+                        &mut decompress_chunk_buf,
+                    );
+                }
+
+                loop {
+                    // Case 1: All the elements after (curr_range_start % ELEMS_PER_CHUNK) inside this chunk are needed.
+                    let elems_after_curr_range_start_in_this_chunk =
+                        ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK;
+                    if curr_range_start + elems_after_curr_range_start_in_this_chunk
+                        <= $bytes_idx_to_range_indices[i][ranges_idx].end
+                    {
+                        decompressed.extend_from_slice(
+                            &decompress_chunk_buf[(curr_range_start % ELEMS_PER_CHUNK) as usize..],
+                        );
+                        curr_range_start += elems_after_curr_range_start_in_this_chunk;
+                        break;
+                    } else {
+                        // Case 2: Only part of the elements after (curr_range_start % ELEMS_PER_CHUNK) inside this chunk are needed.
+                        let elems_this_range_needed_in_this_chunk =
+                            ($bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start)
+                                .min(ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK);
+                        decompressed.extend_from_slice(
+                            &decompress_chunk_buf[(curr_range_start % ELEMS_PER_CHUNK) as usize..]
+                                [..elems_this_range_needed_in_this_chunk as usize],
+                        );
+                        if curr_range_start + elems_this_range_needed_in_this_chunk
+                            == $bytes_idx_to_range_indices[i][ranges_idx].end
+                        {
+                            ranges_idx += 1;
+                            if ranges_idx == $bytes_idx_to_range_indices[i].len() {
+                                break;
+                            }
+                            curr_range_start = $bytes_idx_to_range_indices[i][ranges_idx].start;
+                        } else {
+                            curr_range_start += elems_this_range_needed_in_this_chunk;
+                        }
+                    }
+                }
+            }
+        }
+
+        LanceBuffer::reinterpret_vec(decompressed)
+    }};
+}
+
 fn bitpacked_for_non_neg_decode(
     compressed_bit_width: u64,
     uncompressed_bits_per_value: u64,
     data: &[Bytes],
     bytes_idx_to_range_indices: &[Vec<std::ops::Range<u64>>],
     num_rows: u64,
 ) -> LanceBuffer {
     match uncompressed_bits_per_value {
-        8 => {
-            let mut decompressed: Vec<u8> = Vec::with_capacity(num_rows as usize);
-            let packed_chunk_size: usize =
-                ELEMS_PER_CHUNK as usize * compressed_bit_width as usize / 8;
-            let mut decompress_chunk_buf = vec![0_u8; ELEMS_PER_CHUNK as usize];
-            for (i, bytes) in data.iter().enumerate() {
-                let mut j = 0;
-                let mut ranges_idx = 0;
-                let mut curr_range_start = bytes_idx_to_range_indices[i][0].start;
-                while j * packed_chunk_size < bytes.len() {
-                    let chunk: &[u8] = &bytes[j * packed_chunk_size..][..packed_chunk_size];
-                    unsafe {
-                        BitPacking::unchecked_unpack(
-                            compressed_bit_width as usize,
-                            chunk,
-                            &mut decompress_chunk_buf[..ELEMS_PER_CHUNK as usize],
-                        );
-                    }
-                    loop {
-                        if curr_range_start + ELEMS_PER_CHUNK
-                            < bytes_idx_to_range_indices[i][ranges_idx].end
-                        {
-                            let this_part_len =
-                                ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK;
-                            decompressed.extend_from_slice(
-                                &decompress_chunk_buf
-                                    [(curr_range_start % ELEMS_PER_CHUNK) as usize..],
-                            );
-                            curr_range_start += this_part_len;
-                            break;
-                        } else {
-                            let this_part_len =
-                                bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start;
-                            decompressed.extend_from_slice(
-                                &decompress_chunk_buf
-                                    [(curr_range_start % ELEMS_PER_CHUNK) as usize..]
-                                    [..this_part_len as usize],
-                            );
-                            ranges_idx += 1;
-                            if ranges_idx == bytes_idx_to_range_indices[i].len() {
-                                break;
-                            }
-                            curr_range_start = bytes_idx_to_range_indices[i][ranges_idx].start;
-                        }
-                    }
-                    j += 1;
-                }
-            }
-            LanceBuffer::Owned(decompressed)
-        }
-        16 => {
-            let mut decompressed: Vec<u16> = Vec::with_capacity(num_rows as usize);
-            let packed_chunk_size_in_byte: usize =
-                (ELEMS_PER_CHUNK * compressed_bit_width) as usize / 8;
-            let mut decompress_chunk_buf = vec![0_u16; ELEMS_PER_CHUNK as usize];
-            for (i, bytes) in data.iter().enumerate() {
-                let mut j = 0;
-                let mut ranges_idx = 0;
-                let mut curr_range_start = bytes_idx_to_range_indices[i][0].start;
-                while j * packed_chunk_size_in_byte < bytes.len() {
-                    let chunk_in_u8: &[u8] =
-                        &bytes[j * packed_chunk_size_in_byte..][..packed_chunk_size_in_byte];
-                    let chunk = cast_slice(chunk_in_u8);
-                    unsafe {
-                        BitPacking::unchecked_unpack(
-                            compressed_bit_width as usize,
-                            chunk,
-                            &mut decompress_chunk_buf,
-                        );
-                    }
-                    loop {
-                        if curr_range_start + ELEMS_PER_CHUNK
-                            < bytes_idx_to_range_indices[i][ranges_idx].end
-                        {
-                            let this_part_len =
-                                ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK;
-                            decompressed.extend_from_slice(
-                                &decompress_chunk_buf
-                                    [(curr_range_start % ELEMS_PER_CHUNK) as usize..],
-                            );
-                            curr_range_start += this_part_len;
-                            // when `curr_range_start + 1024 < bytes_idx_to_range_indices[i][ranges_idx].end`,
-                            // we know this chunk has only data of this range
-                            break;
-                        } else {
-                            let this_part_len =
-                                bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start;
-                            decompressed.extend_from_slice(
-                                &decompress_chunk_buf
-                                    [(curr_range_start % ELEMS_PER_CHUNK) as usize..]
-                                    [..this_part_len as usize],
-                            );
-                            ranges_idx += 1;
-                            if ranges_idx == bytes_idx_to_range_indices[i].len() {
-                                break;
-                            }
-                            curr_range_start = bytes_idx_to_range_indices[i][ranges_idx].start;
-                        }
-                    }
-                    j += 1;
-                }
-            }
-            LanceBuffer::reinterpret_vec(decompressed).to_owned()
-        }
-        32 => {
-            let mut decompressed: Vec<u32> = Vec::with_capacity(num_rows as usize);
-            let packed_chunk_size_in_byte: usize =
-                (ELEMS_PER_CHUNK * compressed_bit_width) as usize / 8;
-            let mut decompress_chunk_buf = vec![0_u32; ELEMS_PER_CHUNK as usize];
-            for (i, bytes) in data.iter().enumerate() {
-                let mut ranges_idx = 0;
-                let mut curr_range_start = bytes_idx_to_range_indices[i][0].start;
-                let mut chunk_num = 0;
-                while chunk_num * packed_chunk_size_in_byte < bytes.len() {
-                    // I have to do a copy here for memory alignment
-                    let chunk_in_u8: Vec<u8> = bytes[chunk_num * packed_chunk_size_in_byte..]
-                        [..packed_chunk_size_in_byte]
-                        .to_vec();
-                    chunk_num += 1;
-                    let chunk = cast_slice(&chunk_in_u8);
-                    unsafe {
-                        BitPacking::unchecked_unpack(
-                            compressed_bit_width as usize,
-                            chunk,
-                            &mut decompress_chunk_buf,
-                        );
-                    }
-                    loop {
-                        // case 1: all the elements after (curr_range_start % ELEMS_PER_CHUNK) inside this chunk is needed.
-                        let elems_after_curr_range_start_in_this_chunk =
-                            ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK;
-                        if curr_range_start + elems_after_curr_range_start_in_this_chunk
-                            <= bytes_idx_to_range_indices[i][ranges_idx].end
-                        {
-                            decompressed.extend_from_slice(
-                                &decompress_chunk_buf
-                                    [(curr_range_start % ELEMS_PER_CHUNK) as usize..],
-                            );
-                            curr_range_start += elems_after_curr_range_start_in_this_chunk;
-                            break;
-                        } else {
-                            // case 2: only part of the elements after (curr_range_start % ELEMS_PER_CHUNK) inside this chunk is needed.
-                            let elems_this_range_needed_in_this_chunk =
-                                (bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start)
-                                    .min(ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK);
-                            decompressed.extend_from_slice(
-                                &decompress_chunk_buf
-                                    [(curr_range_start % ELEMS_PER_CHUNK) as usize..]
-                                    [..elems_this_range_needed_in_this_chunk as usize],
-                            );
-                            if curr_range_start + elems_this_range_needed_in_this_chunk
-                                == bytes_idx_to_range_indices[i][ranges_idx].end
-                            {
-                                ranges_idx += 1;
-                                if ranges_idx == bytes_idx_to_range_indices[i].len() {
-                                    break;
-                                }
-                                curr_range_start = bytes_idx_to_range_indices[i][ranges_idx].start;
-                            } else {
-                                curr_range_start += elems_this_range_needed_in_this_chunk;
-                            }
-                        }
-                    }
-                }
-            }
-            LanceBuffer::reinterpret_vec(decompressed).to_owned()
-        }
-        64 => {
-            let mut decompressed: Vec<u64> = Vec::with_capacity(num_rows as usize);
-            let packed_chunk_size_in_byte: usize =
-                (ELEMS_PER_CHUNK * compressed_bit_width) as usize / 8;
-            let mut decompress_chunk_buf = vec![0_u64; ELEMS_PER_CHUNK as usize];
-            for (i, bytes) in data.iter().enumerate() {
-                let mut j = 0;
-                let mut ranges_idx = 0;
-                let mut curr_range_start = bytes_idx_to_range_indices[i][0].start;
-                while j * packed_chunk_size_in_byte < bytes.len() {
-                    let chunk_in_u8: &[u8] =
-                        &bytes[j * packed_chunk_size_in_byte..][..packed_chunk_size_in_byte];
-                    let chunk = cast_slice(chunk_in_u8);
-                    unsafe {
-                        BitPacking::unchecked_unpack(
-                            compressed_bit_width as usize,
-                            chunk,
-                            &mut decompress_chunk_buf,
-                        );
-                    }
-                    loop {
-                        if curr_range_start + ELEMS_PER_CHUNK
-                            < bytes_idx_to_range_indices[i][ranges_idx].end
-                        {
-                            let this_part_len =
-                                ELEMS_PER_CHUNK - curr_range_start % ELEMS_PER_CHUNK;
-                            decompressed.extend_from_slice(
-                                &decompress_chunk_buf
-                                    [(curr_range_start % ELEMS_PER_CHUNK) as usize..],
-                            );
-                            curr_range_start += this_part_len;
-                            break;
-                        } else {
-                            let this_part_len =
-                                bytes_idx_to_range_indices[i][ranges_idx].end - curr_range_start;
-                            decompressed.extend_from_slice(
-                                &decompress_chunk_buf
-                                    [(curr_range_start % ELEMS_PER_CHUNK) as usize..]
-                                    [..this_part_len as usize],
-                            );
-                            ranges_idx += 1;
-                            if ranges_idx == bytes_idx_to_range_indices[i].len() {
-                                break;
-                            }
-                            curr_range_start = bytes_idx_to_range_indices[i][ranges_idx].start;
-                        }
-                    }
-                    j += 1;
-                }
-            }
-            LanceBuffer::reinterpret_vec(decompressed).to_owned()
-        }
+        8 => bitpacked_decode!(
+            u8,
+            compressed_bit_width,
+            data,
+            bytes_idx_to_range_indices,
+            num_rows
+        ),
+        16 => bitpacked_decode!(
+            u16,
+            compressed_bit_width,
+            data,
+            bytes_idx_to_range_indices,
+            num_rows
+        ),
+        32 => bitpacked_decode!(
+            u32,
+            compressed_bit_width,
+            data,
+            bytes_idx_to_range_indices,
+            num_rows
+        ),
+        64 => bitpacked_decode!(
+            u64,
+            compressed_bit_width,
+            data,
+            bytes_idx_to_range_indices,
+            num_rows
+        ),
         _ => unreachable!(
             "bitpacked_for_non_neg_decode only supports 8, 16, 32, 64 uncompressed_bits_per_value"
         ),
     }
 }
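For reference, here is a self-contained sketch (not Lance code; `gather_ranges` and the test values are made up for illustration) of the per-chunk range gathering that `bitpacked_decode!` performs once a chunk has been unpacked: each requested range is copied out chunk by chunk, taking either the rest of the chunk (case 1) or the rest of the range (case 2), whichever ends first. After this change the same logic runs for u8, u16, u32, and u64 instead of only the 32-bit path.

```rust
use std::ops::Range;

// Number of values unpacked from one bit-packed chunk (FastLanes unpacks 1024
// values at a time regardless of the element width).
const ELEMS_PER_CHUNK: u64 = 1024;

// Given all decoded values (standing in for the chunks the scheduler fetched)
// and a sorted list of row ranges, gather the requested rows chunk by chunk.
fn gather_ranges<T: Copy>(values: &[T], ranges: &[Range<u64>]) -> Vec<T> {
    let mut out = Vec::new();
    for range in ranges {
        let mut curr = range.start;
        while curr < range.end {
            // "Unpack" the chunk containing `curr` (the real decoder gets this
            // from BitPacking::unchecked_unpack on the scheduled bytes).
            let chunk_begin = (curr / ELEMS_PER_CHUNK * ELEMS_PER_CHUNK) as usize;
            let chunk_end = (chunk_begin + ELEMS_PER_CHUNK as usize).min(values.len());
            let chunk = &values[chunk_begin..chunk_end];

            // Case 1 takes the rest of the chunk, case 2 the rest of the range;
            // `take` is whichever ends first.
            let offset = (curr % ELEMS_PER_CHUNK) as usize;
            let take = (ELEMS_PER_CHUNK - curr % ELEMS_PER_CHUNK).min(range.end - curr);
            out.extend_from_slice(&chunk[offset..offset + take as usize]);
            curr += take;
        }
    }
    out
}

fn main() {
    // 3000 u16 values so the ranges below cross 1024-element chunk boundaries.
    let values: Vec<u16> = (0..3000u32).map(|v| v as u16).collect();
    let ranges = vec![10..20, 1000..1100, 2047..2049];

    let gathered = gather_ranges(&values, &ranges);
    let expected: Vec<u16> = ranges
        .iter()
        .flat_map(|r| r.start..r.end)
        .map(|v| v as u16)
        .collect();
    assert_eq!(gathered, expected);
    println!("gathered {} values", gathered.len());
}
```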