diff --git a/e2e_test/batch/join/issue_7115.slt.part b/e2e_test/batch/join/issue_7115.slt.part new file mode 100644 index 000000000000..85ce1682d758 --- /dev/null +++ b/e2e_test/batch/join/issue_7115.slt.part @@ -0,0 +1,23 @@ +# https://github.com/risingwavelabs/risingwave/issues/7115 +statement ok +SET RW_IMPLICIT_FLUSH TO true; + +statement ok +CREATE TABLE t (id int); + +statement ok +CREATE MATERIALIZED VIEW v as SELECT COUNT(*) cnt FROM t; + +statement ok +INSERT INTO t VALUES(1), (4), (6), (0), (5); + +query II +SELECT * FROM v JOIN (SELECT COUNT(*) FROM t) T; +---- +5 5 + +statement ok +DROP MATERIALIZED VIEW v; + +statement ok +DROP TABLE t; \ No newline at end of file diff --git a/src/frontend/planner_test/tests/testdata/batch_dist_agg.yaml b/src/frontend/planner_test/tests/testdata/batch_dist_agg.yaml index e5c2ee49d024..36d820523936 100644 --- a/src/frontend/planner_test/tests/testdata/batch_dist_agg.yaml +++ b/src/frontend/planner_test/tests/testdata/batch_dist_agg.yaml @@ -15,9 +15,9 @@ └─BatchSimpleAgg { aggs: [max(s.v)] } └─BatchScan { table: s, columns: [s.v], distribution: Single } batch_local_plan: | - BatchExchange { order: [], dist: Single } - └─BatchSimpleAgg { aggs: [max(s.v)] } - └─BatchScan { table: s, columns: [s.v], distribution: Single } + BatchSimpleAgg { aggs: [max(s.v)] } + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: s, columns: [s.v], distribution: SomeShard } - id: extreme_on_T before: - create_tables @@ -162,7 +162,7 @@ └─BatchExchange { order: [], dist: HashShard(s.k) } └─BatchScan { table: s, columns: [s.k, s.v], distribution: Single } batch_local_plan: | - BatchExchange { order: [], dist: Single } - └─BatchProject { exprs: [max(s.v)] } - └─BatchHashAgg { group_key: [s.k], aggs: [max(s.v)] } - └─BatchScan { table: s, columns: [s.k, s.v], distribution: Single } + BatchProject { exprs: [max(s.v)] } + └─BatchHashAgg { group_key: [s.k], aggs: [max(s.v)] } + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: s, columns: [s.k, s.v], distribution: SomeShard } diff --git a/src/frontend/planner_test/tests/testdata/batch_seq_scan.yaml b/src/frontend/planner_test/tests/testdata/batch_seq_scan.yaml new file mode 100644 index 000000000000..7dd4c445a10a --- /dev/null +++ b/src/frontend/planner_test/tests/testdata/batch_seq_scan.yaml @@ -0,0 +1,23 @@ +# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information. +- id: batch_seq_scan_local + name: Test that BatchSeqScan pushed to compute node in local mode + sql: | + create table t (id int); + create materialized view v as select count(*) cnt from t; + SET QUERY_MODE TO distributed; + select * from v join (select count(*) from t) T; + batch_plan: | + BatchExchange { order: [], dist: Single } + └─BatchNestedLoopJoin { type: Inner, predicate: true, output: all } + ├─BatchScan { table: v, columns: [v.cnt], distribution: Single } + └─BatchSimpleAgg { aggs: [sum0(count)] } + └─BatchExchange { order: [], dist: Single } + └─BatchSimpleAgg { aggs: [count] } + └─BatchScan { table: t, columns: [], distribution: SomeShard } + batch_local_plan: | + BatchNestedLoopJoin { type: Inner, predicate: true, output: all } + ├─BatchExchange { order: [], dist: Single } + | └─BatchScan { table: v, columns: [v.cnt], distribution: SomeShard } + └─BatchSimpleAgg { aggs: [count] } + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: t, columns: [], distribution: SomeShard } diff --git a/src/frontend/planner_test/tests/testdata/dynamic_filter.yaml b/src/frontend/planner_test/tests/testdata/dynamic_filter.yaml index 36cbde64c18b..e61c906616f5 100644 --- a/src/frontend/planner_test/tests/testdata/dynamic_filter.yaml +++ b/src/frontend/planner_test/tests/testdata/dynamic_filter.yaml @@ -39,8 +39,9 @@ └─LogicalProject { exprs: [t2.v2, t2.v2] } └─LogicalScan { table: t2, columns: [t2.v2] } stream_error: |- - Feature is not yet implemented: stream nested-loop join - No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Not supported: streaming nested-loop join + HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible. + See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md - name: | Output indices of Dynamic Filter TODO: currently implemented by adding a Project, https://github.com/risingwavelabs/risingwave/issues/3419 @@ -80,8 +81,9 @@ └─LogicalProject { exprs: [t2.v2, t2.v2] } └─LogicalScan { table: t2, columns: [t2.v2] } stream_error: |- - Feature is not yet implemented: stream nested-loop join - No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Not supported: streaming nested-loop join + HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible. + See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md - name: Ensure error on output columns from inner before: - create_tables @@ -93,8 +95,9 @@ └─LogicalAgg { aggs: [max(t2.v2)] } └─LogicalScan { table: t2, columns: [t2.v2] } stream_error: |- - Feature is not yet implemented: stream nested-loop join - No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Not supported: streaming nested-loop join + HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible. + See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md - name: Use Inner Join for equi condition before: - create_tables diff --git a/src/frontend/planner_test/tests/testdata/singleton.yaml b/src/frontend/planner_test/tests/testdata/singleton.yaml index 6796f7bd8463..93b27e0c28b3 100644 --- a/src/frontend/planner_test/tests/testdata/singleton.yaml +++ b/src/frontend/planner_test/tests/testdata/singleton.yaml @@ -13,7 +13,7 @@ └─BatchScan { table: mv, columns: [mv.v], distribution: Single } batch_local_plan: | BatchExchange { order: [], dist: Single } - └─BatchScan { table: mv, columns: [mv.v], distribution: Single } + └─BatchScan { table: mv, columns: [mv.v], distribution: SomeShard } - id: select_from_singleton_mv_join before: - create_singleton_mv @@ -25,9 +25,9 @@ └─BatchExchange { order: [], dist: UpstreamHashShard(mv.v) } └─BatchScan { table: mv, columns: [mv.v], distribution: Single } batch_local_plan: | - BatchExchange { order: [], dist: Single } - └─BatchLookupJoin { type: Inner, predicate: mv.v = mv.v, output: [mv.v] } - └─BatchScan { table: mv, columns: [mv.v], distribution: Single } + BatchLookupJoin { type: Inner, predicate: mv.v = mv.v, output: [mv.v] } + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: mv, columns: [mv.v], distribution: SomeShard } - id: select_from_singleton_mv_join_top_n before: - create_singleton_mv @@ -43,8 +43,8 @@ └─BatchScan { table: mv, columns: [mv.v], distribution: Single } batch_local_plan: | BatchProject { exprs: [mv.v] } - └─BatchExchange { order: [mv.v ASC], dist: Single } + └─BatchTopN { order: "[mv.v ASC]", limit: 10, offset: 0 } └─BatchTopN { order: "[mv.v ASC]", limit: 10, offset: 0 } - └─BatchTopN { order: "[mv.v ASC]", limit: 10, offset: 0 } - └─BatchLookupJoin { type: Inner, predicate: mv.v = mv.v, output: [mv.v, mv.v] } - └─BatchScan { table: mv, columns: [mv.v], distribution: Single } + └─BatchLookupJoin { type: Inner, predicate: mv.v = mv.v, output: [mv.v, mv.v] } + └─BatchExchange { order: [], dist: Single } + └─BatchScan { table: mv, columns: [mv.v], distribution: SomeShard } diff --git a/src/frontend/planner_test/tests/testdata/subquery.yaml b/src/frontend/planner_test/tests/testdata/subquery.yaml index 6318d8332956..6551564c73e0 100644 --- a/src/frontend/planner_test/tests/testdata/subquery.yaml +++ b/src/frontend/planner_test/tests/testdata/subquery.yaml @@ -351,5 +351,6 @@ select * from auction AS hop_1 where EXISTS (select hop_1.date_time from auction group by hop_1.date_time ); stream_error: |- - Feature is not yet implemented: stream nested-loop join - No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml + Not supported: streaming nested-loop join + HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible. + See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md diff --git a/src/frontend/src/optimizer/mod.rs b/src/frontend/src/optimizer/mod.rs index d02a0dab73a4..2658fb0c2799 100644 --- a/src/frontend/src/optimizer/mod.rs +++ b/src/frontend/src/optimizer/mod.rs @@ -41,8 +41,7 @@ use self::plan_node::{ #[cfg(debug_assertions)] use self::plan_visitor::InputRefValidator; use self::plan_visitor::{ - has_batch_exchange, has_batch_seq_scan, has_batch_seq_scan_where, has_batch_source, - has_logical_apply, has_logical_over_agg, HasMaxOneRowApply, + has_batch_exchange, has_logical_apply, has_logical_over_agg, HasMaxOneRowApply, }; use self::property::RequiredDist; use self::rule::*; @@ -50,7 +49,7 @@ use crate::catalog::column_catalog::ColumnCatalog; use crate::catalog::table_catalog::{TableType, TableVersion}; use crate::handler::create_table::DmlFlag; use crate::optimizer::plan_node::{ - BatchExchange, ColumnPruningContext, PlanNodeType, PredicatePushdownContext, + BatchExchange, ColumnPruningContext, PlanNodeType, PlanTreeNode, PredicatePushdownContext, }; use crate::optimizer::property::Distribution; use crate::utils::Condition; @@ -455,12 +454,25 @@ impl PlanRoot { /// As we always run the root stage locally, we should ensure that singleton table scan is not /// the root stage. Returns `true` if we must insert an additional exchange to ensure this. fn require_additional_exchange_on_root(plan: PlanRef) -> bool { - assert_eq!(plan.distribution(), &Distribution::Single); + fn is_candidate_table_scan(plan: &PlanRef) -> bool { + if let Some(node) = plan.as_batch_seq_scan() + && !node.logical().is_sys_table() { + true + } else { + plan.node_type() == PlanNodeType::BatchSource + } + } - !has_batch_exchange(plan.clone()) // there's no (single) exchange - && ((has_batch_seq_scan(plan.clone()) // but there's a seq scan (which must be single) - && !has_batch_seq_scan_where(plan.clone(), |s| s.logical().is_sys_table())) // and it's not a system table - || has_batch_source(plan)) // or there's a source + fn no_exchange_before_table_scan(plan: PlanRef) -> bool { + if plan.node_type() == PlanNodeType::BatchExchange { + return false; + } + is_candidate_table_scan(&plan) + || plan.inputs().into_iter().any(no_exchange_before_table_scan) + } + + assert_eq!(plan.distribution(), &Distribution::Single); + no_exchange_before_table_scan(plan) // TODO: join between a normal table and a system table is not supported yet } diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs index 4dbcd07161fd..f0aa09edf7c8 100644 --- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -246,6 +246,19 @@ impl ToBatchProst for BatchSeqScan { impl ToLocalBatch for BatchSeqScan { fn to_local(&self) -> Result { - Ok(self.clone_with_dist().into()) + let dist = + if self.logical.is_sys_table() { + Distribution::Single + } else if let Some(distribution_key) = self.logical.distribution_key() + && !distribution_key.is_empty() { + Distribution::UpstreamHashShard( + distribution_key, + self.logical.table_desc().table_id, + ) + } else { + // NOTE(kwannoel): This is a hack to force an exchange to always be inserted before scan. + Distribution::SomeShard + }; + Ok(Self::new_inner(self.logical.clone(), dist, self.scan_ranges.clone()).into()) } }