Skip to content

Commit

Permalink
refactor(common): unify order-related types (risingwavelabs#8449)
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <stdrc@outlook.com>
Co-authored-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
stdrc and BugenZhao authored Mar 10, 2023
1 parent 4b008ac commit 8d0e869
Show file tree
Hide file tree
Showing 105 changed files with 1,076 additions and 1,289 deletions.
14 changes: 7 additions & 7 deletions src/batch/benches/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criteri
use risingwave_batch::executor::{BoxedExecutor, SortExecutor};
use risingwave_common::enable_jemalloc_on_linux;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use tokio::runtime::Runtime;
use utils::{create_input, execute_executor};

Expand All @@ -30,9 +30,9 @@ fn create_order_by_executor(
single_column: bool,
) -> BoxedExecutor {
const CHUNK_SIZE: usize = 1024;
let (child, order_pairs) = if single_column {
let (child, column_orders) = if single_column {
let input = create_input(&[DataType::Int64], chunk_size, chunk_num);
(input, vec![OrderPair::new(0, OrderType::Ascending)])
(input, vec![ColumnOrder::new(0, OrderType::ascending())])
} else {
let input = create_input(
&[
Expand All @@ -47,16 +47,16 @@ fn create_order_by_executor(
(
input,
vec![
OrderPair::new(0, OrderType::Ascending),
OrderPair::new(1, OrderType::Descending),
OrderPair::new(2, OrderType::Ascending),
ColumnOrder::new(0, OrderType::ascending()),
ColumnOrder::new(1, OrderType::descending()),
ColumnOrder::new(2, OrderType::ascending()),
],
)
};

Box::new(SortExecutor::new(
child,
order_pairs,
column_orders,
"SortExecutor".into(),
CHUNK_SIZE,
))
Expand Down
14 changes: 7 additions & 7 deletions src/batch/benches/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criteri
use risingwave_batch::executor::{BoxedExecutor, TopNExecutor};
use risingwave_common::enable_jemalloc_on_linux;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use tokio::runtime::Runtime;
use utils::{create_input, execute_executor};

Expand All @@ -32,9 +32,9 @@ fn create_top_n_executor(
limit: usize,
) -> BoxedExecutor {
const CHUNK_SIZE: usize = 1024;
let (child, order_pairs) = if single_column {
let (child, column_orders) = if single_column {
let input = create_input(&[DataType::Int64], chunk_size, chunk_num);
(input, vec![OrderPair::new(0, OrderType::Ascending)])
(input, vec![ColumnOrder::new(0, OrderType::ascending())])
} else {
let input = create_input(
&[
Expand All @@ -49,16 +49,16 @@ fn create_top_n_executor(
(
input,
vec![
OrderPair::new(0, OrderType::Ascending),
OrderPair::new(1, OrderType::Descending),
OrderPair::new(2, OrderType::Ascending),
ColumnOrder::new(0, OrderType::ascending()),
ColumnOrder::new(1, OrderType::descending()),
ColumnOrder::new(2, OrderType::ascending()),
],
)
};

Box::new(TopNExecutor::new(
child,
order_pairs,
column_orders,
offset,
limit,
false,
Expand Down
36 changes: 18 additions & 18 deletions src/batch/src/executor/group_top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use risingwave_common::types::DataType;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::encoding_for_comparison::encode_chunk;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::sort_util::OrderPair;
use risingwave_common::util::sort_util::ColumnOrder;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::top_n::{HeapElem, TopNHeap};
Expand All @@ -41,7 +41,7 @@ use crate::task::BatchTaskContext;
/// For each group, use a N-heap to store the smallest N rows.
pub struct GroupTopNExecutor<K: HashKey> {
child: BoxedExecutor,
order_pairs: Vec<OrderPair>,
column_orders: Vec<ColumnOrder>,
offset: usize,
limit: usize,
group_key: Vec<usize>,
Expand All @@ -54,7 +54,7 @@ pub struct GroupTopNExecutor<K: HashKey> {

pub struct GroupTopNExecutorBuilder {
child: BoxedExecutor,
order_pairs: Vec<OrderPair>,
column_orders: Vec<ColumnOrder>,
offset: usize,
limit: usize,
group_key: Vec<usize>,
Expand All @@ -70,7 +70,7 @@ impl HashKeyDispatcher for GroupTopNExecutorBuilder {
fn dispatch_impl<K: HashKey>(self) -> Self::Output {
Box::new(GroupTopNExecutor::<K>::new(
self.child,
self.order_pairs,
self.column_orders,
self.offset,
self.limit,
self.with_ties,
Expand Down Expand Up @@ -98,10 +98,10 @@ impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {
NodeBody::GroupTopN
)?;

let order_pairs = top_n_node
let column_orders = top_n_node
.column_orders
.iter()
.map(OrderPair::from_protobuf)
.map(ColumnOrder::from_protobuf)
.collect();

let group_key = top_n_node
Expand All @@ -117,7 +117,7 @@ impl BoxedExecutorBuilder for GroupTopNExecutorBuilder {

let builder = Self {
child,
order_pairs,
column_orders,
offset: top_n_node.get_offset() as usize,
limit: top_n_node.get_limit() as usize,
group_key,
Expand All @@ -135,7 +135,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
#[expect(clippy::too_many_arguments)]
pub fn new(
child: BoxedExecutor,
order_pairs: Vec<OrderPair>,
column_orders: Vec<ColumnOrder>,
offset: usize,
limit: usize,
with_ties: bool,
Expand All @@ -146,7 +146,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
let schema = child.schema().clone();
Self {
child,
order_pairs,
column_orders,
offset,
limit,
with_ties,
Expand Down Expand Up @@ -186,7 +186,7 @@ impl<K: HashKey> GroupTopNExecutor<K> {
let chunk = Arc::new(chunk?.compact());
let keys = K::build(self.group_key.as_slice(), &chunk)?;

for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.order_pairs)
for (row_id, (encoded_row, key)) in encode_chunk(&chunk, &self.column_orders)
.into_iter()
.zip_eq_fast(keys.into_iter())
.enumerate()
Expand Down Expand Up @@ -256,19 +256,19 @@ mod tests {
5 2 2
",
));
let order_pairs = vec![
OrderPair {
column_idx: 1,
order_type: OrderType::Ascending,
let column_orders = vec![
ColumnOrder {
column_index: 1,
order_type: OrderType::ascending(),
},
OrderPair {
column_idx: 0,
order_type: OrderType::Ascending,
ColumnOrder {
column_index: 0,
order_type: OrderType::ascending(),
},
];
let top_n_executor = (GroupTopNExecutorBuilder {
child: Box::new(mock_executor),
order_pairs,
column_orders,
offset: 1,
limit: 3,
with_ties: false,
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
let order_types: Vec<OrderType> = table_desc
.pk
.iter()
.map(|order| OrderType::from_protobuf(&order.get_order_type().unwrap().direction()))
.map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
.collect();

let pk_indices = table_desc
Expand Down
18 changes: 9 additions & 9 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ mod tests {
use risingwave_common::hash::HashKeyDispatcher;
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::sort_util::{OrderPair, OrderType};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_expr::expr::{
new_binary_expr, BoxedExpression, InputRefExpression, LiteralExpression,
};
Expand Down Expand Up @@ -557,20 +557,20 @@ mod tests {
}

fn create_order_by_executor(child: BoxedExecutor) -> BoxedExecutor {
let order_pairs = vec![
OrderPair {
column_idx: 0,
order_type: OrderType::Ascending,
let column_orders = vec![
ColumnOrder {
column_index: 0,
order_type: OrderType::ascending(),
},
OrderPair {
column_idx: 1,
order_type: OrderType::Ascending,
ColumnOrder {
column_index: 1,
order_type: OrderType::ascending(),
},
];

Box::new(SortExecutor::new(
child,
order_pairs,
column_orders,
"SortExecutor".into(),
CHUNK_SIZE,
))
Expand Down
22 changes: 11 additions & 11 deletions src/batch/src/executor/merge_sort_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::sort_util::{HeapElem, OrderPair};
use risingwave_common::util::sort_util::{ColumnOrder, HeapElem};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ExchangeSource as ProstExchangeSource;

Expand All @@ -39,7 +39,7 @@ pub struct MergeSortExchangeExecutorImpl<CS, C> {
context: C,
/// keeps one data chunk of each source if any
source_inputs: Vec<Option<DataChunk>>,
order_pairs: Arc<Vec<OrderPair>>,
column_orders: Arc<Vec<ColumnOrder>>,
min_heap: BinaryHeap<HeapElem>,
proto_sources: Vec<ProstExchangeSource>,
sources: Vec<ExchangeSourceImpl>, // impl
Expand Down Expand Up @@ -76,7 +76,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> MergeSortExchangeEx
assert!(source_idx < self.source_inputs.len());
let chunk_ref = self.source_inputs[source_idx].as_ref().unwrap();
self.min_heap.push(HeapElem {
order_pairs: self.order_pairs.clone(),
column_orders: self.column_orders.clone(),
chunk: chunk_ref.clone(),
chunk_idx: source_idx,
elem_idx: row_idx,
Expand Down Expand Up @@ -191,12 +191,12 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder {
NodeBody::MergeSortExchange
)?;

let order_pairs = sort_merge_node
let column_orders = sort_merge_node
.column_orders
.iter()
.map(OrderPair::from_protobuf)
.map(ColumnOrder::from_protobuf)
.collect();
let order_pairs = Arc::new(order_pairs);
let column_orders = Arc::new(column_orders);

let exchange_node = sort_merge_node.get_exchange()?;
let proto_sources: Vec<ProstExchangeSource> = exchange_node.get_sources().to_vec();
Expand All @@ -213,7 +213,7 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder {
Ok(Box::new(MergeSortExchangeExecutor::<C> {
context: source.context().clone(),
source_inputs: vec![None; num_sources],
order_pairs,
column_orders,
min_heap: BinaryHeap::new(),
proto_sources,
sources: vec![],
Expand Down Expand Up @@ -260,9 +260,9 @@ mod tests {
proto_sources.push(ProstExchangeSource::default());
source_creators.push(fake_create_source.clone());
}
let order_pairs = Arc::new(vec![OrderPair {
column_idx: 0,
order_type: OrderType::Ascending,
let column_orders = Arc::new(vec![ColumnOrder {
column_index: 0,
order_type: OrderType::ascending(),
}]);

let executor = Box::new(MergeSortExchangeExecutorImpl::<
Expand All @@ -271,7 +271,7 @@ mod tests {
> {
context: ComputeNodeContext::for_test(),
source_inputs: vec![None; proto_sources.len()],
order_pairs,
column_orders,
min_heap: BinaryHeap::new(),
proto_sources,
sources: vec![],
Expand Down
Loading

0 comments on commit 8d0e869

Please sign in to comment.