Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(batch): ensure BatchSeqScan runs on compute node #7240

Merged
merged 26 commits into from
Jan 10, 2023
Merged

Conversation

kwannoel
Copy link
Contributor

@kwannoel kwannoel commented Jan 6, 2023

I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.

What's changed and what's your intention?

Batch Scan should run on compute node. This means all table scan should have exchange before, so they won't be in root node.

Local Execution Mode: Change Distribution to SomeShard, such that an Exchange will be inserted when enforcing the distribution.
Distributed Execution Mode: Ensure all table scans have exchange before, by fixing require_addiitonal_exchange_on_root.

Documentation

If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.

Types of user-facing changes

Please keep the types that apply to your changes, and remove those that do not apply.

  • Installation and deployment
  • Connector (sources & sinks)
  • SQL commands, functions, and operators
  • RisingWave cluster configuration changes
  • Other (please specify in the release note below)

Release note

Please create a release note for your changes. In the release note, focus on the impact on users, and mention the environment or conditions where the impact may occur.

Refer to a related PR or issue link (optional)

Closes #7115

@github-actions github-actions bot added the type/fix Bug fix label Jan 6, 2023
@kwannoel kwannoel changed the title fix(batch): insert exchange to ensure seq scan runs on compute node fix(batch): insert Exchange so BatchSeqScan runs on compute node Jan 6, 2023
@kwannoel kwannoel marked this pull request as ready for review January 9, 2023 07:00
Comment on lines 510 to 519
// We remark that since the `to_local_with_order_required` does not enforce single
// distribution, we enforce at the root if needed.
let insert_exchange = match plan.distribution() {
Distribution::Single => Self::require_additional_exchange_on_root(plan.clone()),
_ => true,
};
if insert_exchange {
plan =
BatchExchange::new(plan, self.required_order.clone(), Distribution::Single).into()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic seems to be used to handle dml. I think we should keep it.

Comment on lines 570 to 572
// Ensure there is exchange before all seq scan.
plan = Self::enforce_exchange_above_table_scan(plan, &self.required_order);
Copy link
Contributor

@chenzl25 chenzl25 Jan 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, the enforcement here is trying to fix some unexpected plans at the end, while I think it could be more proper to handle it at the to_local() method of the BatchSeqScan and BatchSource. We can try to provide SomeShard for the BatchSeqScan in local mode, so that we can always enforce an exchange on the top of table scan and by the way we can utilize the optimization such as push filter/ project through the exchange.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, the enforcement here is trying to fix some unexpected plans at the end, while I think it could be more proper to handle it at the to_local() method of the BatchSeqScan and BatchSource. >

Good suggestion, I think that makes more sense.

We can try to provide SomeShard for the BatchSeqScan in local mode, so that we can always enforce an exchange on the top of table scan

Hmm don't quite understand this. Is SomeShard required to enforce an exchange on top of table scan? Why is that so?

by the way we can utilize the optimization such as push filter/ project through the exchange.

Yes I think this is good suggestion. Thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For local batch plans, almost all operators will require their input with Singleton distribution, while some TableScan's distribution actually is Singleton. Our enforcement will think since the distribution is already satisfied, we can just skip placing an exchange operator here. But as we know, in local execution we need an exchange operator to keep table scan run in the CN. So I think we can hack the distribution for table scan when we call the to_local() method and return a new clone table scan with SomeShard distribution so that the enforcement of the exchange will always work because we require a Singleton distribution while the table can can only provide the SomeShard distribution now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for @chenzl25 's suggestion. The root cause of the issue in #7115 is that left table is a singleton, so we can't add exechange on top of table scan. I think in local mode we can always treats table scan as not single, so the overall process need not to change much.

@codecov
Copy link

codecov bot commented Jan 9, 2023

Codecov Report

Merging #7240 (9636785) into main (03bef46) will increase coverage by 0.02%.
The diff coverage is 96.55%.

@@            Coverage Diff             @@
##             main    #7240      +/-   ##
==========================================
+ Coverage   73.06%   73.08%   +0.02%     
==========================================
  Files        1067     1067              
  Lines      170734   170760      +26     
==========================================
+ Hits       124750   124806      +56     
+ Misses      45984    45954      -30     
Flag Coverage Δ
rust 73.08% <96.55%> (+0.02%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
...frontend/src/optimizer/plan_node/batch_seq_scan.rs 94.85% <90.90%> (-0.30%) ⬇️
src/frontend/src/optimizer/mod.rs 96.61% <100.00%> (+0.10%) ⬆️
src/common/src/cache.rs 97.31% <0.00%> (-0.23%) ⬇️
src/stream/src/executor/aggregation/minput.rs 96.49% <0.00%> (+0.10%) ⬆️
src/storage/src/hummock/sstable_store.rs 64.91% <0.00%> (+0.18%) ⬆️
src/common/src/types/ordered_float.rs 31.25% <0.00%> (+0.19%) ⬆️
src/object_store/src/object/mod.rs 51.51% <0.00%> (+0.21%) ⬆️
src/meta/src/manager/cluster.rs 77.11% <0.00%> (+0.24%) ⬆️
src/meta/src/hummock/mock_hummock_meta_client.rs 66.31% <0.00%> (+0.52%) ⬆️
... and 3 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Contributor

@liurenjie1024 liurenjie1024 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally LGTM, just need to fix the tests.

create materialized view v as select count(*) cnt from t;

statement ok
SET QUERY_MODE TO local;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just rename the suffix of this file to .slt.part, and it will run in both local mode and distributed mode.

Copy link
Contributor Author

@kwannoel kwannoel Jan 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't fixed for distributed_mode yet unfortunately. That's why use this as workaround for now. Only local_mode is fixed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh just got what you mean after thinking about it. Shall change it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add some data to verify it.

Copy link
Contributor

@chenzl25 chenzl25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!!

@kwannoel kwannoel changed the title fix(batch): insert Exchange so BatchSeqScan runs on compute node fix(batch): ensure BatchSeqScan runs on compute node Jan 10, 2023
@mergify mergify bot merged commit 4c87356 into main Jan 10, 2023
@mergify mergify bot deleted the kwannoel/batch branch January 10, 2023 10:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/fix Bug fix
Projects
None yet
Development

Successfully merging this pull request may close these issues.

bug(batch): Batch scheduler panics if one side of the join has singleton exchange and none for the other side.
4 participants