Skip to content

Commit

Permalink
Use explicity pattern match
Browse files Browse the repository at this point in the history
  • Loading branch information
Dandandan committed May 2, 2021
1 parent 2b4fe02 commit ab760af
Showing 1 changed file with 29 additions and 23 deletions.
52 changes: 29 additions & 23 deletions datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,9 @@ impl ExecutionPlan for HashJoinExec {

let column_indices = self.column_indices_from_schema()?;
let num_rows = left_data.1.num_rows();
let visited_left_side = if self.join_type == JoinType::Left {
vec![false; num_rows]
} else {
vec![]
let visited_left_side = match self.join_type {
JoinType::Left => vec![false; num_rows],
JoinType::Inner | JoinType::Right => vec![],
};
Ok(Box::pin(HashJoinStream {
schema: self.schema.clone(),
Expand Down Expand Up @@ -1058,35 +1057,42 @@ impl Stream for HashJoinStream {
self.num_output_batches += 1;
self.num_output_rows += batch.num_rows();

if self.join_type == JoinType::Left {
left_side.iter().flatten().for_each(|x| {
self.visited_left_side[x as usize] = true;
});
match self.join_type {
JoinType::Left => {
left_side.iter().flatten().for_each(|x| {
self.visited_left_side[x as usize] = true;
});
}
JoinType::Inner | JoinType::Right => {}
}
}
Some(result.map(|x| x.0))
}
other => {
let start = Instant::now();
// For the left join, produce rows for unmatched rows
if self.join_type == JoinType::Left && !self.is_exhausted {
let result = produce_unmatched(
&self.visited_left_side,
&self.schema,
&self.column_indices,
&self.left_data,
);
if let Ok(ref batch) = result {
self.num_input_batches += 1;
self.num_input_rows += batch.num_rows();
match self.join_type {
JoinType::Left if !self.is_exhausted => {
let result = produce_unmatched(
&self.visited_left_side,
&self.schema,
&self.column_indices,
&self.left_data,
);
if let Ok(ref batch) = result {
self.join_time += start.elapsed().as_millis() as usize;
self.num_output_batches += 1;
self.num_output_rows += batch.num_rows();
self.num_input_batches += 1;
self.num_input_rows += batch.num_rows();
if let Ok(ref batch) = result {
self.join_time +=
start.elapsed().as_millis() as usize;
self.num_output_batches += 1;
self.num_output_rows += batch.num_rows();
}
}
self.is_exhausted = true;
return Some(result);
}
self.is_exhausted = true;
return Some(result);
JoinType::Left | JoinType::Inner | JoinType::Right => {}
}

debug!(
Expand Down

0 comments on commit ab760af

Please sign in to comment.