Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove define_array_slice and reuse array_slice for array_pop_front/back #8401

Merged
merged 8 commits into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 177 additions & 158 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Array expressions

use std::any::type_name;
use std::cmp::Ordering;
use std::collections::HashSet;
use std::sync::Arc;

Expand Down Expand Up @@ -370,135 +369,64 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result<ArrayRef> {
}
}

/// Builds the placeholder array used when a slice produces no data.
///
/// When `return_null` is set, the result comes from `new_null_array` called
/// with `data_type` and a length argument of 1; otherwise it comes from
/// `new_empty_array` for the same `data_type`.
fn return_empty(return_null: bool, data_type: DataType) -> Arc<dyn Array> {
    if !return_null {
        return new_empty_array(&data_type);
    }
    new_null_array(&data_type, 1)
}

fn list_slice<T: Array + 'static>(
array: &dyn Array,
i: i64,
j: i64,
return_element: bool,
) -> ArrayRef {
let array = array.as_any().downcast_ref::<T>().unwrap();

let array_type = array.data_type().clone();
/// array_element SQL function
///
/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index.
/// `array_element(array, index)`
///
/// For example:
/// > array_element(\[1, 2, 3], 2) -> 2
pub fn array_element(args: &[ArrayRef]) -> Result<ArrayRef> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be pub? If it does, could you please add a documentation comment explaining what the arguments are (namely, that the first is a list and the second is an array of indexes)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, there is an `array_element` SQL function.

let list_array = as_list_array(&args[0])?;
let indexes = as_int64_array(&args[1])?;

if i == 0 && j == 0 || array.is_empty() {
return return_empty(return_element, array_type);
}
let values = list_array.values();
let original_data = values.to_data();
let capacity = Capacities::Array(original_data.len());

let i = match i.cmp(&0) {
Ordering::Less => {
if i.unsigned_abs() > array.len() as u64 {
return return_empty(true, array_type);
}
// use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
let mut mutable =
MutableArrayData::with_capacities(vec![&original_data], true, capacity);

(array.len() as i64 + i + 1) as usize
}
Ordering::Equal => 1,
Ordering::Greater => i as usize,
};
fn adjusted_array_index(index: i64, len: usize) -> Option<i64> {
// 0 ~ len - 1
let adjusted_zero_index = if index < 0 {
index + len as i64
} else {
index - 1
};

let j = match j.cmp(&0) {
Ordering::Less => {
if j.unsigned_abs() as usize > array.len() {
return return_empty(true, array_type);
}
if return_element {
(array.len() as i64 + j + 1) as usize
} else {
(array.len() as i64 + j) as usize
}
if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 {
Some(adjusted_zero_index)
} else {
// Out of bounds
None
}
Ordering::Equal => 1,
Ordering::Greater => j.min(array.len() as i64) as usize,
};

if i > j || i > array.len() {
return_empty(return_element, array_type)
} else {
Arc::new(array.slice(i - 1, j + 1 - i))
}
}

fn slice<T: Array + 'static>(
array: &ListArray,
key: &Int64Array,
extra_key: &Int64Array,
return_element: bool,
) -> Result<Arc<dyn Array>> {
let sliced_array: Vec<Arc<dyn Array>> = array
.iter()
.zip(key.iter())
.zip(extra_key.iter())
.map(|((arr, i), j)| match (arr, i, j) {
(Some(arr), Some(i), Some(j)) => list_slice::<T>(&arr, i, j, return_element),
(Some(arr), None, Some(j)) => list_slice::<T>(&arr, 1i64, j, return_element),
(Some(arr), Some(i), None) => {
list_slice::<T>(&arr, i, arr.len() as i64, return_element)
}
(Some(arr), None, None) if !return_element => arr.clone(),
_ => return_empty(return_element, array.value_type()),
})
.collect();
for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
let start = offset_window[0] as usize;
let end = offset_window[1] as usize;
let len = end - start;

// concat requires input of at least one array
if sliced_array.is_empty() {
Ok(return_empty(return_element, array.value_type()))
} else {
let vec = sliced_array
.iter()
.map(|a| a.as_ref())
.collect::<Vec<&dyn Array>>();
let mut i: i32 = 0;
let mut offsets = vec![i];
offsets.extend(
vec.iter()
.map(|a| {
i += a.len() as i32;
i
})
.collect::<Vec<_>>(),
);
let values = compute::concat(vec.as_slice()).unwrap();
// array is null
if len == 0 {
mutable.extend_nulls(1);
continue;
}

let index = adjusted_array_index(indexes.value(row_index), len);

if return_element {
Ok(values)
if let Some(index) = index {
mutable.extend(0, start + index as usize, start + index as usize + 1);
} else {
let field = Arc::new(Field::new("item", array.value_type(), true));
Ok(Arc::new(ListArray::try_new(
field,
OffsetBuffer::new(offsets.into()),
values,
None,
)?))
// Index out of bounds
mutable.extend_nulls(1);
}
}
}

// Type-dispatching entry point for slicing a `ListArray`.
//
// Forwards `list_array`, `key`, `extra_key` and `return_element` unchanged to
// `slice::<T>`, where `T` is filled in through the local `array_function!`
// macro. `call_array_function!` is defined outside this block; presumably it
// selects the concrete array type from `list_array.value_type()` and expands
// `array_function!` with it -- TODO confirm against the macro definition.
fn define_array_slice(
list_array: &ListArray,
key: &Int64Array,
extra_key: &Int64Array,
return_element: bool,
) -> Result<ArrayRef> {
// Local macro: captures the four arguments and leaves only the element
// array type open.
macro_rules! array_function {
($ARRAY_TYPE:ident) => {
slice::<$ARRAY_TYPE>(list_array, key, extra_key, return_element)
};
}
// NOTE(review): the meaning of the `true` flag is defined by
// `call_array_function!`, which is not visible here -- verify before changing.
call_array_function!(list_array.value_type(), true)
}

pub fn array_element(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let key = as_int64_array(&args[1])?;
define_array_slice(list_array, key, key, true)
let data = mutable.freeze();
Ok(arrow_array::make_array(data))
}

fn general_except<OffsetSize: OffsetSizeTrait>(
Expand Down Expand Up @@ -579,47 +507,136 @@ pub fn array_except(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}

/// array_slice SQL function
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

///
/// We follow the behavior of array_slice in DuckDB
/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice.
///
/// > array_slice(array, from, to)
///
/// Positive index is treated as the index from the start of the array. If the
/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the
/// length of the array, it is treated as the length of the array.
///
/// Negative index is treated as the index from the end of the array. If the index
/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`.
/// The `to` index is exclusive like python slice syntax.
///
/// See test cases in `array.slt` for more details.
pub fn array_slice(args: &[ArrayRef]) -> Result<ArrayRef> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here I think it would help to add a comment explaining what the arguments are to this function and what the expected output is

Key details might be if the index values are 1 based or 0 based, for example

let list_array = as_list_array(&args[0])?;
let key = as_int64_array(&args[1])?;
let extra_key = as_int64_array(&args[2])?;
define_array_slice(list_array, key, extra_key, false)
}
let from_array = as_int64_array(&args[1])?;
let to_array = as_int64_array(&args[2])?;

fn general_array_pop(
list_array: &GenericListArray<i32>,
from_back: bool,
) -> Result<(Vec<i64>, Vec<i64>)> {
if from_back {
let key = vec![0; list_array.len()];
// Attention: `arr.len() - 1` in extra key defines the last element position (position = index + 1, not inclusive) we want in the new array.
let extra_key: Vec<_> = list_array
.iter()
.map(|x| x.map_or(0, |arr| arr.len() as i64 - 1))
.collect();
Ok((key, extra_key))
} else {
// Attention: 2 in the `key` defines the first element position (position = index + 1) we want in the new array.
// We only handle two cases of the first element index: if the old array has any elements, starts from 2 (index + 1), or starts from initial.
let key: Vec<_> = list_array.iter().map(|x| x.map_or(0, |_| 2)).collect();
let extra_key: Vec<_> = list_array
.iter()
.map(|x| x.map_or(0, |arr| arr.len() as i64))
.collect();
Ok((key, extra_key))
let values = list_array.values();
let original_data = values.to_data();
let capacity = Capacities::Array(original_data.len());

// use_nulls: false, we don't need nulls but empty array for array_slice, so we don't need explicit nulls but adjust offset to indicate nulls.
let mut mutable =
MutableArrayData::with_capacities(vec![&original_data], false, capacity);

// We have the slice syntax compatible with DuckDB v0.8.1.
// The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.

/// Translates the user-facing `from` index into a 0-based position inside an
/// array of `len` elements.
///
/// * Non-negative indexes are 1-based; anything below 1 is clamped to the
///   first element, so `array_slice(arr, 0, to)` and `array_slice(arr, 1, to)`
///   start at the same place.
/// * Negative indexes count back from the end (`-1` is the last element).
///
/// Returns `None` when the resulting position is outside `0..len`.
fn adjusted_from_index(index: i64, len: usize) -> Option<i64> {
    let len = len as i64;

    let zero_based = if index >= 0 {
        // 1-based -> 0-based, never going below the first element.
        (index - 1).max(0)
    } else {
        // Counted from the end of the array.
        index + len
    };

    // Keep the position only if it addresses an existing element.
    Some(zero_based).filter(|pos| (0..len).contains(pos))
}

/// Translates the user-facing `to` index into a 0-based, inclusive end
/// position inside an array of `len` elements.
///
/// * Non-negative indexes are 1-based and clamped to the last element, so
///   `array_slice(arr, from, len + 1)` ends where `array_slice(arr, from, len)`
///   does.
/// * Negative indexes count back from the end and are exclusive, Python-style:
///   `-1` yields the position just before the last element.
///
/// Returns `None` when the resulting position is outside `0..len`.
fn adjusted_to_index(index: i64, len: usize) -> Option<i64> {
    let len = len as i64;

    let zero_based = if index >= 0 {
        // 1-based -> 0-based, never going past the last element.
        (index - 1).min(len - 1)
    } else {
        // Counted from the end; the index itself is excluded.
        index + len - 1
    };

    // Keep the position only if it addresses an existing element.
    Some(zero_based).filter(|pos| (0..len).contains(pos))
}

let mut offsets = vec![0];

for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() {
let start = offset_window[0] as usize;
let end = offset_window[1] as usize;
let len = end - start;

// len 0 indicates the array is null; return an empty array for this row.
if len == 0 {
offsets.push(offsets[row_index]);
continue;
}

// If index is null, we consider it as the minimum / maximum index of the array.
let from_index = if from_array.is_null(row_index) {
Some(0)
} else {
adjusted_from_index(from_array.value(row_index), len)
};

let to_index = if to_array.is_null(row_index) {
Some(len as i64 - 1)
} else {
adjusted_to_index(to_array.value(row_index), len)
};

if let (Some(from), Some(to)) = (from_index, to_index) {
if from <= to {
assert!(start + to as usize <= end);
mutable.extend(0, start + from as usize, start + to as usize + 1);
offsets.push(offsets[row_index] + (to - from + 1) as i32);
} else {
// invalid range, return empty array
offsets.push(offsets[row_index]);
}
} else {
// invalid range, return empty array
offsets.push(offsets[row_index]);
}
}

let data = mutable.freeze();

Ok(Arc::new(ListArray::try_new(
Arc::new(Field::new("item", list_array.value_type(), true)),
OffsetBuffer::new(offsets.into()),
arrow_array::make_array(data),
None,
)?))
}

/// array_pop_back SQL function
pub fn array_pop_back(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let (key, extra_key) = general_array_pop(list_array, true)?;

define_array_slice(
list_array,
&Int64Array::from(key),
&Int64Array::from(extra_key),
false,
)
let from_array = Int64Array::from(vec![1; list_array.len()]);
let to_array = Int64Array::from(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very clever 👍

list_array
.iter()
.map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1))
.collect::<Vec<i64>>(),
);
let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)];
array_slice(args.as_slice())
}

/// Appends or prepends elements to a ListArray.
Expand Down Expand Up @@ -743,16 +760,18 @@ pub fn gen_range(args: &[ArrayRef]) -> Result<ArrayRef> {
Ok(arr)
}

/// array_pop_front SQL function
pub fn array_pop_front(args: &[ArrayRef]) -> Result<ArrayRef> {
let list_array = as_list_array(&args[0])?;
let (key, extra_key) = general_array_pop(list_array, false)?;

define_array_slice(
list_array,
&Int64Array::from(key),
&Int64Array::from(extra_key),
false,
)
let from_array = Int64Array::from(vec![2; list_array.len()]);
let to_array = Int64Array::from(
list_array
.iter()
.map(|arr| arr.map_or(0, |arr| arr.len() as i64))
.collect::<Vec<i64>>(),
);
let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)];
array_slice(args.as_slice())
}

/// Array_append SQL function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ mod tests {
.evaluate(&batch)?
.into_array(batch.num_rows())
.expect("Failed to convert to array");
assert!(result.is_null(0));
assert!(result.is_empty());
Ok(())
}

Expand Down