diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index 269bbf7dcf10..ea10ebcfc2a7 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -18,7 +18,6 @@ //! Array expressions use std::any::type_name; -use std::cmp::Ordering; use std::collections::HashSet; use std::sync::Arc; @@ -370,135 +369,64 @@ pub fn make_array(arrays: &[ArrayRef]) -> Result { } } -fn return_empty(return_null: bool, data_type: DataType) -> Arc { - if return_null { - new_null_array(&data_type, 1) - } else { - new_empty_array(&data_type) - } -} - -fn list_slice( - array: &dyn Array, - i: i64, - j: i64, - return_element: bool, -) -> ArrayRef { - let array = array.as_any().downcast_ref::().unwrap(); - - let array_type = array.data_type().clone(); +/// array_element SQL function +/// +/// There are two arguments for array_element, the first one is the array, the second one is the 1-indexed index. +/// `array_element(array, index)` +/// +/// For example: +/// > array_element(\[1, 2, 3], 2) -> 2 +pub fn array_element(args: &[ArrayRef]) -> Result { + let list_array = as_list_array(&args[0])?; + let indexes = as_int64_array(&args[1])?; - if i == 0 && j == 0 || array.is_empty() { - return return_empty(return_element, array_type); - } + let values = list_array.values(); + let original_data = values.to_data(); + let capacity = Capacities::Array(original_data.len()); - let i = match i.cmp(&0) { - Ordering::Less => { - if i.unsigned_abs() > array.len() as u64 { - return return_empty(true, array_type); - } + // use_nulls: true, we don't construct List for array_element, so we need explicit nulls. + let mut mutable = + MutableArrayData::with_capacities(vec![&original_data], true, capacity); - (array.len() as i64 + i + 1) as usize - } - Ordering::Equal => 1, - Ordering::Greater => i as usize, - }; + fn adjusted_array_index(index: i64, len: usize) -> Option { + // 0 ~ len - 1 + let adjusted_zero_index = if index < 0 { + index + len as i64 + } else { + index - 1 + }; - let j = match j.cmp(&0) { - Ordering::Less => { - if j.unsigned_abs() as usize > array.len() { - return return_empty(true, array_type); - } - if return_element { - (array.len() as i64 + j + 1) as usize - } else { - (array.len() as i64 + j) as usize - } + if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 { + Some(adjusted_zero_index) + } else { + // Out of bounds + None } - Ordering::Equal => 1, - Ordering::Greater => j.min(array.len() as i64) as usize, - }; - - if i > j || i > array.len() { - return_empty(return_element, array_type) - } else { - Arc::new(array.slice(i - 1, j + 1 - i)) } -} -fn slice( - array: &ListArray, - key: &Int64Array, - extra_key: &Int64Array, - return_element: bool, -) -> Result> { - let sliced_array: Vec> = array - .iter() - .zip(key.iter()) - .zip(extra_key.iter()) - .map(|((arr, i), j)| match (arr, i, j) { - (Some(arr), Some(i), Some(j)) => list_slice::(&arr, i, j, return_element), - (Some(arr), None, Some(j)) => list_slice::(&arr, 1i64, j, return_element), - (Some(arr), Some(i), None) => { - list_slice::(&arr, i, arr.len() as i64, return_element) - } - (Some(arr), None, None) if !return_element => arr.clone(), - _ => return_empty(return_element, array.value_type()), - }) - .collect(); + for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() { + let start = offset_window[0] as usize; + let end = offset_window[1] as usize; + let len = end - start; - // concat requires input of at least one array - if sliced_array.is_empty() { - Ok(return_empty(return_element, array.value_type())) - } else { - let vec = sliced_array - .iter() - .map(|a| a.as_ref()) - .collect::>(); - let mut i: i32 = 0; - let mut offsets = vec![i]; - offsets.extend( - vec.iter() - .map(|a| { - i += a.len() as i32; - i - }) - .collect::>(), - ); - let values = compute::concat(vec.as_slice()).unwrap(); + // array is null + if len == 0 { + mutable.extend_nulls(1); + continue; + } + + let index = adjusted_array_index(indexes.value(row_index), len); - if return_element { - Ok(values) + if let Some(index) = index { + mutable.extend(0, start + index as usize, start + index as usize + 1); } else { - let field = Arc::new(Field::new("item", array.value_type(), true)); - Ok(Arc::new(ListArray::try_new( - field, - OffsetBuffer::new(offsets.into()), - values, - None, - )?)) + // Index out of bounds + mutable.extend_nulls(1); } } -} - -fn define_array_slice( - list_array: &ListArray, - key: &Int64Array, - extra_key: &Int64Array, - return_element: bool, -) -> Result { - macro_rules! array_function { - ($ARRAY_TYPE:ident) => { - slice::<$ARRAY_TYPE>(list_array, key, extra_key, return_element) - }; - } - call_array_function!(list_array.value_type(), true) -} -pub fn array_element(args: &[ArrayRef]) -> Result { - let list_array = as_list_array(&args[0])?; - let key = as_int64_array(&args[1])?; - define_array_slice(list_array, key, key, true) + let data = mutable.freeze(); + Ok(arrow_array::make_array(data)) } fn general_except( @@ -579,47 +507,136 @@ pub fn array_except(args: &[ArrayRef]) -> Result { } } +/// array_slice SQL function +/// +/// We follow the behavior of array_slice in DuckDB +/// Note that array_slice is 1-indexed. And there are two additional arguments `from` and `to` in array_slice. +/// +/// > array_slice(array, from, to) +/// +/// Positive index is treated as the index from the start of the array. If the +/// `from` index is smaller than 1, it is treated as 1. If the `to` index is larger than the +/// length of the array, it is treated as the length of the array. +/// +/// Negative index is treated as the index from the end of the array. If the index +/// is larger than the length of the array, it is NOT VALID, either in `from` or `to`. +/// The `to` index is exclusive like python slice syntax. +/// +/// See test cases in `array.slt` for more details. pub fn array_slice(args: &[ArrayRef]) -> Result { let list_array = as_list_array(&args[0])?; - let key = as_int64_array(&args[1])?; - let extra_key = as_int64_array(&args[2])?; - define_array_slice(list_array, key, extra_key, false) -} + let from_array = as_int64_array(&args[1])?; + let to_array = as_int64_array(&args[2])?; -fn general_array_pop( - list_array: &GenericListArray, - from_back: bool, -) -> Result<(Vec, Vec)> { - if from_back { - let key = vec![0; list_array.len()]; - // Attention: `arr.len() - 1` in extra key defines the last element position (position = index + 1, not inclusive) we want in the new array. - let extra_key: Vec<_> = list_array - .iter() - .map(|x| x.map_or(0, |arr| arr.len() as i64 - 1)) - .collect(); - Ok((key, extra_key)) - } else { - // Attention: 2 in the `key`` defines the first element position (position = index + 1) we want in the new array. - // We only handle two cases of the first element index: if the old array has any elements, starts from 2 (index + 1), or starts from initial. - let key: Vec<_> = list_array.iter().map(|x| x.map_or(0, |_| 2)).collect(); - let extra_key: Vec<_> = list_array - .iter() - .map(|x| x.map_or(0, |arr| arr.len() as i64)) - .collect(); - Ok((key, extra_key)) + let values = list_array.values(); + let original_data = values.to_data(); + let capacity = Capacities::Array(original_data.len()); + + // use_nulls: false, we don't need nulls but empty array for array_slice, so we don't need explicit nulls but adjust offset to indicate nulls. + let mut mutable = + MutableArrayData::with_capacities(vec![&original_data], false, capacity); + + // We have the slice syntax compatible with DuckDB v0.8.1. + // The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb. + + fn adjusted_from_index(index: i64, len: usize) -> Option { + // 0 ~ len - 1 + let adjusted_zero_index = if index < 0 { + index + len as i64 + } else { + // array_slice(arr, 1, to) is the same as array_slice(arr, 0, to) + std::cmp::max(index - 1, 0) + }; + + if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 { + Some(adjusted_zero_index) + } else { + // Out of bounds + None + } + } + + fn adjusted_to_index(index: i64, len: usize) -> Option { + // 0 ~ len - 1 + let adjusted_zero_index = if index < 0 { + // array_slice in duckdb with negative to_index is python-like, so index itself is exclusive + index + len as i64 - 1 + } else { + // array_slice(arr, from, len + 1) is the same as array_slice(arr, from, len) + std::cmp::min(index - 1, len as i64 - 1) + }; + + if 0 <= adjusted_zero_index && adjusted_zero_index < len as i64 { + Some(adjusted_zero_index) + } else { + // Out of bounds + None + } + } + + let mut offsets = vec![0]; + + for (row_index, offset_window) in list_array.offsets().windows(2).enumerate() { + let start = offset_window[0] as usize; + let end = offset_window[1] as usize; + let len = end - start; + + // len 0 indicate array is null, return empty array in this row. + if len == 0 { + offsets.push(offsets[row_index]); + continue; + } + + // If index is null, we consider it as the minimum / maximum index of the array. + let from_index = if from_array.is_null(row_index) { + Some(0) + } else { + adjusted_from_index(from_array.value(row_index), len) + }; + + let to_index = if to_array.is_null(row_index) { + Some(len as i64 - 1) + } else { + adjusted_to_index(to_array.value(row_index), len) + }; + + if let (Some(from), Some(to)) = (from_index, to_index) { + if from <= to { + assert!(start + to as usize <= end); + mutable.extend(0, start + from as usize, start + to as usize + 1); + offsets.push(offsets[row_index] + (to - from + 1) as i32); + } else { + // invalid range, return empty array + offsets.push(offsets[row_index]); + } + } else { + // invalid range, return empty array + offsets.push(offsets[row_index]); + } } + + let data = mutable.freeze(); + + Ok(Arc::new(ListArray::try_new( + Arc::new(Field::new("item", list_array.value_type(), true)), + OffsetBuffer::new(offsets.into()), + arrow_array::make_array(data), + None, + )?)) } +/// array_pop_back SQL function pub fn array_pop_back(args: &[ArrayRef]) -> Result { let list_array = as_list_array(&args[0])?; - let (key, extra_key) = general_array_pop(list_array, true)?; - - define_array_slice( - list_array, - &Int64Array::from(key), - &Int64Array::from(extra_key), - false, - ) + let from_array = Int64Array::from(vec![1; list_array.len()]); + let to_array = Int64Array::from( + list_array + .iter() + .map(|arr| arr.map_or(0, |arr| arr.len() as i64 - 1)) + .collect::>(), + ); + let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)]; + array_slice(args.as_slice()) } /// Appends or prepends elements to a ListArray. @@ -743,16 +760,18 @@ pub fn gen_range(args: &[ArrayRef]) -> Result { Ok(arr) } +/// array_pop_front SQL function pub fn array_pop_front(args: &[ArrayRef]) -> Result { let list_array = as_list_array(&args[0])?; - let (key, extra_key) = general_array_pop(list_array, false)?; - - define_array_slice( - list_array, - &Int64Array::from(key), - &Int64Array::from(extra_key), - false, - ) + let from_array = Int64Array::from(vec![2; list_array.len()]); + let to_array = Int64Array::from( + list_array + .iter() + .map(|arr| arr.map_or(0, |arr| arr.len() as i64)) + .collect::>(), + ); + let args = vec![args[0].clone(), Arc::new(from_array), Arc::new(to_array)]; + array_slice(args.as_slice()) } /// Array_append SQL function diff --git a/datafusion/physical-expr/src/expressions/get_indexed_field.rs b/datafusion/physical-expr/src/expressions/get_indexed_field.rs index 9c2a64723dc6..43fd5a812a16 100644 --- a/datafusion/physical-expr/src/expressions/get_indexed_field.rs +++ b/datafusion/physical-expr/src/expressions/get_indexed_field.rs @@ -453,7 +453,7 @@ mod tests { .evaluate(&batch)? .into_array(batch.num_rows()) .expect("Failed to convert to array"); - assert!(result.is_null(0)); + assert!(result.is_empty()); Ok(()) }