diff --git a/datafusion/physical-expr/src/array_expressions.rs b/datafusion/physical-expr/src/array_expressions.rs index ded606c3b705f..7d065646b4e7e 100644 --- a/datafusion/physical-expr/src/array_expressions.rs +++ b/datafusion/physical-expr/src/array_expressions.rs @@ -389,88 +389,40 @@ fn array_array(args: &[ArrayRef], data_type: DataType) -> Result { return plan_err!("Array requires at least one argument"); } - let res = match data_type { - DataType::List(..) => { - let row_count = args[0].len(); - let column_count = args.len(); - let mut list_arrays = vec![]; - let mut list_array_lengths = vec![]; - let mut list_valid = BooleanBufferBuilder::new(row_count); - // Construct ListArray per row - for index in 0..row_count { - let mut arrays = vec![]; - let mut array_lengths = vec![]; - let mut valid = BooleanBufferBuilder::new(column_count); - for arg in args { - if arg.as_any().downcast_ref::().is_some() { - array_lengths.push(0); - valid.append(false); - } else { - let list_arr = as_list_array(arg)?; - let arr = list_arr.value(index); - array_lengths.push(arr.len()); - arrays.push(arr); - valid.append(true); - } - } - if arrays.is_empty() { - list_valid.append(false); - list_array_lengths.push(0); - } else { - let buffer = valid.finish(); - // Assume all list arrays have the same data type - let data_type = arrays[0].data_type(); - let field = Arc::new(Field::new("item", data_type.to_owned(), true)); - let elements = arrays.iter().map(|x| x.as_ref()).collect::>(); - let values = compute::concat(elements.as_slice())?; - let list_arr = ListArray::new( - field, - OffsetBuffer::from_lengths(array_lengths), - values, - Some(NullBuffer::new(buffer)), - ); - list_valid.append(true); - list_array_lengths.push(list_arr.len()); - list_arrays.push(list_arr); - } + let mut data = vec![]; + let mut all_len = 0; + for arg in args { + data.push(arg.to_data()); + all_len += arg.len(); + } + let mut offsets = Vec::with_capacity(all_len); + offsets.push(0); + + let capacity = Capacities::Array(all_len); + let data_ref = data.iter().map(|d| d).collect::>(); + let mut mutable = MutableArrayData::with_capacities(data_ref, false, capacity); + + let num_rows = args[0].len(); + let mut args_offsets = vec![0; args.len()]; + for row_idx in 0..num_rows { + for (arr_idx, arg) in args.iter().enumerate() { + if !arg.as_any().is::() + && !arg.is_null(row_idx) + && arg.is_valid(row_idx) + { + mutable.extend(arr_idx, row_idx, row_idx + 1); } - // Construct ListArray for all rows - let buffer = list_valid.finish(); - // Assume all list arrays have the same data type - let data_type = list_arrays[0].data_type(); - let field = Arc::new(Field::new("item", data_type.to_owned(), true)); - let elements = list_arrays - .iter() - .map(|x| x as &dyn Array) - .collect::>(); - let values = compute::concat(elements.as_slice())?; - let list_arr = ListArray::new( - field, - OffsetBuffer::from_lengths(list_array_lengths), - values, - Some(NullBuffer::new(buffer)), - ); - Arc::new(list_arr) - } - DataType::Utf8 => array!(args, StringArray, StringBuilder), - DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder), - DataType::Boolean => array!(args, BooleanArray, BooleanBuilder), - DataType::Float32 => array!(args, Float32Array, Float32Builder), - DataType::Float64 => array!(args, Float64Array, Float64Builder), - DataType::Int8 => array!(args, Int8Array, Int8Builder), - DataType::Int16 => array!(args, Int16Array, Int16Builder), - DataType::Int32 => array!(args, Int32Array, Int32Builder), - DataType::Int64 => array!(args, Int64Array, Int64Builder), - DataType::UInt8 => array!(args, UInt8Array, UInt8Builder), - DataType::UInt16 => array!(args, UInt16Array, UInt16Builder), - DataType::UInt32 => array!(args, UInt32Array, UInt32Builder), - DataType::UInt64 => array!(args, UInt64Array, UInt64Builder), - data_type => { - return not_impl_err!("Array is not implemented for type '{data_type:?}'.") } - }; + offsets.push(mutable.len() as i32); + } - Ok(res) + let data = mutable.freeze(); + Ok(Arc::new(ListArray::try_new( + Arc::new(Field::new("item", data_type, true)), + OffsetBuffer::new(offsets.into()), + arrow_array::make_array(data), + None, + )?)) } /// `make_array` SQL function