Skip to content

Commit

Permalink
refactor(plan_node): simplify limit nodes (more likely to be the last…
Browse files Browse the repository at this point in the history
… crusade) (#10246)
  • Loading branch information
ice1000 authored Jun 8, 2023
1 parent bd7cf4d commit ab86af8
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 87 deletions.
38 changes: 16 additions & 22 deletions src/frontend/src/optimizer/plan_node/batch_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::LimitNode;

use super::{
ExprRewritable, LogicalLimit, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb,
ToDistributedBatch,
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Order, RequiredDist};
Expand All @@ -29,25 +28,23 @@ use crate::optimizer::property::{Order, RequiredDist};
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLimit {
pub base: PlanBase,
logical: LogicalLimit,
logical: generic::Limit<PlanRef>,
}

impl BatchLimit {
pub fn new(logical: LogicalLimit) -> Self {
let ctx = logical.base.ctx.clone();
let base = PlanBase::new_batch(
ctx,
logical.schema().clone(),
logical.input().distribution().clone(),
logical.input().order().clone(),
pub fn new(logical: generic::Limit<PlanRef>) -> Self {
let base = PlanBase::new_batch_from_logical(
&logical,
logical.input.distribution().clone(),
logical.input.order().clone(),
);
BatchLimit { base, logical }
}

fn two_phase_limit(&self, input: PlanRef) -> Result<PlanRef> {
let new_limit = self.logical.limit() + self.logical.offset();
let new_limit = self.logical.limit + self.logical.offset;
let new_offset = 0;
let logical_partial_limit = LogicalLimit::new(input, new_limit, new_offset);
let logical_partial_limit = generic::Limit::new(input, new_limit, new_offset);
let batch_partial_limit = Self::new(logical_partial_limit);
let any_order = Order::any();

Expand All @@ -74,22 +71,19 @@ impl BatchLimit {

impl fmt::Display for BatchLimit {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"BatchLimit {{ limit: {limit}, offset: {offset} }}",
limit = self.logical.limit,
offset = self.logical.offset
)
self.logical.fmt_with_name(f, "BatchLimit")
}
}

impl PlanTreeNodeUnary for BatchLimit {
fn input(&self) -> PlanRef {
self.logical.input()
self.logical.input.clone()
}

fn clone_with_input(&self, input: PlanRef) -> Self {
Self::new(self.logical.clone_with_input(input))
let mut core = self.logical.clone();
core.input = input;
Self::new(core)
}
}
impl_plan_tree_node_for_unary! {BatchLimit}
Expand All @@ -102,8 +96,8 @@ impl ToDistributedBatch for BatchLimit {
impl ToBatchPb for BatchLimit {
fn to_batch_prost_body(&self) -> NodeBody {
NodeBody::Limit(LimitNode {
limit: self.logical.limit(),
offset: self.logical.offset(),
limit: self.logical.limit,
offset: self.logical.offset,
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/frontend/src/optimizer/plan_node/batch_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::TopNNode;

use super::generic::Limit;
use super::generic::TopNLimit;
use super::utils::impl_distill_by_unit;
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::optimizer::plan_node::batch::BatchPlanRef;
use crate::optimizer::plan_node::{BatchLimit, LogicalLimit, ToLocalBatch};
use crate::optimizer::plan_node::{BatchLimit, ToLocalBatch};
use crate::optimizer::property::{Order, RequiredDist};

/// `BatchTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap
Expand All @@ -45,13 +45,13 @@ impl BatchTopN {
}

fn two_phase_topn(&self, input: PlanRef) -> Result<PlanRef> {
let new_limit = Limit::new(
let new_limit = TopNLimit::new(
self.logical.limit_attr.limit() + self.logical.offset,
self.logical.limit_attr.with_ties(),
);
let new_offset = 0;
let partial_input: PlanRef = if input.order().satisfies(&self.logical.order) {
let logical_partial_limit = LogicalLimit::new(input, new_limit.limit(), new_offset);
let logical_partial_limit = generic::Limit::new(input, new_limit.limit(), new_offset);
let batch_partial_limit = BatchLimit::new(logical_partial_limit);
batch_partial_limit.into()
} else {
Expand Down
76 changes: 76 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::hash::Hash;

use pretty_xmlish::Pretty;
use risingwave_common::catalog::Schema;

use super::{DistillUnit, GenericPlanNode, GenericPlanRef};
use crate::optimizer::property::FunctionalDependencySet;
use crate::OptimizerContextRef;

/// Generic core of a limit plan node, parameterized over the type used to reference its input
/// plan.
///
/// NOTE(review): presumably mirrors SQL `LIMIT ... OFFSET ...`; the exact row semantics are
/// defined by the executors that consume these fields, not by this struct — confirm there.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Limit<PlanRef> {
    /// The input plan this node is placed on top of.
    pub input: PlanRef,
    /// The limit value carried by this node (presumably the maximum number of rows to emit).
    pub limit: u64,
    /// The offset value carried by this node (presumably the number of leading rows to skip).
    pub offset: u64,
}

/// Plan-node properties of [`Limit`]: every one of them is forwarded from the input plan.
impl<PlanRef: GenericPlanRef> GenericPlanNode for Limit<PlanRef> {
    /// Returns an owned copy of the input's schema.
    fn schema(&self) -> Schema {
        let Self { input, .. } = self;
        let input_schema = input.schema();
        input_schema.clone()
    }

    /// Returns the input's logical primary key, copied into an owned `Vec` and wrapped in `Some`.
    fn logical_pk(&self) -> Option<Vec<usize>> {
        let Self { input, .. } = self;
        let input_pk = input.logical_pk().to_vec();
        Some(input_pk)
    }

    /// Returns the optimizer context of the input plan.
    fn ctx(&self) -> OptimizerContextRef {
        let Self { input, .. } = self;
        input.ctx()
    }

    /// Returns an owned copy of the input's functional dependencies.
    fn functional_dependency(&self) -> FunctionalDependencySet {
        let Self { input, .. } = self;
        let input_fds = input.functional_dependency();
        input_fds.clone()
    }
}
impl<PlanRef> Limit<PlanRef> {
pub(crate) fn fmt_with_name(&self, f: &mut fmt::Formatter<'_>, name: &str) -> fmt::Result {
write!(
f,
"{} {{ limit: {}, offset: {} }}",
name, self.limit, self.offset
)
}

pub fn new(input: PlanRef, limit: u64, offset: u64) -> Self {
Limit {
input,
limit,
offset,
}
}
}

/// Pretty-printing support for [`Limit`].
impl<PlanRef> DistillUnit for Limit<PlanRef> {
    /// Builds a childless record titled `name` with the fields `limit` and `offset`, in that
    /// order, each rendered through `Pretty::debug`. The input plan is not part of the record.
    fn distill_with_name<'a>(&self, name: &'a str) -> Pretty<'a> {
        let limit_field = ("limit", Pretty::debug(&self.limit));
        let offset_field = ("offset", Pretty::debug(&self.offset));
        Pretty::childless_record(name, vec![limit_field, offset_field])
    }
}
2 changes: 2 additions & 0 deletions src/frontend/src/optimizer/plan_node/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ mod delete;
pub use delete::*;
mod insert;
pub use insert::*;
mod limit;
pub use limit::*;

pub trait DistillUnit {
fn distill_with_name<'a>(&self, name: &'a str) -> Pretty<'a>;
Expand Down
22 changes: 11 additions & 11 deletions src/frontend/src/optimizer/plan_node/generic/top_n.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::TableCatalog;
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TopN<PlanRef> {
pub input: PlanRef,
pub limit_attr: Limit,
pub limit_attr: TopNLimit,
pub offset: u64,
pub order: Order,
pub group_key: Vec<usize>,
Expand Down Expand Up @@ -89,7 +89,7 @@ impl<PlanRef: stream::StreamPlanRef> TopN<PlanRef> {
impl<PlanRef: GenericPlanRef> TopN<PlanRef> {
pub fn with_group(
input: PlanRef,
limit_attr: Limit,
limit_attr: TopNLimit,
offset: u64,
order: Order,
group_key: Vec<usize>,
Expand All @@ -107,7 +107,7 @@ impl<PlanRef: GenericPlanRef> TopN<PlanRef> {
}
}

pub fn without_group(input: PlanRef, limit_attr: Limit, offset: u64, order: Order) -> Self {
pub fn without_group(input: PlanRef, limit_attr: TopNLimit, offset: u64, order: Order) -> Self {
if limit_attr.with_ties() {
assert!(offset == 0, "WITH TIES is not supported with OFFSET");
}
Expand Down Expand Up @@ -194,7 +194,7 @@ impl<PlanRef: GenericPlanRef> GenericPlanNode for TopN<PlanRef> {

/// [`TopNLimit`] is used to specify the number of records to return.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Limit {
pub enum TopNLimit {
/// The number of records returned is exactly the same as the number after `LIMIT` in the SQL
/// query.
Simple(u64),
Expand All @@ -204,7 +204,7 @@ pub enum Limit {
WithTies(u64),
}

impl Limit {
impl TopNLimit {
pub fn new(limit: u64, with_ties: bool) -> Self {
if with_ties {
Self::WithTies(limit)
Expand All @@ -215,24 +215,24 @@ impl Limit {

pub fn limit(&self) -> u64 {
match self {
Limit::Simple(limit) => *limit,
Limit::WithTies(limit) => *limit,
TopNLimit::Simple(limit) => *limit,
TopNLimit::WithTies(limit) => *limit,
}
}

pub fn with_ties(&self) -> bool {
match self {
Limit::Simple(_) => false,
Limit::WithTies(_) => true,
TopNLimit::Simple(_) => false,
TopNLimit::WithTies(_) => true,
}
}

/// Whether this [`TopNLimit`] returns at most one record for each value. Only `LIMIT 1`
/// without `WITH TIES` satisfies this condition.
pub fn max_one_row(&self) -> bool {
match self {
Limit::Simple(limit) => *limit == 1,
Limit::WithTies(_) => false,
TopNLimit::Simple(limit) => *limit == 1,
TopNLimit::WithTies(_) => false,
}
}
}
6 changes: 3 additions & 3 deletions src/frontend/src/optimizer/plan_node/logical_dedup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use itertools::Itertools;
use risingwave_common::error::Result;
use risingwave_common::util::column_index_mapping::ColIndexMapping;

use super::generic::Limit;
use super::generic::TopNLimit;
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, BatchGroupTopN, ColPrunable, ColumnPruningContext,
Expand Down Expand Up @@ -116,7 +116,7 @@ impl ToStream for LogicalDedup {
// If the input is not append-only, we use a `StreamGroupTopN` with the limit being 1.
let logical_top_n = generic::TopN::with_group(
input,
Limit::new(1, false),
TopNLimit::new(1, false),
0,
Order::default(),
self.dedup_cols().to_vec(),
Expand All @@ -131,7 +131,7 @@ impl ToBatch for LogicalDedup {
let input = self.input().to_batch()?;
let logical_top_n = generic::TopN::with_group(
input,
Limit::new(1, false),
TopNLimit::new(1, false),
0,
Order::default(),
self.dedup_cols().to_vec(),
Expand Down
Loading

0 comments on commit ab86af8

Please sign in to comment.