diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 54452e3653a8..73ef0ea6da9f 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -579,57 +579,85 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result { ) } +/// Appends or prepends elements to a ListArray. +/// +/// This function takes a ListArray, an ArrayRef, a FieldRef, and a boolean flag +/// indicating whether to append or prepend the elements. It returns a `Result` +/// representing the resulting ListArray after the operation. +/// +/// # Arguments +/// +/// * `list_array` - A reference to the ListArray to which elements will be appended/prepended. +/// * `element_array` - A reference to the Array containing elements to be appended/prepended. +/// * `field` - A reference to the Field describing the data type of the arrays. +/// * `is_append` - A boolean flag indicating whether to append (`true`) or prepend (`false`) elements. +/// +/// # Examples +/// +/// general_append_and_prepend( +/// [1, 2, 3], 4, append => [1, 2, 3, 4] +/// 5, [6, 7, 8], prepend => [5, 6, 7, 8] +/// ) +fn general_append_and_prepend( + list_array: &ListArray, + element_array: &ArrayRef, + data_type: &DataType, + is_append: bool, +) -> Result { + let mut offsets = vec![0]; + let values = list_array.values(); + let original_data = values.to_data(); + let element_data = element_array.to_data(); + let capacity = Capacities::Array(original_data.len() + element_data.len()); + + let mut mutable = MutableArrayData::with_capacities( + vec![&original_data, &element_data], + false, + capacity, + ); + + let values_index = 0; + let element_index = 1; + + for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() { + let start = offset_window[0] as usize; + let end = offset_window[1] as usize; + if is_append { + mutable.extend(values_index, start, end); + mutable.extend(element_index, row_index, row_index + 1); + } else { + mutable.extend(element_index, row_index, row_index + 1); + mutable.extend(values_index, start, end); + } + offsets.push(offsets[row_index] + (end - start + 1) as i32); + } + + let data = mutable.freeze(); + + Ok(Arc::new(ListArray::try_new( + Arc::new(Field::new("item", data_type.to_owned(), true)), + OffsetBuffer::new(offsets.into()), + arrow_array::make_array(data), + None, + )?)) +} + /// Array_append SQL function pub fn array_append(args: &[ArrayRef]) -> Result { - let arr = as_list_array(&args[0])?; - let element = &args[1]; + let list_array = as_list_array(&args[0])?; + let element_array = &args[1]; - check_datatypes("array_append", &[arr.values(), element])?; - let res = match arr.value_type() { + check_datatypes("array_append", &[list_array.values(), element_array])?; + let res = match list_array.value_type() { DataType::List(_) => concat_internal(args)?, - DataType::Null => return make_array(&[element.to_owned()]), + DataType::Null => return make_array(&[element_array.to_owned()]), data_type => { - let mut new_values = vec![]; - let mut offsets = vec![0]; - - let elem_data = element.to_data(); - for (row_index, arr) in arr.iter().enumerate() { - let new_array = if let Some(arr) = arr { - let original_data = arr.to_data(); - let capacity = Capacities::Array(original_data.len() + 1); - let mut mutable = MutableArrayData::with_capacities( - vec![&original_data, &elem_data], - false, - capacity, - ); - mutable.extend(0, 0, original_data.len()); - mutable.extend(1, row_index, row_index + 1); - let data = mutable.freeze(); - arrow_array::make_array(data) - } else { - let capacity = Capacities::Array(1); - let mut mutable = MutableArrayData::with_capacities( - vec![&elem_data], - false, - capacity, - ); - mutable.extend(0, row_index, row_index + 1); - let data = mutable.freeze(); - arrow_array::make_array(data) - }; - offsets.push(offsets[row_index] + new_array.len() as i32); - new_values.push(new_array); - } - - let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect(); - let values = arrow::compute::concat(&new_values)?; - - Arc::new(ListArray::try_new( - Arc::new(Field::new("item", data_type.to_owned(), true)), - OffsetBuffer::new(offsets.into()), - values, - None, - )?) + return general_append_and_prepend( + list_array, + element_array, + &data_type, + true, + ); } }; @@ -638,55 +666,20 @@ pub fn array_append(args: &[ArrayRef]) -> Result { /// Array_prepend SQL function pub fn array_prepend(args: &[ArrayRef]) -> Result { - let element = &args[0]; - let arr = as_list_array(&args[1])?; + let list_array = as_list_array(&args[1])?; + let element_array = &args[0]; - check_datatypes("array_prepend", &[element, arr.values()])?; - let res = match arr.value_type() { + check_datatypes("array_prepend", &[element_array, list_array.values()])?; + let res = match list_array.value_type() { DataType::List(_) => concat_internal(args)?, - DataType::Null => return make_array(&[element.to_owned()]), + DataType::Null => return make_array(&[element_array.to_owned()]), data_type => { - let mut new_values = vec![]; - let mut offsets = vec![0]; - - let elem_data = element.to_data(); - for (row_index, arr) in arr.iter().enumerate() { - let new_array = if let Some(arr) = arr { - let original_data = arr.to_data(); - let capacity = Capacities::Array(original_data.len() + 1); - let mut mutable = MutableArrayData::with_capacities( - vec![&original_data, &elem_data], - false, - capacity, - ); - mutable.extend(1, row_index, row_index + 1); - mutable.extend(0, 0, original_data.len()); - let data = mutable.freeze(); - arrow_array::make_array(data) - } else { - let capacity = Capacities::Array(1); - let mut mutable = MutableArrayData::with_capacities( - vec![&elem_data], - false, - capacity, - ); - mutable.extend(0, row_index, row_index + 1); - let data = mutable.freeze(); - arrow_array::make_array(data) - }; - offsets.push(offsets[row_index] + new_array.len() as i32); - new_values.push(new_array); - } - - let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect(); - let values = arrow::compute::concat(&new_values)?; - - Arc::new(ListArray::try_new( - Arc::new(Field::new("item", data_type.to_owned(), true)), - OffsetBuffer::new(offsets.into()), - values, - None, - )?) + return general_append_and_prepend( + list_array, + element_array, + &data_type, + false, + ); } };