Skip to content

Commit

Permalink
perf(stream): add simple strategy for if the stream project compact t…
Browse files Browse the repository at this point in the history
…he chunk (risingwavelabs#8758)
  • Loading branch information
st1page authored Mar 24, 2023
1 parent 8bedb3c commit 380e104
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 6 deletions.
13 changes: 13 additions & 0 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,19 @@ impl DataChunk {
&self.vis2
}

pub fn selectivity(&self) -> f64 {
match &self.vis2 {
Vis::Bitmap(b) => {
if b.is_empty() {
0.0
} else {
b.count_ones() as f64 / b.len() as f64
}
}
Vis::Compact(_) => 1.0,
}
}

pub fn with_visibility(&self, visibility: Bitmap) -> Self {
DataChunk::new(self.columns.clone(), visibility)
}
Expand Down
4 changes: 4 additions & 0 deletions src/common/src/array/stream_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ impl StreamChunk {
self.data.capacity()
}

pub fn selectivity(&self) -> f64 {
self.data.selectivity()
}

/// Get the reference of the underlying data chunk.
pub fn data_chunk(&self) -> &DataChunk {
&self.data
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ async fn test_merger_sum_aggr() {
],
3,
MultiMap::new(),
0.0,
);

let items = Arc::new(Mutex::new(vec![]));
Expand Down
21 changes: 16 additions & 5 deletions src/stream/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ struct Inner {
/// All the watermark derivations, (input_column_index, output_column_index). And the
/// derivation expression is the project's expression itself.
watermark_derivations: MultiMap<usize, usize>,

/// the selectivity threshold which should be in [0,1]. for the chunk with selectivity less
/// than the threshold, the Project executor will construct a new chunk before expr evaluation,
materialize_selectivity_threshold: f64,
}

impl ProjectExecutor {
Expand All @@ -50,6 +54,7 @@ impl ProjectExecutor {
exprs: Vec<BoxedExpression>,
executor_id: u64,
watermark_derivations: MultiMap<usize, usize>,
materialize_selectivity_threshold: f64,
) -> Self {
let info = ExecutorInfo {
schema: input.schema().to_owned(),
Expand All @@ -74,6 +79,7 @@ impl ProjectExecutor {
},
exprs,
watermark_derivations,
materialize_selectivity_threshold,
},
}
}
Expand Down Expand Up @@ -110,10 +116,12 @@ impl Inner {
&self,
chunk: StreamChunk,
) -> StreamExecutorResult<Option<StreamChunk>> {
let chunk = chunk.compact();

let chunk = if chunk.selectivity() <= self.materialize_selectivity_threshold {
chunk.compact()
} else {
chunk
};
let (data_chunk, ops) = chunk.into_parts();

let mut projected_columns = Vec::new();

for expr in &self.exprs {
Expand All @@ -125,8 +133,9 @@ impl Inner {
let new_column = Column::new(evaluated_expr);
projected_columns.push(new_column);
}

let new_chunk = StreamChunk::new(ops, projected_columns, None);
let (_, vis) = data_chunk.into_parts();
let vis = vis.into_visibility();
let new_chunk = StreamChunk::new(ops, projected_columns, vis);
Ok(Some(new_chunk))
}

Expand Down Expand Up @@ -233,6 +242,7 @@ mod tests {
vec![test_expr],
1,
MultiMap::new(),
0.0,
));
let mut project = project.execute();

Expand Down Expand Up @@ -299,6 +309,7 @@ mod tests {
vec![a_expr, b_expr],
1,
MultiMap::from_iter(vec![(0, 0), (0, 1)].into_iter()),
0.0,
));
let mut project = project.execute();

Expand Down
8 changes: 7 additions & 1 deletion src/stream/src/from_proto/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use multimap::MultiMap;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::expr::expr_node;
use risingwave_pb::stream_plan::ProjectNode;

use super::*;
Expand Down Expand Up @@ -49,14 +50,19 @@ impl ExecutorBuilder for ProjectExecutorBuilder {
.map(|key| *key as usize),
),
);

let extremely_light = node.get_select_list().iter().all(|expr| {
let expr_type = expr.get_expr_type().unwrap();
expr_type == expr_node::Type::InputRef || expr_type == expr_node::Type::ConstantValue
});
let materialize_selectivity_threshold = if extremely_light { 0.0 } else { 0.5 };
Ok(ProjectExecutor::new(
params.actor_context,
input,
params.pk_indices,
project_exprs,
params.executor_id,
watermark_derivations,
materialize_selectivity_threshold,
)
.boxed())
}
Expand Down

0 comments on commit 380e104

Please sign in to comment.