Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IPC format support for StringViewArray and BinaryViewArray #5525

Merged
merged 11 commits into from
Apr 1, 2024
15 changes: 14 additions & 1 deletion arrow-ipc/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,10 @@ pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> Dat
}
}
crate::Type::Binary => DataType::Binary,
crate::Type::BinaryView => DataType::BinaryView,
crate::Type::LargeBinary => DataType::LargeBinary,
crate::Type::Utf8 => DataType::Utf8,
crate::Type::Utf8View => DataType::Utf8View,
crate::Type::LargeUtf8 => DataType::LargeUtf8,
crate::Type::FixedSizeBinary => {
let fsb = field.type_as_fixed_size_binary().unwrap();
Expand Down Expand Up @@ -548,7 +550,16 @@ pub(crate) fn get_fb_field_type<'a>(
.as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
BinaryView | Utf8View => unimplemented!("unimplemented"),
BinaryView => FBFieldType {
type_type: crate::Type::BinaryView,
type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
Utf8View => FBFieldType {
type_type: crate::Type::Utf8View,
type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
children: Some(fbb.create_vector(&empty_fields[..])),
},
Utf8 => FBFieldType {
type_type: crate::Type::Utf8,
type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
Expand Down Expand Up @@ -921,7 +932,9 @@ mod tests {
true,
),
Field::new("utf8", DataType::Utf8, false),
Field::new("utf8_view", DataType::Utf8View, false),
Field::new("binary", DataType::Binary, false),
Field::new("binary_view", DataType::BinaryView, false),
Field::new_list("list[u8]", Field::new("item", DataType::UInt8, false), true),
Field::new_fixed_size_list(
"fixed_size_list[u8]",
Expand Down
197 changes: 179 additions & 18 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ mod stream;
pub use stream::*;

use flatbuffers::{VectorIter, VerifierOptions};
use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
Expand Down Expand Up @@ -64,14 +64,21 @@ fn read_buffer(

/// Coordinates reading arrays based on data types.
///
/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView)
/// When encounter such types, we pop from the front of the queue to get the number of buffers to read.
///
/// Notes:
/// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
/// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
/// We thus:
/// - check if the bit width of non-64-bit numbers is 64, and
/// - read the buffer as 64-bit (signed integer or float), and
/// - cast the 64-bit array to the appropriate data type
fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, ArrowError> {
fn create_array(
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
reader: &mut ArrayReader,
field: &Field,
variadic_counts: &mut VecDeque<i64>,
) -> Result<ArrayRef, ArrowError> {
let data_type = field.data_type();
match data_type {
Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
Expand All @@ -83,6 +90,18 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
reader.next_buffer()?,
],
),
BinaryView | Utf8View => {
let count = variadic_counts
.pop_front()
.ok_or(ArrowError::IpcError(format!(
"Missing variadic count for {data_type} column"
)))?;
let count = count + 2; // view and null buffer.
let buffers = (0..count)
.map(|_| reader.next_buffer())
.collect::<Result<Vec<_>, _>>()?;
create_primitive_array(reader.next_node(field)?, data_type, &buffers)
alamb marked this conversation as resolved.
Show resolved Hide resolved
}
FixedSizeBinary(_) => create_primitive_array(
reader.next_node(field)?,
data_type,
Expand All @@ -91,13 +110,13 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
let list_node = reader.next_node(field)?;
let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
let values = create_array(reader, list_field)?;
let values = create_array(reader, list_field, variadic_counts)?;
create_list_array(list_node, data_type, &list_buffers, values)
}
FixedSizeList(ref list_field, _) => {
let list_node = reader.next_node(field)?;
let list_buffers = [reader.next_buffer()?];
let values = create_array(reader, list_field)?;
let values = create_array(reader, list_field, variadic_counts)?;
create_list_array(list_node, data_type, &list_buffers, values)
}
Struct(struct_fields) => {
Expand All @@ -109,7 +128,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
// TODO investigate whether just knowing the number of buffers could
// still work
for struct_field in struct_fields {
let child = create_array(reader, struct_field)?;
let child = create_array(reader, struct_field, variadic_counts)?;
struct_arrays.push((struct_field.clone(), child));
}
let null_count = struct_node.null_count() as usize;
Expand All @@ -123,8 +142,8 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
}
RunEndEncoded(run_ends_field, values_field) => {
let run_node = reader.next_node(field)?;
let run_ends = create_array(reader, run_ends_field)?;
let values = create_array(reader, values_field)?;
let run_ends = create_array(reader, run_ends_field, variadic_counts)?;
let values = create_array(reader, values_field, variadic_counts)?;

let run_array_length = run_node.length() as usize;
let data = ArrayData::builder(data_type.clone())
Expand Down Expand Up @@ -177,7 +196,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, Arr
let mut ids = Vec::with_capacity(fields.len());

for (id, field) in fields.iter() {
let child = create_array(reader, field)?;
let child = create_array(reader, field, variadic_counts)?;
children.push((field.as_ref().clone(), child));
ids.push(id);
}
Expand Down Expand Up @@ -230,6 +249,11 @@ fn create_primitive_array(
.null_bit_buffer(null_buffer)
.build_aligned()?
}
BinaryView | Utf8View => ArrayData::builder(data_type.clone())
.len(length)
.buffers(buffers[1..].to_vec())
.null_bit_buffer(null_buffer)
.build_aligned()?,
_ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
// read 2 buffers: null buffer (optional) and data buffer
ArrayData::builder(data_type.clone())
Expand Down Expand Up @@ -328,7 +352,11 @@ impl<'a> ArrayReader<'a> {
})
}

fn skip_field(&mut self, field: &Field) -> Result<(), ArrowError> {
fn skip_field(
&mut self,
field: &Field,
variadic_count: &mut VecDeque<i64>,
) -> Result<(), ArrowError> {
self.next_node(field)?;

match field.data_type() {
Expand All @@ -337,30 +365,42 @@ impl<'a> ArrayReader<'a> {
self.skip_buffer()
}
}
Utf8View | BinaryView => {
let count = variadic_count
.pop_front()
.ok_or(ArrowError::IpcError(format!(
"Missing variadic count for {} column",
field.data_type()
)))?;
let count = count + 2; // view and null buffer.
for _i in 0..count {
self.skip_buffer()
}
}
FixedSizeBinary(_) => {
self.skip_buffer();
self.skip_buffer();
}
List(list_field) | LargeList(list_field) | Map(list_field, _) => {
self.skip_buffer();
self.skip_buffer();
self.skip_field(list_field)?;
self.skip_field(list_field, variadic_count)?;
}
FixedSizeList(list_field, _) => {
self.skip_buffer();
self.skip_field(list_field)?;
self.skip_field(list_field, variadic_count)?;
}
Struct(struct_fields) => {
self.skip_buffer();

// skip for each field
for struct_field in struct_fields {
self.skip_field(struct_field)?
self.skip_field(struct_field, variadic_count)?
}
}
RunEndEncoded(run_ends_field, values_field) => {
self.skip_field(run_ends_field)?;
self.skip_field(values_field)?;
self.skip_field(run_ends_field, variadic_count)?;
self.skip_field(values_field, variadic_count)?;
}
Dictionary(_, _) => {
self.skip_buffer(); // Nulls
Expand All @@ -375,7 +415,7 @@ impl<'a> ArrayReader<'a> {
};

for (_, field) in fields.iter() {
self.skip_field(field)?
self.skip_field(field, variadic_count)?
}
}
Null => {} // No buffer increases
Expand Down Expand Up @@ -403,6 +443,10 @@ pub fn read_record_batch(
let field_nodes = batch.nodes().ok_or_else(|| {
ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
})?;

let mut variadic_counts: VecDeque<i64> =
batch.variadicBufferCounts().into_iter().flatten().collect();

let batch_compression = batch.compression();
let compression = batch_compression
.map(|batch_compression| batch_compression.codec().try_into())
Expand All @@ -425,12 +469,13 @@ pub fn read_record_batch(
for (idx, field) in schema.fields().iter().enumerate() {
// Create array for projected field
if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
let child = create_array(&mut reader, field)?;
let child = create_array(&mut reader, field, &mut variadic_counts)?;
arrays.push((proj_idx, child));
} else {
reader.skip_field(field)?;
reader.skip_field(field, &mut variadic_counts)?;
}
}
assert!(variadic_counts.is_empty());
arrays.sort_by_key(|t| t.0);
RecordBatch::try_new_with_options(
Arc::new(schema.project(projection)?),
Expand All @@ -441,9 +486,10 @@ pub fn read_record_batch(
let mut children = vec![];
// keep track of index as lists require more than one node
for field in schema.fields() {
let child = create_array(&mut reader, field)?;
let child = create_array(&mut reader, field, &mut variadic_counts)?;
children.push(child);
}
assert!(variadic_counts.is_empty());
RecordBatch::try_new_with_options(schema, children, &options)
}
}
Expand Down Expand Up @@ -1759,6 +1805,121 @@ mod tests {
assert_eq!(input_batch, output_batch);
}

const LONG_TEST_STRING: &str =
"This is a long string to make sure binary view array handles it";

#[test]
fn test_roundtrip_view_types() {
let schema = Schema::new(vec![
Field::new("field_1", DataType::BinaryView, true),
Field::new("field_2", DataType::Utf8, true),
Field::new("field_3", DataType::Utf8View, true),
]);
let bin_values: Vec<Option<&[u8]>> = vec![
Some(b"foo"),
None,
Some(b"bar"),
Some(LONG_TEST_STRING.as_bytes()),
];
let utf8_values: Vec<Option<&str>> =
vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
let bin_view_array = BinaryViewArray::from_iter(bin_values);
let utf8_array = StringArray::from_iter(utf8_values.iter());
let utf8_view_array = StringViewArray::from_iter(utf8_values);
let record_batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![
Arc::new(bin_view_array),
Arc::new(utf8_array),
Arc::new(utf8_view_array),
],
)
.unwrap();

assert_eq!(record_batch, roundtrip_ipc(&record_batch));
assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));

let sliced_batch = record_batch.slice(1, 2);
assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
}

#[test]
fn test_roundtrip_view_types_nested_dict() {
let bin_values: Vec<Option<&[u8]>> = vec![
Some(b"foo"),
None,
Some(b"bar"),
Some(LONG_TEST_STRING.as_bytes()),
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved
Some(b"field"),
];
let utf8_values: Vec<Option<&str>> = vec![
Some("foo"),
None,
Some("bar"),
Some(LONG_TEST_STRING),
Some("field"),
];
let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));

let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
let keys_field = Arc::new(Field::new_dict(
"keys",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
true,
1,
false,
));

let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
let values_field = Arc::new(Field::new_dict(
"values",
DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
true,
2,
false,
));
let entry_struct = StructArray::from(vec![
(keys_field, make_array(key_dict_array.into_data())),
(values_field, make_array(value_dict_array.into_data())),
]);

let map_data_type = DataType::Map(
Arc::new(Field::new(
"entries",
entry_struct.data_type().clone(),
false,
)),
false,
);
let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
let map_data = ArrayData::builder(map_data_type)
.len(3)
.add_buffer(entry_offsets)
.add_child_data(entry_struct.into_data())
.build()
.unwrap();
let map_array = MapArray::from(map_data);

let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
let schema = Arc::new(Schema::new(vec![Field::new(
"f1",
dict_dict_array.data_type().clone(),
false,
)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
assert_eq!(batch, roundtrip_ipc(&batch));
assert_eq!(batch, roundtrip_ipc_stream(&batch));
XiangpengHao marked this conversation as resolved.
Show resolved Hide resolved

let sliced_batch = batch.slice(1, 2);
assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
}

#[test]
fn test_no_columns_batch() {
let schema = Arc::new(Schema::empty());
Expand Down
Loading
Loading