Skip to content

Commit

Permalink
Merge branch 'main' into xxchan/managerial-antelope
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored May 10, 2023
2 parents d6e46a5 + 8874bf5 commit 4f72ee8
Show file tree
Hide file tree
Showing 20 changed files with 354 additions and 339 deletions.
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ fn exist_and_no_exchange_before(plan: &PlanRef, is_candidate: fn(&PlanRef) -> bo
fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> bool {
fn is_user_table(plan: &PlanRef) -> bool {
plan.as_batch_seq_scan()
.map(|node| !node.logical().is_sys_table())
.map(|node| !node.logical().is_sys_table)
.unwrap_or(false)
}

Expand Down Expand Up @@ -625,7 +625,7 @@ fn require_additional_exchange_on_root_in_distributed_mode(plan: PlanRef) -> boo
fn require_additional_exchange_on_root_in_local_mode(plan: PlanRef) -> bool {
fn is_user_table(plan: &PlanRef) -> bool {
plan.as_batch_seq_scan()
.map(|node| !node.logical().is_sys_table())
.map(|node| !node.logical().is_sys_table)
.unwrap_or(false)
}

Expand Down
47 changes: 20 additions & 27 deletions src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,36 +24,35 @@ use risingwave_pb::batch_plan::row_seq_scan_node::ChunkSize;
use risingwave_pb::batch_plan::{RowSeqScanNode, SysRowSeqScanNode};
use risingwave_pb::plan_common::PbColumnDesc;

use super::{ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch};
use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch};
use crate::catalog::ColumnId;
use crate::expr::ExprRewriter;
use crate::optimizer::plan_node::{LogicalScan, ToLocalBatch};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, DistributionDisplay, Order};

/// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchSeqScan {
pub base: PlanBase,
logical: LogicalScan,
logical: generic::Scan,
scan_ranges: Vec<ScanRange>,
}

impl BatchSeqScan {
fn new_inner(logical: LogicalScan, dist: Distribution, scan_ranges: Vec<ScanRange>) -> Self {
let ctx = logical.base.ctx.clone();
fn new_inner(logical: generic::Scan, dist: Distribution, scan_ranges: Vec<ScanRange>) -> Self {
let order = if scan_ranges.len() > 1 {
Order::any()
} else {
logical.get_out_column_index_order()
};
let base = PlanBase::new_batch(ctx, logical.schema().clone(), dist, order);
let base = PlanBase::new_batch_from_logical(&logical, dist, order);

{
// validate scan_range
scan_ranges.iter().for_each(|scan_range| {
assert!(!scan_range.is_full_table_scan());
let scan_pk_prefix_len = scan_range.eq_conds.len();
let order_len = logical.table_desc().order_column_indices().len();
let order_len = logical.table_desc.order_column_indices().len();
assert!(
scan_pk_prefix_len < order_len
|| (scan_pk_prefix_len == order_len && is_full_range(&scan_range.range)),
Expand All @@ -69,15 +68,15 @@ impl BatchSeqScan {
}
}

pub fn new(logical: LogicalScan, scan_ranges: Vec<ScanRange>) -> Self {
pub fn new(logical: generic::Scan, scan_ranges: Vec<ScanRange>) -> Self {
// Use `Single` by default, will be updated later with `clone_with_dist`.
Self::new_inner(logical, Distribution::Single, scan_ranges)
}

fn clone_with_dist(&self) -> Self {
Self::new_inner(
self.logical.clone(),
if self.logical.is_sys_table() {
if self.logical.is_sys_table {
Distribution::Single
} else {
match self.logical.distribution_key() {
Expand All @@ -97,7 +96,7 @@ impl BatchSeqScan {
// inserted.
Distribution::UpstreamHashShard(
distribution_key,
self.logical.table_desc().table_id,
self.logical.table_desc.table_id,
)
}
}
Expand All @@ -109,7 +108,7 @@ impl BatchSeqScan {

/// Get a reference to the batch seq scan's logical.
#[must_use]
pub fn logical(&self) -> &LogicalScan {
pub fn logical(&self) -> &generic::Scan {
&self.logical
}

Expand Down Expand Up @@ -154,7 +153,7 @@ impl fmt::Display for BatchSeqScan {
write!(
f,
"BatchScan {{ table: {}, columns: [{}]",
self.logical.table_name(),
self.logical.table_name,
match verbose {
true => self.logical.column_names_with_table_prefix(),
false => self.logical.column_names(),
Expand Down Expand Up @@ -223,14 +222,14 @@ impl ToBatchPb for BatchSeqScan {
.map(PbColumnDesc::from)
.collect();

if self.logical.is_sys_table() {
if self.logical.is_sys_table {
NodeBody::SysRowSeqScan(SysRowSeqScanNode {
table_id: self.logical.table_desc().table_id.table_id,
table_id: self.logical.table_desc.table_id.table_id,
column_descs,
})
} else {
NodeBody::RowSeqScan(RowSeqScanNode {
table_desc: Some(self.logical.table_desc().to_protobuf()),
table_desc: Some(self.logical.table_desc.to_protobuf()),
column_ids: self
.logical
.output_column_ids()
Expand All @@ -243,7 +242,7 @@ impl ToBatchPb for BatchSeqScan {
ordered: !self.order().is_any(),
chunk_size: self
.logical
.chunk_size()
.chunk_size
.map(|chunk_size| ChunkSize { chunk_size }),
})
}
Expand All @@ -252,12 +251,12 @@ impl ToBatchPb for BatchSeqScan {

impl ToLocalBatch for BatchSeqScan {
fn to_local(&self) -> Result<PlanRef> {
let dist = if self.logical.is_sys_table() {
let dist = if self.logical.is_sys_table {
Distribution::Single
} else if let Some(distribution_key) = self.logical.distribution_key()
&& !distribution_key.is_empty()
{
Distribution::UpstreamHashShard(distribution_key, self.logical.table_desc().table_id)
Distribution::UpstreamHashShard(distribution_key, self.logical.table_desc.table_id)
} else {
// NOTE(kwannoel): This is a hack to force an exchange to always be inserted before
// scan.
Expand All @@ -273,14 +272,8 @@ impl ExprRewritable for BatchSeqScan {
}

fn rewrite_exprs(&self, r: &mut dyn ExprRewriter) -> PlanRef {
Self::new(
self.logical
.rewrite_exprs(r)
.as_logical_scan()
.unwrap()
.clone(),
self.scan_ranges.clone(),
)
.into()
let mut logical = self.logical.clone();
logical.rewrite_exprs(r);
Self::new(logical, self.scan_ranges.clone()).into()
}
}
Loading

0 comments on commit 4f72ee8

Please sign in to comment.