Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Tyler-Sch committed May 25, 2024
1 parent 19dafb2 commit b920144
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 265 deletions.
116 changes: 48 additions & 68 deletions ruhvro/src/complex.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::{anyhow, Result};
use apache_avro::types::Value;
use arrow::array::{
make_builder, Array, ArrayBuilder, ArrayRef, BooleanBufferBuilder, BooleanBuilder,
Date32Builder, Float32Builder, GenericListArray, Int32Builder, Int64Builder, ListArray,
MapArray, StringBuilder, StructArray, TimestampMicrosecondBuilder, TimestampMillisecondBuilder,
UnionArray,
make_builder, ArrayBuilder, ArrayRef, AsArray, BooleanBufferBuilder, BooleanBuilder,
Date32Builder, Float32Builder, GenericListArray, Int32Builder,
Int64Builder, MapArray, StringBuilder, StructArray, TimestampMicrosecondBuilder,
TimestampMillisecondBuilder, UnionArray,
};
use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{DataType, Field, FieldRef, Fields, TimeUnit};
Expand Down Expand Up @@ -245,6 +245,12 @@ impl StructContainer {
let s_arr = Arc::new(StructArray::try_new(Fields::from(fields), a, Some(nulls))?);
Ok(s_arr)
}

pub fn try_build_struct_array(self) -> Result<StructArray> {
let built = self.build()?;
let typed = built.as_struct().to_owned();
Ok(typed)
}
}
struct UnionContainer {
type_ids: Vec<i8>,
Expand Down Expand Up @@ -375,37 +381,19 @@ impl MapContainer {
}
fn build(self) -> Result<ArrayRef> {
let l_array = self.inner_list.build()?;
let list_arr = l_array
.as_any()
.downcast_ref::<ListArray>()
.unwrap()
.to_owned();
let (a, b, c, d) = list_arr.into_parts();
let list_arr = l_array.as_list().to_owned();
let (field_ref, offsets, data, null_buffer) = list_arr.into_parts();
let m = MapArray::try_new(
a,
b,
c.as_any().downcast_ref::<StructArray>().unwrap().to_owned(),
d,
field_ref,
offsets,
data.as_struct().to_owned(),
null_buffer,
false,
)?;
Ok(Arc::new(m))
}
}

#[inline]
fn get_val_from_possible_union<'a>(value: &'a Value, field: &'a Field) -> &'a Value {
let val;
if field.is_nullable() {
if let Value::Union(_, b) = value {
val = b.as_ref();
} else {
val = value;
}
} else {
val = value;
}
val
}

pub fn add_data_to_array_builder(
data: &Value,
Expand All @@ -415,67 +403,43 @@ pub fn add_data_to_array_builder(
let data = get_val_from_possible_union(data, field);
match field.data_type() {
DataType::Boolean => {
let target = builder
.as_any_mut()
.downcast_mut::<BooleanBuilder>()
.expect("expected boolean builder");
let target = get_typed_array::<BooleanBuilder>(builder);
add_val!(data, target, field, Boolean);
}
DataType::Int32 => {
let target = builder
.as_any_mut()
.downcast_mut::<Int32Builder>()
.expect("Did not find Int builder");
let target = get_typed_array::<Int32Builder>(builder);
add_val!(data, target, field, Int);
}
DataType::Int64 => {
let target = builder
.as_any_mut()
.downcast_mut::<Int64Builder>()
.expect("Did not find Int64 builder");
let target = get_typed_array::<Int64Builder>(builder);
add_val!(data, target, field, Long);
}
DataType::Float32 => {
let target = builder
.as_any_mut()
.downcast_mut::<Float32Builder>()
.expect("Did not find Float32 builder");
let target = get_typed_array::<Float32Builder>(builder);
add_val!(data, target, field, Float);
}
DataType::Timestamp(a, _) => {
let _ = match a {
TimeUnit::Millisecond => {
let target = builder
.as_any_mut()
.downcast_mut::<TimestampMillisecondBuilder>()
.expect("Did not find TimestampMillisecond builder");
let target = get_typed_array::<TimestampMillisecondBuilder>(builder);
add_val!(data, target, field, TimestampMillis);
}
TimeUnit::Microsecond => {
let target = builder
.as_any_mut()
.downcast_mut::<TimestampMicrosecondBuilder>()
.expect("Did not find TimestampMicrosecond builder");
let target = get_typed_array::<TimestampMicrosecondBuilder>(builder);
add_val!(data, target, field, TimestampMicros);
}
_ => unimplemented!(),
};
}
DataType::Date32 => {
let target = builder
.as_any_mut()
.downcast_mut::<Date32Builder>()
.expect("Did not find Date32 builder");
let target = get_typed_array::<Date32Builder>(builder);
add_val!(data, target, field, Date);
}
DataType::Duration(_) => {
unimplemented!()
}
DataType::Utf8 => {
let ta = builder
.as_any_mut()
.downcast_mut::<StringBuilder>()
.expect("Did not find StringBuilder");
let ta = get_typed_array::<StringBuilder>(builder);
if let Value::String(s) = data {
ta.append_value(s.clone());
} else {
Expand All @@ -486,6 +450,28 @@ pub fn add_data_to_array_builder(
}
}

#[inline]
fn get_typed_array<'a, T: ArrayBuilder>(arr: &'a mut Box<dyn ArrayBuilder>) -> &mut T {
arr.as_any_mut()
.downcast_mut::<T>()
.expect("Did not find expected builder")
}

#[inline]
fn get_val_from_possible_union<'a>(value: &'a Value, field: &'a Field) -> &'a Value {
let val;
if field.is_nullable() {
if let Value::Union(_, b) = value {
val = b.as_ref();
} else {
val = value;
}
} else {
val = value;
}
val
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -604,20 +590,15 @@ mod tests {
let result = s.build().unwrap();
let a = result
.clone()
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.as_struct()
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.value(0);
assert_eq!(a, 1);
let b = result
.clone()
.as_any()
.downcast_ref::<StructArray>()
.unwrap()
.clone().as_struct()
.column(1)
.as_any()
.downcast_ref::<StringArray>()
Expand Down Expand Up @@ -745,7 +726,6 @@ mod tests {
]);
let _ = sb.add_val(&avro_outer_struct);


let finished = sb.build();
let a: RecordBatch = finished
.unwrap()
Expand Down
Loading

0 comments on commit b920144

Please sign in to comment.