Skip to content

Commit

Permalink
Avoid concat in array_append (#8137)
Browse files Browse the repository at this point in the history
* clean array_append

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* done

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

---------

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 authored Nov 13, 2023
1 parent 3df8955 commit d21a40c
Showing 1 changed file with 85 additions and 92 deletions.
177 changes: 85 additions & 92 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,57 +579,85 @@ pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
)
}

/// Appends or prepends one element to every row of a `ListArray`.
///
/// Row `i` of the result is built from row `i` of `list_array` and the value
/// at index `i` of `element_array`. All rows are copied into one shared
/// `MutableArrayData`, so no per-row arrays are materialized or concatenated.
///
/// # Arguments
///
/// * `list_array` - The `ListArray` whose rows are extended.
/// * `element_array` - The array supplying the element for each row; entry `i` is paired with row `i`.
/// * `data_type` - The data type of the `"item"` field of the resulting list.
/// * `is_append` - `true` places the element after the row's values, `false` places it before them.
///
/// # Null rows
///
/// A null row of `list_array` contributes no values of its own, so the
/// resulting row holds only the element. The validity check is required
/// because the offsets of a null slot may still span a non-empty range of the
/// child array, and those hidden values must not appear in the output.
/// The returned list array itself carries no null buffer.
///
/// # Errors
///
/// Propagates the error from `ListArray::try_new` if the assembled field,
/// offsets and values are rejected.
///
/// NOTE(review): assumes `element_array` has at least one entry per row of
/// `list_array`; `MutableArrayData::extend` is expected to panic otherwise —
/// confirm against callers.
///
/// # Examples
///
/// ```text
/// general_append_and_prepend([1, 2, 3], 4, append)  => [1, 2, 3, 4]
/// general_append_and_prepend([6, 7, 8], 5, prepend) => [5, 6, 7, 8]
/// ```
fn general_append_and_prepend(
    list_array: &ListArray,
    element_array: &ArrayRef,
    data_type: &DataType,
    is_append: bool,
) -> Result<ArrayRef> {
    let list_offsets = list_array.offsets();

    // One output offset per input offset: a leading 0 plus one entry per row.
    let mut offsets: Vec<i32> = Vec::with_capacity(list_offsets.len());
    let mut next_offset = 0_i32;
    offsets.push(next_offset);

    let values = list_array.values();
    let original_data = values.to_data();
    let element_data = element_array.to_data();
    let capacity = Capacities::Array(original_data.len() + element_data.len());

    let mut mutable = MutableArrayData::with_capacities(
        vec![&original_data, &element_data],
        false,
        capacity,
    );

    // Positions of the two sources inside the `vec!` handed to `mutable`.
    let values_index = 0;
    let element_index = 1;

    for (row_index, offset_window) in list_offsets.windows(2).enumerate() {
        // A null slot may still cover child values; treat it as an empty row
        // so that only the element ends up in the output row.
        let (start, end) = if arrow_array::Array::is_null(list_array, row_index) {
            (0, 0)
        } else {
            (offset_window[0] as usize, offset_window[1] as usize)
        };

        if is_append {
            mutable.extend(values_index, start, end);
            mutable.extend(element_index, row_index, row_index + 1);
        } else {
            mutable.extend(element_index, row_index, row_index + 1);
            mutable.extend(values_index, start, end);
        }

        // Each output row holds the copied values plus exactly one element.
        next_offset += (end - start + 1) as i32;
        offsets.push(next_offset);
    }

    let data = mutable.freeze();

    Ok(Arc::new(ListArray::try_new(
        Arc::new(Field::new("item", data_type.to_owned(), true)),
        OffsetBuffer::new(offsets.into()),
        arrow_array::make_array(data),
        None,
    )?))
}

/// Array_append SQL function
pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {
let arr = as_list_array(&args[0])?;
let element = &args[1];
let list_array = as_list_array(&args[0])?;
let element_array = &args[1];

check_datatypes("array_append", &[arr.values(), element])?;
let res = match arr.value_type() {
check_datatypes("array_append", &[list_array.values(), element_array])?;
let res = match list_array.value_type() {
DataType::List(_) => concat_internal(args)?,
DataType::Null => return make_array(&[element.to_owned()]),
DataType::Null => return make_array(&[element_array.to_owned()]),
data_type => {
let mut new_values = vec![];
let mut offsets = vec![0];

let elem_data = element.to_data();
for (row_index, arr) in arr.iter().enumerate() {
let new_array = if let Some(arr) = arr {
let original_data = arr.to_data();
let capacity = Capacities::Array(original_data.len() + 1);
let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &elem_data],
false,
capacity,
);
mutable.extend(0, 0, original_data.len());
mutable.extend(1, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
} else {
let capacity = Capacities::Array(1);
let mut mutable = MutableArrayData::with_capacities(
vec![&elem_data],
false,
capacity,
);
mutable.extend(0, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
};
offsets.push(offsets[row_index] + new_array.len() as i32);
new_values.push(new_array);
}

let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect();
let values = arrow::compute::concat(&new_values)?;

Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type.to_owned(), true)),
OffsetBuffer::new(offsets.into()),
values,
None,
)?)
return general_append_and_prepend(
list_array,
element_array,
&data_type,
true,
);
}
};

Expand All @@ -638,55 +666,20 @@ pub fn array_append(args: &[ArrayRef]) -> Result<ArrayRef> {

/// Array_prepend SQL function
pub fn array_prepend(args: &[ArrayRef]) -> Result<ArrayRef> {
let element = &args[0];
let arr = as_list_array(&args[1])?;
let list_array = as_list_array(&args[1])?;
let element_array = &args[0];

check_datatypes("array_prepend", &[element, arr.values()])?;
let res = match arr.value_type() {
check_datatypes("array_prepend", &[element_array, list_array.values()])?;
let res = match list_array.value_type() {
DataType::List(_) => concat_internal(args)?,
DataType::Null => return make_array(&[element.to_owned()]),
DataType::Null => return make_array(&[element_array.to_owned()]),
data_type => {
let mut new_values = vec![];
let mut offsets = vec![0];

let elem_data = element.to_data();
for (row_index, arr) in arr.iter().enumerate() {
let new_array = if let Some(arr) = arr {
let original_data = arr.to_data();
let capacity = Capacities::Array(original_data.len() + 1);
let mut mutable = MutableArrayData::with_capacities(
vec![&original_data, &elem_data],
false,
capacity,
);
mutable.extend(1, row_index, row_index + 1);
mutable.extend(0, 0, original_data.len());
let data = mutable.freeze();
arrow_array::make_array(data)
} else {
let capacity = Capacities::Array(1);
let mut mutable = MutableArrayData::with_capacities(
vec![&elem_data],
false,
capacity,
);
mutable.extend(0, row_index, row_index + 1);
let data = mutable.freeze();
arrow_array::make_array(data)
};
offsets.push(offsets[row_index] + new_array.len() as i32);
new_values.push(new_array);
}

let new_values: Vec<_> = new_values.iter().map(|a| a.as_ref()).collect();
let values = arrow::compute::concat(&new_values)?;

Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type.to_owned(), true)),
OffsetBuffer::new(offsets.into()),
values,
None,
)?)
return general_append_and_prepend(
list_array,
element_array,
&data_type,
false,
);
}
};

Expand Down

0 comments on commit d21a40c

Please sign in to comment.