Skip to content

Commit

Permalink
AggExec: implement columnar accumulator states.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangli20 committed Nov 11, 2024
1 parent 5adf628 commit 79cf6f9
Show file tree
Hide file tree
Showing 34 changed files with 3,402 additions and 4,705 deletions.
78 changes: 40 additions & 38 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions native-engine/blaze-serde/src/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use datafusion_ext_exprs::{
string_ends_with::StringEndsWithExpr, string_starts_with::StringStartsWithExpr,
};
use datafusion_ext_plans::{
agg::{create_agg, AggExecMode, AggExpr, AggFunction, AggMode, GroupingExpr},
agg::{agg::create_agg, AggExecMode, AggExpr, AggFunction, AggMode, GroupingExpr},
agg_exec::AggExec,
broadcast_join_build_hash_map_exec::BroadcastJoinBuildHashMapExec,
broadcast_join_exec::BroadcastJoinExec,
Expand Down Expand Up @@ -536,7 +536,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
exec_mode,
physical_groupings,
physical_aggs,
agg.initial_input_buffer_offset as usize,
agg.supports_partial_skipping,
input,
)?))
Expand Down
16 changes: 16 additions & 0 deletions native-engine/datafusion-ext-commons/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,22 @@ pub fn read_u8<R: Read>(input: &mut R) -> std::io::Result<u8> {
Ok(buf[0])
}

/// Reads exactly `len` bytes from `input` and appends them to `buf`.
///
/// On success, `buf` grows by exactly `len` bytes. On error, `buf` is
/// restored to its original length and the I/O error is returned.
///
/// # Errors
/// Propagates any error from `read_exact` (including `UnexpectedEof`
/// when the input ends before `len` bytes were read).
pub fn read_bytes_into_vec<R: Read>(
    input: &mut R,
    buf: &mut Vec<u8>,
    len: usize,
) -> std::io::Result<()> {
    let old_len = buf.len();
    // Zero-initialize the destination before handing it to the reader.
    // Passing uninitialized memory to `Read::read_exact` is undefined
    // behavior per the `std::io::Read` contract (and constructing a
    // `&mut [u8]` over uninit bytes is UB on its own), so we pay the
    // cheap memset instead of using raw reserved capacity.
    buf.resize(old_len + len, 0);
    if let Err(e) = input.read_exact(&mut buf[old_len..]) {
        // Keep the error path identical to the previous behavior:
        // the vector's visible length is unchanged on failure.
        buf.truncate(old_len);
        return Err(e);
    }
    Ok(())
}
pub fn read_bytes_slice<R: Read>(input: &mut R, len: usize) -> std::io::Result<Box<[u8]>> {
// safety - assume_init() is safe for [u8]
let mut byte_slice = unsafe { Box::new_uninit_slice(len).assume_init() };
Expand Down
6 changes: 6 additions & 0 deletions native-engine/datafusion-ext-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ pub const fn staging_mem_size_for_partial_sort() -> usize {
1048576
}

// Staging memory budget used while merging aggregate buffers.
// Aggregate merging is row-based, so a larger staging area (16 MiB)
// pays off — it also improves radix-sort performance on the staged rows.
pub const fn staging_mem_size_for_agg_merge() -> usize {
    16 * 1024 * 1024 // 16 MiB
}

// use bigger batch memory size writing shuffling data
pub const fn suggested_output_batch_mem_size() -> usize {
25165824
Expand Down
4 changes: 2 additions & 2 deletions native-engine/datafusion-ext-commons/src/spark_bit_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl SparkBitArray {
Self::new(vec![0; data_len])
}

pub fn read_from(mut r: impl Read) -> Result<Self> {
pub fn read_from(r: &mut impl Read) -> Result<Self> {
let data_len = r.read_i32::<BE>()? as usize;
let mut data = vec![0; data_len];
for datum in &mut data {
Expand All @@ -52,7 +52,7 @@ impl SparkBitArray {
Ok(Self::new(data))
}

pub fn write_to(&self, mut w: impl Write) -> Result<()> {
pub fn write_to(&self, w: &mut impl Write) -> Result<()> {
w.write_i32::<BE>(self.data.len() as i32)?;
for &datum in &self.data {
w.write_i64::<BE>(datum as i64)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl SparkBloomFilter {
}
}

pub fn read_from(mut r: impl std::io::Read) -> Result<Self> {
pub fn read_from(r: &mut impl std::io::Read) -> Result<Self> {
let version = r.read_i32::<BE>()?;
if version != 1 {
return df_execution_err!("unsupported version: {}", version);
Expand All @@ -71,7 +71,7 @@ impl SparkBloomFilter {
})
}

pub fn write_to(&self, mut w: impl Write) -> Result<()> {
pub fn write_to(&self, w: &mut impl Write) -> Result<()> {
w.write_i32::<BE>(1)?; // version number
w.write_i32::<BE>(self.num_hash_functions as i32)?;
self.bits.write_to(w)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::{
collections::HashMap,
fmt::{Debug, Display, Formatter},
hash::Hasher,
io::Cursor,
sync::{Arc, Weak},
};

Expand Down Expand Up @@ -108,7 +109,7 @@ impl PhysicalExpr for BloomFilterMightContainExpr {
get_cached_bloom_filter(&self.uuid, || {
match self.bloom_filter_expr.evaluate(batch)? {
ColumnarValue::Scalar(ScalarValue::Binary(Some(v))) => {
Ok(SparkBloomFilter::read_from(v.as_slice())?)
Ok(SparkBloomFilter::read_from(&mut Cursor::new(v.as_slice()))?)
}
_ => {
df_execution_err!("bloom_filter_arg must be valid binary scalar value")
Expand Down
6 changes: 3 additions & 3 deletions native-engine/datafusion-ext-plans/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ resolver = "1"
default = ["tokio/rt-multi-thread"]

[dependencies]
aligned-vec = "0.6.1"
arrow = { workspace = true }
arrow-schema = { workspace = true }
async-trait = "0.1.83"
base64 = "0.22.1"
bitvec = "1.0.1"
byteorder = "1.5.0"
bytes = "1.8.0"
blaze-jni-bridge = { workspace = true }
bytesize = "1.1.0"
compact_bytes = "0.1.3"
compact_str = "0.8.0"
count-write = "0.1.0"
datafusion = { workspace = true }
datafusion-ext-commons = { workspace = true }
Expand All @@ -37,7 +37,7 @@ once_cell = "1.20.2"
panic-message = "0.3.0"
parking_lot = "0.12.3"
paste = "1.0.15"
smallvec = "1.13.2"
smallvec = "2.0.0-alpha.7"
tempfile = "3"
tokio = "1.41"
unchecked-index = "0.2.2"
Expand Down
Loading

0 comments on commit 79cf6f9

Please sign in to comment.