Commit b128a03

AggExec: implement columnar accumulator states.

refactor execution context

zhangli20 committed Nov 14, 2024
1 parent 15751e9 · commit b128a03

Showing 71 changed files with 4,501 additions and 6,364 deletions.

42 changes: 9 additions & 33 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions native-engine/blaze-serde/src/from_proto.rs

@@ -59,7 +59,7 @@ use datafusion_ext_exprs::{
     string_ends_with::StringEndsWithExpr, string_starts_with::StringStartsWithExpr,
 };
 use datafusion_ext_plans::{
-    agg::{create_agg, AggExecMode, AggExpr, AggFunction, AggMode, GroupingExpr},
+    agg::{agg::create_agg, AggExecMode, AggExpr, AggFunction, AggMode, GroupingExpr},
     agg_exec::AggExec,
     broadcast_join_build_hash_map_exec::BroadcastJoinBuildHashMapExec,
     broadcast_join_exec::BroadcastJoinExec,
@@ -536,7 +536,6 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
                     exec_mode,
                     physical_groupings,
                     physical_aggs,
-                    agg.initial_input_buffer_offset as usize,
                     agg.supports_partial_skipping,
                     input,
                 )?))

40 changes: 20 additions & 20 deletions native-engine/blaze/src/rt.rs

@@ -31,24 +31,23 @@ use datafusion::{
     common::Result,
     error::DataFusionError,
     execution::context::TaskContext,
-    physical_plan::{
-        metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
-        ExecutionPlan,
-    },
+    physical_plan::{metrics::ExecutionPlanMetricsSet, ExecutionPlan},
 };
-use datafusion_ext_commons::{df_execution_err, streams::coalesce_stream::CoalesceInput};
-use datafusion_ext_plans::{common::output::TaskOutputter, parquet_sink_exec::ParquetSinkExec};
+use datafusion_ext_commons::df_execution_err;
+use datafusion_ext_plans::{
+    common::{execution_context::ExecutionContext, output::TaskOutputter},
+    parquet_sink_exec::ParquetSinkExec,
+};
 use futures::{FutureExt, StreamExt};
 use jni::objects::{GlobalRef, JObject};
 use tokio::runtime::Runtime;

 use crate::{handle_unwinded_scope, metrics::update_spark_metric_node};

 pub struct NativeExecutionRuntime {
+    exec_ctx: Arc<ExecutionContext>,
     native_wrapper: GlobalRef,
     plan: Arc<dyn ExecutionPlan>,
-    task_context: Arc<TaskContext>,
-    partition: usize,
     batch_receiver: Receiver<Result<Option<RecordBatch>>>,
     rt: Runtime,
 }
@@ -61,21 +60,23 @@ impl NativeExecutionRuntime {
         context: Arc<TaskContext>,
     ) -> Result<Self> {
         // execute plan to output stream
-        let stream = plan.execute(partition, context.clone())?;
-        let schema = stream.schema();
+        let exec_ctx = ExecutionContext::new(
+            context.clone(),
+            partition,
+            plan.schema(),
+            &ExecutionPlanMetricsSet::new(),
+        );
+        let stream = exec_ctx.execute(&plan)?;

         // coalesce
         let mut stream = if plan.as_any().downcast_ref::<ParquetSinkExec>().is_some() {
             stream // cannot coalesce parquet sink output
         } else {
-            context.coalesce_with_default_batch_size(
-                stream,
-                &BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), partition),
-            )?
+            exec_ctx.coalesce_with_default_batch_size(stream)
         };

         // init ffi schema
-        let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())?;
+        let ffi_schema = FFI_ArrowSchema::try_from(exec_ctx.output_schema().as_ref())?;
         jni_call!(BlazeCallNativeWrapper(native_wrapper.as_obj())
             .importSchema(&ffi_schema as *const FFI_ArrowSchema as i64) -> ()
         )?;
@@ -98,12 +99,11 @@

         let (batch_sender, batch_receiver) = std::sync::mpsc::sync_channel(1);
         let nrt = Self {
+            exec_ctx,
             native_wrapper: native_wrapper.clone(),
             plan,
-            partition,
             rt,
             batch_receiver,
-            task_context: context,
         };

         // spawn batch producer
@@ -188,7 +188,7 @@
             }
         };

-        let partition = self.partition;
+        let partition = self.exec_ctx.partition_id();
         match next_batch() {
             Ok(ret) => return ret,
             Err(err) => {
@@ -203,13 +203,13 @@
     }

     pub fn finalize(self) {
-        let partition = self.partition;
+        let partition = self.exec_ctx.partition_id();

         log::info!("[partition={partition}] native execution finalizing");
         self.update_metrics().unwrap_or_default();
         drop(self.plan);

-        self.task_context.cancel_task(); // cancel all pending streams
+        self.exec_ctx.task_ctx().cancel_task(); // cancel all pending streams
         self.rt.shutdown_background();
         log::info!("[partition={partition}] native execution finalized");
     }
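
This file shows the shape of the refactor: the per-partition plumbing (task context, partition id, output schema, metrics, coalescing) moves behind the new ExecutionContext. The type itself lives in datafusion_ext_plans::common::execution_context and is not part of this hunk, so the following is a minimal sketch reconstructed from the call sites above; the field layout and BaselineMetrics wiring are assumptions, and coalesce_with_default_batch_size() (which wraps the stream in Blaze's batch-coalescing adapter) is omitted:

    // Sketch only: reconstructed from the call sites in rt.rs, not the
    // actual implementation in datafusion_ext_plans.
    use std::sync::Arc;

    use datafusion::{
        arrow::datatypes::SchemaRef,
        common::Result,
        execution::context::TaskContext,
        physical_plan::{
            metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
            ExecutionPlan, SendableRecordBatchStream,
        },
    };

    pub struct ExecutionContext {
        task_ctx: Arc<TaskContext>,
        partition_id: usize,
        output_schema: SchemaRef,
        #[allow(dead_code)] // metrics wiring is not exercised in this sketch
        baseline_metrics: BaselineMetrics,
    }

    impl ExecutionContext {
        // returns Arc<Self>, matching how rt.rs stores it in the struct field
        pub fn new(
            task_ctx: Arc<TaskContext>,
            partition_id: usize,
            output_schema: SchemaRef,
            metrics: &ExecutionPlanMetricsSet,
        ) -> Arc<Self> {
            Arc::new(Self {
                baseline_metrics: BaselineMetrics::new(metrics, partition_id),
                task_ctx,
                partition_id,
                output_schema,
            })
        }

        // executes the partition owned by this context
        pub fn execute(&self, plan: &Arc<dyn ExecutionPlan>) -> Result<SendableRecordBatchStream> {
            plan.execute(self.partition_id, self.task_ctx.clone())
        }

        pub fn task_ctx(&self) -> Arc<TaskContext> {
            self.task_ctx.clone()
        }

        pub fn partition_id(&self) -> usize {
            self.partition_id
        }

        pub fn output_schema(&self) -> SchemaRef {
            self.output_schema.clone()
        }
    }
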
1 change: 0 additions & 1 deletion native-engine/datafusion-ext-commons/Cargo.toml

@@ -25,7 +25,6 @@ num = "0.4.2"
 once_cell = "1.20.2"
 paste = "1.0.15"
 radsort = "0.1.1"
-slimmer_box = "0.6.5"
 tempfile = "3"
 thrift = "0.17.0"
 tokio = "1.41"

99 changes: 0 additions & 99 deletions native-engine/datafusion-ext-commons/src/bytes_arena.rs

This file was deleted.

16 changes: 16 additions & 0 deletions native-engine/datafusion-ext-commons/src/io/mod.rs

@@ -138,6 +138,22 @@ pub fn read_u8<R: Read>(input: &mut R) -> std::io::Result<u8> {
     Ok(buf[0])
 }

+pub fn read_bytes_into_vec<R: Read>(
+    input: &mut R,
+    buf: &mut Vec<u8>,
+    len: usize,
+) -> std::io::Result<()> {
+    buf.reserve(len);
+    unsafe {
+        // safety: space has been reserved
+        input.read_exact(std::slice::from_raw_parts_mut(
+            buf.as_mut_ptr().add(buf.len()),
+            len,
+        ))?;
+        buf.set_len(buf.len() + len);
+    }
+    Ok(())
+}
+
 pub fn read_bytes_slice<R: Read>(input: &mut R, len: usize) -> std::io::Result<Box<[u8]>> {
     // safety - assume_init() is safe for [u8]
     let mut byte_slice = unsafe { Box::new_uninit_slice(len).assume_init() };
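
The new read_bytes_into_vec appends exactly len bytes onto an existing Vec<u8> without zero-filling first: reserve guarantees the spare capacity, read_exact writes straight into it, and set_len runs only after the read succeeds, so a failed read leaves the vector's visible length unchanged. A hypothetical usage sketch (assuming the helper is in scope):

    use std::io::Cursor;

    fn demo() -> std::io::Result<()> {
        let mut input = Cursor::new(vec![10u8, 20, 30]);
        let mut buf = vec![1u8, 2]; // existing bytes are preserved
        read_bytes_into_vec(&mut input, &mut buf, 3)?;
        assert_eq!(buf, [1, 2, 10, 20, 30]);
        Ok(())
    }
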
22 changes: 20 additions & 2 deletions native-engine/datafusion-ext-commons/src/lib.rs

@@ -26,14 +26,12 @@ use once_cell::sync::OnceCell;
 use unchecked_index::UncheckedIndex;

 pub mod array_size;
-pub mod bytes_arena;
 pub mod cast;
 pub mod ds;
 pub mod hadoop_fs;
 pub mod hash;
 pub mod io;
 pub mod rdxsort;
-pub mod slim_bytes;
 pub mod spark_bit_array;
 pub mod spark_bloom_filter;
 pub mod spark_hash;
@@ -94,6 +92,12 @@ pub const fn staging_mem_size_for_partial_sort() -> usize {
     1048576
 }

+// bigger for better radix sort performance
+// aggregate merging is row-based, so use bigger memory size
+pub const fn staging_mem_size_for_agg_merge() -> usize {
+    16777216
+}
+
 // use bigger batch memory size writing shuffling data
 pub const fn suggested_output_batch_mem_size() -> usize {
     25165824
@@ -175,6 +179,20 @@ macro_rules! prefetch_write_data {
     }};
 }

+#[macro_export]
+macro_rules! likely {
+    ($e:expr) => {{
+        std::intrinsics::likely($e)
+    }};
+}
+
+#[macro_export]
+macro_rules! unlikely {
+    ($e:expr) => {{
+        std::intrinsics::unlikely($e)
+    }};
+}
+
 pub trait UncheckedIndexIntoInner<T> {
     fn into_inner(self) -> T;
 }
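
The new staging_mem_size_for_agg_merge constant raises the staging buffer for row-based aggregate merging to 16 MiB (16777216), versus the 1 MiB (1048576) used for partial sort. Note also that std::intrinsics::likely/std::intrinsics::unlikely are nightly-only intrinsics gated behind #![feature(core_intrinsics)], so the likely!/unlikely! macros assume the crate builds on a nightly toolchain with that feature enabled; they only pass a branch-prediction hint to the optimizer and never change program behavior. A hypothetical usage sketch:

    // assumes the likely! macro exported above is in scope
    fn parse_digit(b: u8) -> Option<u32> {
        if likely!(b.is_ascii_digit()) {
            Some((b - b'0') as u32) // hot path: well-formed input
        } else {
            None // cold path: laid out off the hot trace
        }
    }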