Skip to content

Commit

Permalink
replace macro in array_array
Browse files: browse the repository at this point in the history
Signed-off-by: veeupup <code@tanweime.com>
  • Loading branch information
Veeupup committed Nov 17, 2023
1 parent 9fd0f4e commit de7d740
Showing 1 changed file with 31 additions and 79 deletions.
110 changes: 31 additions & 79 deletions datafusion/physical-expr/src/array_expressions.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -389,88 +389,40 @@ fn array_array(args: &[ArrayRef], data_type: DataType) -> Result<ArrayRef> {
return plan_err!("Array requires at least one argument");
}

let res = match data_type {
DataType::List(..) => {
let row_count = args[0].len();
let column_count = args.len();
let mut list_arrays = vec![];
let mut list_array_lengths = vec![];
let mut list_valid = BooleanBufferBuilder::new(row_count);
// Construct ListArray per row
for index in 0..row_count {
let mut arrays = vec![];
let mut array_lengths = vec![];
let mut valid = BooleanBufferBuilder::new(column_count);
for arg in args {
if arg.as_any().downcast_ref::<NullArray>().is_some() {
array_lengths.push(0);
valid.append(false);
} else {
let list_arr = as_list_array(arg)?;
let arr = list_arr.value(index);
array_lengths.push(arr.len());
arrays.push(arr);
valid.append(true);
}
}
if arrays.is_empty() {
list_valid.append(false);
list_array_lengths.push(0);
} else {
let buffer = valid.finish();
// Assume all list arrays have the same data type
let data_type = arrays[0].data_type();
let field = Arc::new(Field::new("item", data_type.to_owned(), true));
let elements = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
let values = compute::concat(elements.as_slice())?;
let list_arr = ListArray::new(
field,
OffsetBuffer::from_lengths(array_lengths),
values,
Some(NullBuffer::new(buffer)),
);
list_valid.append(true);
list_array_lengths.push(list_arr.len());
list_arrays.push(list_arr);
}
let mut data = vec![];
let mut all_len = 0;
for arg in args {
data.push(arg.to_data());
all_len += arg.len();
}
let mut offsets = Vec::with_capacity(all_len);
offsets.push(0);

let capacity = Capacities::Array(all_len);
let data_ref = data.iter().map(|d| d).collect::<Vec<_>>();
let mut mutable = MutableArrayData::with_capacities(data_ref, false, capacity);

let num_rows = args[0].len();
let mut args_offsets = vec![0; args.len()];
for row_idx in 0..num_rows {
for (arr_idx, arg) in args.iter().enumerate() {
if !arg.as_any().is::<NullArray>()
&& !arg.is_null(row_idx)
&& arg.is_valid(row_idx)
{
mutable.extend(arr_idx, row_idx, row_idx + 1);
}
// Construct ListArray for all rows
let buffer = list_valid.finish();
// Assume all list arrays have the same data type
let data_type = list_arrays[0].data_type();
let field = Arc::new(Field::new("item", data_type.to_owned(), true));
let elements = list_arrays
.iter()
.map(|x| x as &dyn Array)
.collect::<Vec<_>>();
let values = compute::concat(elements.as_slice())?;
let list_arr = ListArray::new(
field,
OffsetBuffer::from_lengths(list_array_lengths),
values,
Some(NullBuffer::new(buffer)),
);
Arc::new(list_arr)
}
DataType::Utf8 => array!(args, StringArray, StringBuilder),
DataType::LargeUtf8 => array!(args, LargeStringArray, LargeStringBuilder),
DataType::Boolean => array!(args, BooleanArray, BooleanBuilder),
DataType::Float32 => array!(args, Float32Array, Float32Builder),
DataType::Float64 => array!(args, Float64Array, Float64Builder),
DataType::Int8 => array!(args, Int8Array, Int8Builder),
DataType::Int16 => array!(args, Int16Array, Int16Builder),
DataType::Int32 => array!(args, Int32Array, Int32Builder),
DataType::Int64 => array!(args, Int64Array, Int64Builder),
DataType::UInt8 => array!(args, UInt8Array, UInt8Builder),
DataType::UInt16 => array!(args, UInt16Array, UInt16Builder),
DataType::UInt32 => array!(args, UInt32Array, UInt32Builder),
DataType::UInt64 => array!(args, UInt64Array, UInt64Builder),
data_type => {
return not_impl_err!("Array is not implemented for type '{data_type:?}'.")
}
};
offsets.push(mutable.len() as i32);
}

Ok(res)
let data = mutable.freeze();
Ok(Arc::new(ListArray::try_new(
Arc::new(Field::new("item", data_type, true)),
OffsetBuffer::new(offsets.into()),
arrow_array::make_array(data),
None,
)?))
}

/// `make_array` SQL function
Expand Down

0 comments on commit de7d740

Please sign in to comment.