Skip to content

Commit

Permalink
Improve nested loop join code (#11863)
Browse files Browse the repository at this point in the history
* Improve nested loop join code

* fmt
  • Loading branch information
lewiszlw authored Aug 8, 2024
1 parent d0a1d30 commit 053795c
Showing 1 changed file with 41 additions and 49 deletions.
90 changes: 41 additions & 49 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use arrow::compute::concat_batches;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use datafusion_common::{exec_err, JoinSide, Result, Statistics};
use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
use datafusion_expr::JoinType;
Expand Down Expand Up @@ -562,62 +562,54 @@ fn join_left_and_right_batch(
schema: &Schema,
visited_left_side: &SharedBitmapBuilder,
) -> Result<RecordBatch> {
let indices_result = (0..left_batch.num_rows())
let indices = (0..left_batch.num_rows())
.map(|left_row_index| {
build_join_indices(left_row_index, right_batch, left_batch, filter)
})
.collect::<Result<Vec<(UInt64Array, UInt32Array)>>>();
.collect::<Result<Vec<(UInt64Array, UInt32Array)>>>()
.map_err(|e| {
exec_datafusion_err!(
"Fail to build join indices in NestedLoopJoinExec, error:{e}"
)
})?;

let mut left_indices_builder = UInt64Builder::new();
let mut right_indices_builder = UInt32Builder::new();
let left_right_indices = match indices_result {
Err(err) => {
exec_err!("Fail to build join indices in NestedLoopJoinExec, error:{err}")
}
Ok(indices) => {
for (left_side, right_side) in indices {
left_indices_builder
.append_values(left_side.values(), &vec![true; left_side.len()]);
right_indices_builder
.append_values(right_side.values(), &vec![true; right_side.len()]);
}
Ok((
left_indices_builder.finish(),
right_indices_builder.finish(),
))
}
};
match left_right_indices {
Ok((left_side, right_side)) => {
// set the left bitmap;
// only a full join needs the left bitmap
if need_produce_result_in_final(join_type) {
let mut bitmap = visited_left_side.lock();
left_side.iter().flatten().for_each(|x| {
bitmap.set_bit(x as usize, true);
});
}
// adjust the indices of both sides based on the join type
let (left_side, right_side) = adjust_indices_by_join_type(
left_side,
right_side,
0..right_batch.num_rows(),
join_type,
false,
);
for (left_side, right_side) in indices {
left_indices_builder
.append_values(left_side.values(), &vec![true; left_side.len()]);
right_indices_builder
.append_values(right_side.values(), &vec![true; right_side.len()]);
}

build_batch_from_indices(
schema,
left_batch,
right_batch,
&left_side,
&right_side,
column_indices,
JoinSide::Left,
)
}
Err(e) => Err(e),
let left_side = left_indices_builder.finish();
let right_side = right_indices_builder.finish();
// set the left bitmap;
// only a full join needs the left bitmap
if need_produce_result_in_final(join_type) {
let mut bitmap = visited_left_side.lock();
left_side.iter().flatten().for_each(|x| {
bitmap.set_bit(x as usize, true);
});
}
// adjust the indices of both sides based on the join type
let (left_side, right_side) = adjust_indices_by_join_type(
left_side,
right_side,
0..right_batch.num_rows(),
join_type,
false,
);

build_batch_from_indices(
schema,
left_batch,
right_batch,
&left_side,
&right_side,
column_indices,
JoinSide::Left,
)
}

fn get_final_indices_from_shared_bitmap(
Expand Down

0 comments on commit 053795c

Please sign in to comment.