diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index 51e54215ea7f..49da0efae3a6 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -247,8 +247,10 @@ pub(crate) fn get_data_type(field: crate::Field, may_be_dictionary: bool) -> Dat
             }
         }
         crate::Type::Binary => DataType::Binary,
+        crate::Type::BinaryView => DataType::BinaryView,
         crate::Type::LargeBinary => DataType::LargeBinary,
         crate::Type::Utf8 => DataType::Utf8,
+        crate::Type::Utf8View => DataType::Utf8View,
         crate::Type::LargeUtf8 => DataType::LargeUtf8,
         crate::Type::FixedSizeBinary => {
             let fsb = field.type_as_fixed_size_binary().unwrap();
@@ -548,7 +550,16 @@ pub(crate) fn get_fb_field_type<'a>(
                 .as_union_value(),
             children: Some(fbb.create_vector(&empty_fields[..])),
         },
-        BinaryView | Utf8View => unimplemented!("unimplemented"),
+        BinaryView => FBFieldType {
+            type_type: crate::Type::BinaryView,
+            type_: crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
+            children: Some(fbb.create_vector(&empty_fields[..])),
+        },
+        Utf8View => FBFieldType {
+            type_type: crate::Type::Utf8View,
+            type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
+            children: Some(fbb.create_vector(&empty_fields[..])),
+        },
         Utf8 => FBFieldType {
             type_type: crate::Type::Utf8,
             type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
@@ -921,7 +932,9 @@ mod tests {
                 true,
             ),
             Field::new("utf8", DataType::Utf8, false),
+            Field::new("utf8_view", DataType::Utf8View, false),
             Field::new("binary", DataType::Binary, false),
+            Field::new("binary_view", DataType::BinaryView, false),
             Field::new_list("list[u8]", Field::new("item", DataType::UInt8, false), true),
             Field::new_fixed_size_list(
                 "fixed_size_list[u8]",
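Taken together with the reader and writer changes below, this schema mapping is what makes view types usable end to end over IPC. A minimal usage sketch, not part of the diff (the column name and toy data are illustrative; it assumes the `arrow-array`, `arrow-schema`, and `arrow-ipc` crates as dependencies):

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow_array::{RecordBatch, StringViewArray};
use arrow_ipc::reader::StreamReader;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // A Utf8View column can now be declared in an IPC schema...
    let schema = Arc::new(Schema::new(vec![Field::new(
        "v",
        DataType::Utf8View,
        false,
    )]));
    let array = StringViewArray::from_iter_values([
        "short",
        "a string long enough to live in a variadic data buffer",
    ]);
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();

    // ...serialized with the stream writer...
    let mut bytes = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut bytes, &schema).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    // ...and read back unchanged.
    let mut reader = StreamReader::try_new(Cursor::new(bytes), None).unwrap();
    assert_eq!(reader.next().unwrap().unwrap(), batch);
}
```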
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index dd0365da4bc7..4591777c1e37 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -25,7 +25,7 @@ mod stream;
 pub use stream::*;
 
 use flatbuffers::{VectorIter, VerifierOptions};
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::fmt;
 use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
@@ -64,6 +64,9 @@ fn read_buffer(
 
 /// Coordinates reading arrays based on data types.
 ///
+/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView).
+/// When we encounter such a type, we pop from the front of the queue to get the number of buffers to read.
+///
 /// Notes:
 /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
 /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
@@ -71,7 +74,11 @@
 ///   - check if the bit width of non-64-bit numbers is 64, and
 ///   - read the buffer as 64-bit (signed integer or float), and
 ///   - cast the 64-bit array to the appropriate data type
-fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, ArrowError> {
+fn create_array(
+    reader: &mut ArrayReader,
+    field: &Field,
+    variadic_counts: &mut VecDeque<i64>,
+) -> Result<ArrayRef, ArrowError> {
     let data_type = field.data_type();
     match data_type {
         Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
@@ -83,6 +90,18 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result
                 reader.next_buffer()?,
             ],
         ),
+        Utf8View | BinaryView => {
+            let count = variadic_counts
+                .pop_front()
+                .ok_or(ArrowError::IpcError(format!(
+                    "Missing variadic count for {data_type} column"
+                )))?;
+            let count = count + 2; // plus the views buffer and the null buffer
+            let buffers = (0..count)
+                .map(|_| reader.next_buffer())
+                .collect::<Result<Vec<_>, _>>()?;
+            create_primitive_array(reader.next_node(field)?, data_type, &buffers)
+        }
         FixedSizeBinary(_) => create_primitive_array(
             reader.next_node(field)?,
             data_type,
@@ -91,13 +110,13 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result
         List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
             let list_node = reader.next_node(field)?;
             let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
-            let values = create_array(reader, list_field)?;
+            let values = create_array(reader, list_field, variadic_counts)?;
             create_list_array(list_node, data_type, &list_buffers, values)
         }
         FixedSizeList(ref list_field, _) => {
             let list_node = reader.next_node(field)?;
             let list_buffers = [reader.next_buffer()?];
-            let values = create_array(reader, list_field)?;
+            let values = create_array(reader, list_field, variadic_counts)?;
             create_list_array(list_node, data_type, &list_buffers, values)
         }
         Struct(struct_fields) => {
@@ -109,7 +128,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result
             // read the arrays for each field
             let mut struct_arrays = vec![];
             for struct_field in struct_fields {
-                let child = create_array(reader, struct_field)?;
+                let child = create_array(reader, struct_field, variadic_counts)?;
                 struct_arrays.push(child);
             }
@@ … @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result
         RunEndEncoded(run_ends_field, values_field) => {
             let run_node = reader.next_node(field)?;
-            let run_ends = create_array(reader, run_ends_field)?;
-            let values = create_array(reader, values_field)?;
+            let run_ends = create_array(reader, run_ends_field, variadic_counts)?;
+            let values = create_array(reader, values_field, variadic_counts)?;
             let run_array_length = run_node.length() as usize;
             let data = ArrayData::builder(data_type.clone())
@@ -177,7 +196,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> Result
-                let child = create_array(reader, field)?;
+                let child = create_array(reader, field, variadic_counts)?;
@@ … @@ fn create_primitive_array(
                 .null_bit_buffer(null_buffer)
                 .build_aligned()?
         }
+        BinaryView | Utf8View => ArrayData::builder(data_type.clone())
+            .len(length)
+            .buffers(buffers[1..].to_vec())
+            .null_bit_buffer(null_buffer)
+            .build_aligned()?,
         _ if data_type.is_primitive() || matches!(data_type, Boolean | FixedSizeBinary(_)) => {
             // read 2 buffers: null buffer (optional) and data buffer
             ArrayData::builder(data_type.clone())
@@ -328,7 +352,11 @@ impl<'a> ArrayReader<'a> {
         })
     }
 
-    fn skip_field(&mut self, field: &Field) -> Result<(), ArrowError> {
+    fn skip_field(
+        &mut self,
+        field: &Field,
+        variadic_count: &mut VecDeque<i64>,
+    ) -> Result<(), ArrowError> {
         self.next_node(field)?;
 
         match field.data_type() {
@@ -337,6 +365,18 @@ impl<'a> ArrayReader<'a> {
                     self.skip_buffer()
                 }
             }
+            Utf8View | BinaryView => {
+                let count = variadic_count
+                    .pop_front()
+                    .ok_or(ArrowError::IpcError(format!(
+                        "Missing variadic count for {} column",
+                        field.data_type()
+                    )))?;
+                let count = count + 2; // plus the views buffer and the null buffer
+                for _i in 0..count {
+                    self.skip_buffer()
+                }
+            }
             FixedSizeBinary(_) => {
                 self.skip_buffer();
                 self.skip_buffer();
@@ -344,23 +384,23 @@ impl<'a> ArrayReader<'a> {
             List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                 self.skip_buffer();
                 self.skip_buffer();
-                self.skip_field(list_field)?;
+                self.skip_field(list_field, variadic_count)?;
             }
             FixedSizeList(list_field, _) => {
                 self.skip_buffer();
-                self.skip_field(list_field)?;
+                self.skip_field(list_field, variadic_count)?;
             }
             Struct(struct_fields) => {
                 self.skip_buffer();
 
                 // skip for each field
                 for struct_field in struct_fields {
-                    self.skip_field(struct_field)?
+                    self.skip_field(struct_field, variadic_count)?
                }
             }
             RunEndEncoded(run_ends_field, values_field) => {
-                self.skip_field(run_ends_field)?;
-                self.skip_field(values_field)?;
+                self.skip_field(run_ends_field, variadic_count)?;
+                self.skip_field(values_field, variadic_count)?;
             }
             Dictionary(_, _) => {
                 self.skip_buffer(); // Nulls
@@ -375,7 +415,7 @@ impl<'a> ArrayReader<'a> {
                 };
 
                 for (_, field) in fields.iter() {
-                    self.skip_field(field)?
+                    self.skip_field(field, variadic_count)?
                 }
             }
             Null => {} // No buffer increases
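The buffer accounting above can be stated in isolation: a view column occupies the views buffer, the null buffer, and `n` variadic data buffers, where `n` is popped from the front of the queue. A hedged sketch of just that rule — `buffers_to_read` is a hypothetical helper, not something in the reader:

```rust
use std::collections::VecDeque;

use arrow_schema::{ArrowError, DataType};

/// Hypothetical helper mirroring `create_array`/`skip_field`: how many IPC
/// buffers does the next array of `data_type` occupy?
fn buffers_to_read(
    data_type: &DataType,
    variadic_counts: &mut VecDeque<i64>,
) -> Result<i64, ArrowError> {
    match data_type {
        // n variadic data buffers, plus the views buffer and the null buffer
        DataType::Utf8View | DataType::BinaryView => variadic_counts
            .pop_front()
            .map(|n| n + 2)
            .ok_or_else(|| {
                ArrowError::IpcError(format!("Missing variadic count for {data_type} column"))
            }),
        // null buffer, offsets buffer, and values buffer
        DataType::Utf8 | DataType::Binary => Ok(3),
        // e.g. flat primitive arrays: null buffer and values buffer
        _ => Ok(2),
    }
}
```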
@@ -403,6 +443,10 @@ pub fn read_record_batch(
     let field_nodes = batch.nodes().ok_or_else(|| {
         ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
     })?;
+
+    let mut variadic_counts: VecDeque<i64> =
+        batch.variadicBufferCounts().into_iter().flatten().collect();
+
     let batch_compression = batch.compression();
     let compression = batch_compression
         .map(|batch_compression| batch_compression.codec().try_into())
@@ -425,12 +469,13 @@ pub fn read_record_batch(
         for (idx, field) in schema.fields().iter().enumerate() {
             // Create array for projected field
             if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
-                let child = create_array(&mut reader, field)?;
+                let child = create_array(&mut reader, field, &mut variadic_counts)?;
                 arrays.push((proj_idx, child));
             } else {
-                reader.skip_field(field)?;
+                reader.skip_field(field, &mut variadic_counts)?;
             }
         }
+        assert!(variadic_counts.is_empty());
         arrays.sort_by_key(|t| t.0);
         RecordBatch::try_new_with_options(
             Arc::new(schema.project(projection)?),
@@ -441,9 +486,10 @@ pub fn read_record_batch(
         let mut children = vec![];
         // keep track of index as lists require more than one node
         for field in schema.fields() {
-            let child = create_array(&mut reader, field)?;
+            let child = create_array(&mut reader, field, &mut variadic_counts)?;
             children.push(child);
         }
+        assert!(variadic_counts.is_empty());
        RecordBatch::try_new_with_options(schema, children, &options)
     }
 }
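The two `assert!(variadic_counts.is_empty())` checks encode the contract with the writer: counts are pushed in a pre-order walk over the columns and popped in the same order, so a fully read batch must drain the queue exactly. A toy model of that contract (the numbers are made up):

```rust
use std::collections::VecDeque;

fn main() {
    // Writer side: one count per view column, in column (pre-order) order.
    let written: Vec<i64> = vec![1, 0, 3];

    // Reader side: pop in the same order; a leftover or missing entry means
    // the writer and reader disagreed about the schema.
    let mut counts: VecDeque<i64> = written.into_iter().collect();
    for _ in 0..3 {
        let n = counts.pop_front().expect("missing variadic count");
        println!("reading {n} variadic data buffer(s) + views + nulls");
    }
    assert!(counts.is_empty());
}
```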
@@ -1759,6 +1805,121 @@ mod tests {
         assert_eq!(input_batch, output_batch);
     }
 
+    const LONG_TEST_STRING: &str =
+        "This is a long string to make sure binary view array handles it";
+
+    #[test]
+    fn test_roundtrip_view_types() {
+        let schema = Schema::new(vec![
+            Field::new("field_1", DataType::BinaryView, true),
+            Field::new("field_2", DataType::Utf8, true),
+            Field::new("field_3", DataType::Utf8View, true),
+        ]);
+        let bin_values: Vec<Option<&[u8]>> = vec![
+            Some(b"foo"),
+            None,
+            Some(b"bar"),
+            Some(LONG_TEST_STRING.as_bytes()),
+        ];
+        let utf8_values: Vec<Option<&str>> =
+            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
+        let bin_view_array = BinaryViewArray::from_iter(bin_values);
+        let utf8_array = StringArray::from_iter(utf8_values.iter());
+        let utf8_view_array = StringViewArray::from_iter(utf8_values);
+        let record_batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(bin_view_array),
+                Arc::new(utf8_array),
+                Arc::new(utf8_view_array),
+            ],
+        )
+        .unwrap();
+
+        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
+        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
+
+        let sliced_batch = record_batch.slice(1, 2);
+        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
+        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
+    }
+
+    #[test]
+    fn test_roundtrip_view_types_nested_dict() {
+        let bin_values: Vec<Option<&[u8]>> = vec![
+            Some(b"foo"),
+            None,
+            Some(b"bar"),
+            Some(LONG_TEST_STRING.as_bytes()),
+            Some(b"field"),
+        ];
+        let utf8_values: Vec<Option<&str>> = vec![
+            Some("foo"),
+            None,
+            Some("bar"),
+            Some(LONG_TEST_STRING),
+            Some("field"),
+        ];
+        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
+        let utf8_view_array = Arc::new(StringViewArray::from_iter(utf8_values));
+
+        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
+        let key_dict_array = DictionaryArray::new(key_dict_keys, utf8_view_array.clone());
+        let keys_field = Arc::new(Field::new_dict(
+            "keys",
+            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8View)),
+            true,
+            1,
+            false,
+        ));
+
+        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 1]);
+        let value_dict_array = DictionaryArray::new(value_dict_keys, bin_view_array);
+        let values_field = Arc::new(Field::new_dict(
+            "values",
+            DataType::Dictionary(Box::new(DataType::Int8), Box::new(DataType::BinaryView)),
+            true,
+            2,
+            false,
+        ));
+        let entry_struct = StructArray::from(vec![
+            (keys_field, make_array(key_dict_array.into_data())),
+            (values_field, make_array(value_dict_array.into_data())),
+        ]);
+
+        let map_data_type = DataType::Map(
+            Arc::new(Field::new(
+                "entries",
+                entry_struct.data_type().clone(),
+                false,
+            )),
+            false,
+        );
+        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
+        let map_data = ArrayData::builder(map_data_type)
+            .len(3)
+            .add_buffer(entry_offsets)
+            .add_child_data(entry_struct.into_data())
+            .build()
+            .unwrap();
+        let map_array = MapArray::from(map_data);
+
+        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 2]);
+        let dict_dict_array = DictionaryArray::new(dict_keys, Arc::new(map_array));
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "f1",
+            dict_dict_array.data_type().clone(),
+            false,
+        )]));
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(dict_dict_array)]).unwrap();
+        assert_eq!(batch, roundtrip_ipc(&batch));
+        assert_eq!(batch, roundtrip_ipc_stream(&batch));
+
+        let sliced_batch = batch.slice(1, 2);
+        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
+        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
+    }
+
     #[test]
     fn test_no_columns_batch() {
         let schema = Arc::new(Schema::empty());
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 22edfbc2454d..2a3474fe0fc6 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -412,6 +412,8 @@ impl IpcDataGenerator {
         let compression_codec: Option<CompressionCodec> =
             batch_compression_type.map(TryInto::try_into).transpose()?;
 
+        let mut variadic_buffer_counts = vec![];
+
         for array in batch.columns() {
             let array_data = array.to_data();
             offset = write_array_data(
@@ -425,6 +427,8 @@ impl IpcDataGenerator {
                 compression_codec,
                 write_options,
             )?;
+
+            append_variadic_buffer_counts(&mut variadic_buffer_counts, &array_data);
         }
         // pad the tail of body data
         let len = arrow_data.len();
@@ -434,6 +438,12 @@ impl IpcDataGenerator {
         // write data
         let buffers = fbb.create_vector(&buffers);
         let nodes = fbb.create_vector(&nodes);
+        let variadic_buffer = if variadic_buffer_counts.is_empty() {
+            None
+        } else {
+            Some(fbb.create_vector(&variadic_buffer_counts))
+        };
+
         let root = {
             let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
             batch_builder.add_length(batch.num_rows() as i64);
@@ -442,6 +452,10 @@ impl IpcDataGenerator {
             if let Some(c) = compression {
                 batch_builder.add_compression(c);
             }
+
+            if let Some(v) = variadic_buffer {
+                batch_builder.add_variadicBufferCounts(v);
+            }
             let b = batch_builder.finish();
             b.as_union_value()
         };
@@ -501,6 +515,9 @@ impl IpcDataGenerator {
             write_options,
         )?;
 
+        let mut variadic_buffer_counts = vec![];
+        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
+
         // pad the tail of body data
         let len = arrow_data.len();
         let pad_len = pad_to_8(len as u32);
@@ -509,6 +526,11 @@ impl IpcDataGenerator {
         // write data
         let buffers = fbb.create_vector(&buffers);
         let nodes = fbb.create_vector(&nodes);
+        let variadic_buffer = if variadic_buffer_counts.is_empty() {
+            None
+        } else {
+            Some(fbb.create_vector(&variadic_buffer_counts))
+        };
 
         let root = {
             let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
@@ -518,6 +540,9 @@ impl IpcDataGenerator {
             if let Some(c) = compression {
                 batch_builder.add_compression(c);
             }
+            if let Some(v) = variadic_buffer {
+                batch_builder.add_variadicBufferCounts(v);
+            }
             batch_builder.finish()
         };
 
@@ -547,6 +572,25 @@ impl IpcDataGenerator {
     }
 }
 
+fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
+    match array.data_type() {
+        DataType::BinaryView | DataType::Utf8View => {
+            // The spec documents that the counts only include the variadic data buffers, not the view/null buffers.
+            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
+            counts.push(array.buffers().len() as i64 - 1);
+        }
+        DataType::Dictionary(_, _) => {
+            // Do nothing
+            // Dictionary types are handled in `encode_dictionaries`.
+        }
+        _ => {
+            for child in array.child_data() {
+                append_variadic_buffer_counts(counts, child)
+            }
+        }
+    }
+}
+
 pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
     match arr.data_type() {
         DataType::RunEndEncoded(k, _) => match k.data_type() {
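For a concrete feel for what `append_variadic_buffer_counts` records: in arrow-rs the validity bitmap is not part of `ArrayData::buffers()`, so a view array's buffer list is the views buffer followed by its data buffers, and the pushed count is `len - 1`. A small illustration, assuming only public `arrow-array` APIs:

```rust
use arrow_array::{Array, StringViewArray};

fn main() {
    let long = "This is a long string to make sure binary view array handles it";
    let array = StringViewArray::from_iter_values(["foo", "bar", long]);

    // buffers() = [views, data0, ...]; only `long` spills into a data buffer,
    // the two short strings are inlined in their views.
    let data = array.to_data();
    let variadic_count = data.buffers().len() as i64 - 1;
    println!("variadic buffer count: {variadic_count}"); // 1 here
}
```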
@@ -1249,6 +1293,22 @@ fn write_array_data(
                 compression_codec,
             )?;
         }
+    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
+        // Slicing the views buffer is safe and easy,
+        // but pruning unneeded data buffers is much more nuanced, since it is complicated to prove that no views reference the pruned buffers.
+        //
+        // The current implementation just serializes the raw arrays as given and does not try to optimize anything.
+        // If users want to "compact" the arrays prior to sending them over IPC,
+        // they should consider the gc API suggested in #5513.
+        for buffer in array_data.buffers() {
+            offset = write_buffer(
+                buffer.as_slice(),
+                buffers,
+                arrow_data,
+                offset,
+                compression_codec,
+            )?;
+        }
     } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) {
         let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
         for buffer in [offsets, values] {
@@ -1804,6 +1864,57 @@ mod tests {
         write_union_file(IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap());
     }
 
+    #[test]
+    fn test_write_view_types() {
+        const LONG_TEST_STRING: &str =
+            "This is a long string to make sure binary view array handles it";
+        let schema = Schema::new(vec![
+            Field::new("field1", DataType::BinaryView, true),
+            Field::new("field2", DataType::Utf8View, true),
+        ]);
+        let values: Vec<Option<&[u8]>> = vec![
+            Some(b"foo"),
+            Some(b"bar"),
+            Some(LONG_TEST_STRING.as_bytes()),
+        ];
+        let binary_array = BinaryViewArray::from_iter(values);
+        let utf8_array =
+            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), Some(LONG_TEST_STRING)]);
+        let record_batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(binary_array), Arc::new(utf8_array)],
+        )
+        .unwrap();
+
+        let mut file = tempfile::tempfile().unwrap();
+        {
+            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
+            writer.write(&record_batch).unwrap();
+            writer.finish().unwrap();
+        }
+        file.rewind().unwrap();
+        {
+            let mut reader = FileReader::try_new(&file, None).unwrap();
+            let read_batch = reader.next().unwrap().unwrap();
+            read_batch
+                .columns()
+                .iter()
+                .zip(record_batch.columns())
+                .for_each(|(a, b)| {
+                    assert_eq!(a, b);
+                });
+        }
+        file.rewind().unwrap();
+        {
+            let mut reader = FileReader::try_new(&file, Some(vec![0])).unwrap();
+            let read_batch = reader.next().unwrap().unwrap();
+            assert_eq!(read_batch.num_columns(), 1);
+            let read_array = read_batch.column(0);
+            let write_array = record_batch.column(0);
+            assert_eq!(read_array, write_array);
+        }
+    }
+
     #[test]
     fn truncate_ipc_record_batch() {
         fn create_batch(rows: usize) -> RecordBatch {
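A final illustration of why the `write_array_data` branch above serializes view arrays as given: slicing rewrites which views are visible but keeps every data buffer attached, and proving a buffer unreferenced would mean scanning all remaining views. A hedged sketch, relying only on the public slicing behavior of `arrow-array`:

```rust
use arrow_array::{Array, StringViewArray};

fn main() {
    let long = "a string long enough to be stored out of line in a data buffer";
    let array = StringViewArray::from_iter_values(["foo", long, "bar"]);

    // The slice logically contains only "foo", yet it still carries every
    // variadic data buffer, so the IPC writer writes them all.
    let sliced = array.slice(0, 1);
    assert_eq!(
        array.to_data().buffers().len(),
        sliced.to_data().buffers().len()
    );
}
```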