Skip to content

Commit

Permalink
perf(hash agg): use get_mut instead of pop+put pattern (risingw…
Browse files Browse the repository at this point in the history
…avelabs#8691)

Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc authored Mar 21, 2023
1 parent bf95473 commit 9abe5dc
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions src/stream/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::ptr::NonNull;
use std::sync::Arc;

use futures::{stream, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -436,11 +437,17 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {

// Calculate current outputs, concurrently.
let futs = keys_in_batch.into_iter().map(|key| {
// Pop out the agg group temporarily.
let mut agg_group = vars
.agg_group_cache
.pop(&key)
.expect("changed group must have corresponding AggGroup");
// Get agg group of the key.
let agg_group = {
let mut ptr: NonNull<_> = vars
.agg_group_cache
.get_mut(&key)
.expect("changed group must have corresponding AggGroup")
.into();
// SAFETY: `key`s in `keys_in_batch` are unique by nature, because they're
// from `group_change_set` which is a set.
unsafe { ptr.as_mut() }
};
async {
let curr_outputs = agg_group.get_outputs(&this.storages).await?;
Ok::<_, StreamExecutorError>((key, agg_group, curr_outputs))
Expand All @@ -452,7 +459,7 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
.try_collect()
.await?;

for (key, mut agg_group, curr_outputs) in outputs_in_batch {
for (key, agg_group, curr_outputs) in outputs_in_batch {
let AggChangesInfo {
n_appended_ops,
result_row,
Expand All @@ -477,9 +484,6 @@ impl<K: HashKey, S: StateStore> HashAggExecutor<K, S> {
this.result_table.insert(result_row);
}
}

// Put the agg group back into the agg group cache.
vars.agg_group_cache.put(key, agg_group);
}

let columns = builders
Expand Down

0 comments on commit 9abe5dc

Please sign in to comment.