
fix(batch): ensure BatchSeqScan runs on compute node #7240

Merged: 26 commits, Jan 10, 2023
28 changes: 28 additions & 0 deletions e2e_test/batch/join/issue_7115.slt
@@ -0,0 +1,28 @@
# https://github.com/risingwavelabs/risingwave/issues/7115

statement ok
create table t (id int);

statement ok
create materialized view v as select count(*) cnt from t;

statement ok
SET QUERY_MODE TO local;
Contributor:
Just rename the suffix of this file to .slt.part, and it will run in both local mode and distributed mode.

Contributor Author (@kwannoel, Jan 10, 2023):

Haven't fixed distributed mode yet, unfortunately; that's why this is used as a workaround for now. Only local mode is fixed.

Contributor Author:

Oh, I just got what you mean after thinking about it. I'll change it.

Contributor:

Please add some data to verify it.


query I
select * from v join (select count(*) from t) T;
----

# FIXME(Noel): Distributed mode not debugged yet.
# statement ok
# SET query_mode TO distributed;
#
# query I
# select * from v join (select count(*) from t) T;
# ----

statement ok
DROP MATERIALIZED VIEW v;

statement ok
DROP TABLE t;
14 changes: 7 additions & 7 deletions src/frontend/planner_test/tests/testdata/batch_dist_agg.yaml
@@ -15,9 +15,9 @@
      └─BatchSimpleAgg { aggs: [max(s.v)] }
        └─BatchScan { table: s, columns: [s.v], distribution: Single }
  batch_local_plan: |
-   BatchExchange { order: [], dist: Single }
-   └─BatchSimpleAgg { aggs: [max(s.v)] }
-     └─BatchScan { table: s, columns: [s.v], distribution: Single }
+   BatchSimpleAgg { aggs: [max(s.v)] }
+   └─BatchExchange { order: [], dist: Single }
+     └─BatchScan { table: s, columns: [s.v], distribution: SomeShard }
- id: extreme_on_T
  before:
  - create_tables
@@ -162,7 +162,7 @@
      └─BatchExchange { order: [], dist: HashShard(s.k) }
        └─BatchScan { table: s, columns: [s.k, s.v], distribution: Single }
  batch_local_plan: |
-   BatchExchange { order: [], dist: Single }
-   └─BatchProject { exprs: [max(s.v)] }
-     └─BatchHashAgg { group_key: [s.k], aggs: [max(s.v)] }
-       └─BatchScan { table: s, columns: [s.k, s.v], distribution: Single }
+   BatchProject { exprs: [max(s.v)] }
+   └─BatchHashAgg { group_key: [s.k], aggs: [max(s.v)] }
+     └─BatchExchange { order: [], dist: Single }
+       └─BatchScan { table: s, columns: [s.k, s.v], distribution: SomeShard }
15 changes: 9 additions & 6 deletions src/frontend/planner_test/tests/testdata/dynamic_filter.yaml
@@ -39,8 +39,9 @@
      └─LogicalProject { exprs: [t2.v2, t2.v2] }
        └─LogicalScan { table: t2, columns: [t2.v2] }
  stream_error: |-
-   Feature is not yet implemented: stream nested-loop join
-   No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
+   Not supported: streaming nested-loop join
+   HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible.
+   See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md
- name: |
    Output indices of Dynamic Filter
    TODO: currently implemented by adding a Project, https://github.com/risingwavelabs/risingwave/issues/3419
@@ -80,8 +81,9 @@
      └─LogicalProject { exprs: [t2.v2, t2.v2] }
        └─LogicalScan { table: t2, columns: [t2.v2] }
  stream_error: |-
-   Feature is not yet implemented: stream nested-loop join
-   No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
+   Not supported: streaming nested-loop join
+   HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible.
+   See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md
- name: Ensure error on output columns from inner
before:
- create_tables
@@ -93,8 +95,9 @@
      └─LogicalAgg { aggs: [max(t2.v2)] }
        └─LogicalScan { table: t2, columns: [t2.v2] }
  stream_error: |-
-   Feature is not yet implemented: stream nested-loop join
-   No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
+   Not supported: streaming nested-loop join
+   HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible.
+   See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md
- name: Use Inner Join for equi condition
before:
- create_tables
13 changes: 13 additions & 0 deletions src/frontend/planner_test/tests/testdata/local_execution_mode.yaml
@@ -0,0 +1,13 @@
# This file is automatically generated. See `src/frontend/planner_test/README.md` for more information.
- id: create_tables
sql: |
create table t (id int);
create materialized view v as select count(*) cnt from t;
select * from v join (select count(*) from t) T;
batch_local_plan: |
BatchNestedLoopJoin { type: Inner, predicate: true, output: all }
├─BatchExchange { order: [], dist: Single }
| └─BatchScan { table: v, columns: [v.cnt], distribution: SomeShard }
└─BatchSimpleAgg { aggs: [count] }
└─BatchExchange { order: [], dist: Single }
└─BatchScan { table: t, columns: [], distribution: SomeShard }
16 changes: 8 additions & 8 deletions src/frontend/planner_test/tests/testdata/singleton.yaml
@@ -13,7 +13,7 @@
      └─BatchScan { table: mv, columns: [mv.v], distribution: Single }
  batch_local_plan: |
    BatchExchange { order: [], dist: Single }
-   └─BatchScan { table: mv, columns: [mv.v], distribution: Single }
+   └─BatchScan { table: mv, columns: [mv.v], distribution: SomeShard }
- id: select_from_singleton_mv_join
before:
- create_singleton_mv
@@ -25,9 +25,9 @@
      └─BatchExchange { order: [], dist: UpstreamHashShard(mv.v) }
        └─BatchScan { table: mv, columns: [mv.v], distribution: Single }
  batch_local_plan: |
-   BatchExchange { order: [], dist: Single }
-   └─BatchLookupJoin { type: Inner, predicate: mv.v = mv.v, output: [mv.v] }
-     └─BatchScan { table: mv, columns: [mv.v], distribution: Single }
+   BatchLookupJoin { type: Inner, predicate: mv.v = mv.v, output: [mv.v] }
+   └─BatchExchange { order: [], dist: Single }
+     └─BatchScan { table: mv, columns: [mv.v], distribution: SomeShard }
- id: select_from_singleton_mv_join_top_n
before:
- create_singleton_mv
@@ -43,8 +43,8 @@
      └─BatchScan { table: mv, columns: [mv.v], distribution: Single }
  batch_local_plan: |
    BatchProject { exprs: [mv.v] }
-   └─BatchExchange { order: [mv.v ASC], dist: Single }
-     └─BatchTopN { order: "[mv.v ASC]", limit: 10, offset: 0 }
+   └─BatchTopN { order: "[mv.v ASC]", limit: 10, offset: 0 }
      └─BatchTopN { order: "[mv.v ASC]", limit: 10, offset: 0 }
-       └─BatchLookupJoin { type: Inner, predicate: mv.v = mv.v, output: [mv.v, mv.v] }
-         └─BatchScan { table: mv, columns: [mv.v], distribution: Single }
+     └─BatchLookupJoin { type: Inner, predicate: mv.v = mv.v, output: [mv.v, mv.v] }
+       └─BatchExchange { order: [], dist: Single }
+         └─BatchScan { table: mv, columns: [mv.v], distribution: SomeShard }
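Every local plan in this file changes in the same way: the Exchange at the root is removed and a new Exchange is inserted directly above the scan, so the scan runs on a compute node instead of the frontend. The following toy sketch illustrates that reshuffle on a simplified plan tree; the `Plan` type and `push_exchange_to_scan` function are illustrative inventions, not RisingWave's actual `PlanRef` API.

```rust
// Toy plan tree: an Exchange, a chain of unary operators, or a leaf Scan.
#[derive(Debug, PartialEq)]
enum Plan {
    Exchange(Box<Plan>),
    Op(String, Box<Plan>), // any unary operator: Agg, Project, TopN, ...
    Scan(String),          // leaf scan of the named table
}

// Drop any existing Exchange and insert a fresh one directly above the Scan.
fn push_exchange_to_scan(plan: Plan) -> Plan {
    match plan {
        Plan::Scan(table) => Plan::Exchange(Box::new(Plan::Scan(table))),
        Plan::Op(name, child) => Plan::Op(name, Box::new(push_exchange_to_scan(*child))),
        Plan::Exchange(child) => push_exchange_to_scan(*child),
    }
}
```

Applied to the simple-agg case, `Exchange(SimpleAgg(Scan))` becomes `SimpleAgg(Exchange(Scan))`, matching the before/after plans in the hunks above.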
5 changes: 3 additions & 2 deletions src/frontend/planner_test/tests/testdata/subquery.yaml
@@ -351,5 +351,6 @@
    select * from auction AS hop_1
    where EXISTS (select hop_1.date_time from auction group by hop_1.date_time );
  stream_error: |-
-   Feature is not yet implemented: stream nested-loop join
-   No tracking issue yet. Feel free to submit a feature request at https://github.com/risingwavelabs/risingwave/issues/new?labels=type%2Ffeature&template=feature_request.yml
+   Not supported: streaming nested-loop join
+   HINT: The non-equal join in the query requires a nested-loop join executor, which could be very expensive to run. Consider rewriting the query to use dynamic filter as a substitute if possible.
+   See also: https://github.com/risingwavelabs/rfcs/blob/main/rfcs/0033-dynamic-filter.md
15 changes: 14 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
@@ -246,6 +246,19 @@ impl ToBatchProst for BatchSeqScan {

impl ToLocalBatch for BatchSeqScan {
    fn to_local(&self) -> Result<PlanRef> {
-       Ok(self.clone_with_dist().into())
+       let dist =
+           if self.logical.is_sys_table() {
+               Distribution::Single
+           } else if let Some(distribution_key) = self.logical.distribution_key()
+               && !distribution_key.is_empty() {
+               Distribution::UpstreamHashShard(
+                   distribution_key,
+                   self.logical.table_desc().table_id,
+               )
+           } else {
+               // NOTE(kwannoel): This is a hack to force an exchange to always be inserted before scan.
+               Distribution::SomeShard
+           };
+       Ok(Self::new_inner(self.logical.clone(), dist, self.scan_ranges.clone()).into())
    }
}
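The branching in `to_local` above can be isolated into a small, self-contained sketch. `TableInfo` and its fields are hypothetical stand-ins for what `self.logical` provides in the real planner; only the decision order mirrors the actual code. Reporting `SomeShard` (rather than `Single`) in the fallback branch is what forces the optimizer to insert an Exchange above the scan, which is how the scan ends up on a compute node.

```rust
// Simplified distribution enum mirroring the three cases used in to_local.
#[derive(Debug, PartialEq)]
enum Distribution {
    Single,
    UpstreamHashShard(Vec<usize>, u32), // (distribution key columns, table id)
    SomeShard,
}

// Hypothetical stand-in for the information carried by `self.logical`.
struct TableInfo {
    is_sys_table: bool,
    distribution_key: Option<Vec<usize>>,
    table_id: u32,
}

fn local_scan_distribution(t: &TableInfo) -> Distribution {
    if t.is_sys_table {
        // System tables are read on the frontend node; a singleton scan suffices.
        Distribution::Single
    } else if let Some(key) = t.distribution_key.clone().filter(|k| !k.is_empty()) {
        // A non-empty distribution key preserves the upstream sharding information.
        Distribution::UpstreamHashShard(key, t.table_id)
    } else {
        // No usable key: report SomeShard so an Exchange is always inserted
        // before the scan, forcing the scan onto a compute node.
        Distribution::SomeShard
    }
}
```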