
feat: new table stream graph #12240

Merged (25 commits) on Nov 15, 2023

Conversation

@shanicky (Contributor) commented on Sep 12, 2023:

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

WIP: some details require further discussion

with external source

no user defined primary key

dev=> explain create table t (v1 int)  with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
                                                         QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: Overwrite }
 └─StreamRowIdGen { row_id_index: 1 }
   └─StreamUnion { all: true }
     ├─StreamExchange [no_shuffle] { dist: SomeShard }
     │ └─StreamSource { source: t, columns: [v1, _row_id] }
     └─StreamExchange { dist: HashShard(_row_id) }
       └─StreamDml { columns: [v1, _row_id] }
         └─StreamSource
(8 rows)

append only

dev=> explain create table t (v1 int) append only  with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
                                                        QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [v1, _row_id(hidden)], stream_key: [_row_id], pk_columns: [_row_id], pk_conflict: NoCheck }
 └─StreamRowIdGen { row_id_index: 1 }
   └─StreamUnion { all: true }
     ├─StreamExchange [no_shuffle] { dist: SomeShard }
     │ └─StreamSource { source: t, columns: [v1, _row_id] }
     └─StreamExchange [no_shuffle] { dist: SomeShard }
       └─StreamDml { columns: [v1, _row_id] }
         └─StreamSource
(8 rows)

user defined primary key

dev=> explain create table t (v1 int primary key) with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
                                           QUERY PLAN
-------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [v1], stream_key: [v1], pk_columns: [v1], pk_conflict: Overwrite }
 └─StreamUnion { all: true }
   ├─StreamExchange { dist: HashShard(v1) }
   │ └─StreamSource { source: t, columns: [v1] }
   └─StreamExchange { dist: HashShard(v1) }
     └─StreamDml { columns: [v1] }
       └─StreamSource
(7 rows)

without external source

These are the resulting stream graphs after removing the source fragment (even though it might seem strange).

no user defined primary key

(stream graph screenshot)

append only

(stream graph screenshot)

user defined primary key

(stream graph screenshot)

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

@shanicky changed the title from "[wip] feat: new table dag" to "[wip] feat: new table stream graph" on Sep 12, 2023
@shanicky marked this pull request as ready for review on September 19, 2023 08:09
Comment on lines +477 to +483
dml_node = match kind {
    PrimaryKeyKind::UserDefinedPrimaryKey | PrimaryKeyKind::RowIdAsPrimaryKey => {
        RequiredDist::hash_shard(pk_column_indices)
            .enforce_if_not_satisfies(dml_node, &Order::any())?
    }
    PrimaryKeyKind::AppendOnly => StreamExchange::new_no_shuffle(dml_node).into(),
};
@st1page (Contributor) commented on Sep 19, 2023:

The dist-key changes in the planner tests are because we now generate the exchange here, whereas previously it was added at the materialize node, with

let table_required_dist = {

@shanicky (Contributor, author) commented:

I used the following code to test its compatibility with the current main branch, and it seems to have no problems.

on main

dev=> create table t_no_pk (v1 int)  with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
CREATE_TABLE
dev=> create table t_ap (v1 int) append only  with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
NOTICE:  APPEND ONLY TABLE is currently an experimental feature.
CREATE_TABLE
dev=> create table t_pk (v1 int primary key) with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
CREATE_TABLE
dev=> create table tt_no_pk (v1 int);
CREATE_TABLE
dev=> create table tt_ap (v1 int) append only;
NOTICE:  APPEND ONLY TABLE is currently an experimental feature.
CREATE_TABLE
dev=> create table tt_pk (v1 int primary key);
CREATE_TABLE
dev=> insert into tt_no_pk values (1);
INSERT 0 1
dev=> insert into tt_ap values (1);
INSERT 0 1
dev=> insert into tt_pk values (1);
INSERT 0 1
dev=>

restart on the current branch

dev=> alter table tt_no_pk add column v2 int;
ALTER_TABLE
dev=> select * from tt_no_pk ;
 v1 | v2
----+----
  1 |
(1 row)

dev=> alter table tt_ap add column v2 int ;
ALTER_TABLE
dev=> select * from tt_ap ;
 v1 | v2
----+----
  1 |
(1 row)

dev=> alter table tt_pk add column v2 int ;
ALTER_TABLE
dev=> select * from tt_pk ;
 v1 | v2
----+----
  1 |
(1 row)

dev=> alter table t_no_pk add column v2 int;
ALTER_TABLE
dev=> select * from t_no_pk ;
dev=> alter table t_ap add column v2 int;
ALTER_TABLE
dev=> select * from t_no_pk ;
dev=> select * from t_ap;
dev=> alter table t_pk add column v2 int;
ALTER_TABLE

@shanicky shanicky changed the title [wip] feat: new table stream graph feat: new table stream graph Sep 19, 2023
The codecov bot commented on Sep 19, 2023:

Codecov Report

Attention: 13 lines in your changes are missing coverage. Please review.

Comparison is base (cde91a0) 67.86% compared to head (9a89780) 67.87%.
Report is 14 commits behind head on main.

Files                                                   Patch %   Lines
src/meta/service/src/ddl_service.rs                      0.00%    5 Missing ⚠️
src/frontend/src/optimizer/mod.rs                       97.45%    3 Missing ⚠️
...ntend/src/optimizer/plan_node/stream_row_id_gen.rs   66.66%    3 Missing ⚠️
src/common/src/hash/consistent_hash/vnode.rs            66.66%    2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main   #12240      +/-   ##
==========================================
+ Coverage   67.86%   67.87%   +0.01%     
==========================================
  Files        1526     1526              
  Lines      260047   260156     +109     
==========================================
+ Hits       176468   176588     +120     
+ Misses      83579    83568      -11     
Flag   Coverage Δ
rust   67.87% <91.44%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown.


@BugenZhao (Member) commented:

Please ping me once this PR is ready for review. :)

@shanicky (Contributor, author) replied:

> Please ping me once this PR is ready for review. :)

This appears to be ready for review now 🥵


StreamUnion::new(Union {
    all: true,
    inputs: vec![external_source_node, dml_node],
Contributor (author):

I’m not sure which id needs to be specified as the source here. In the case of ‘with connector’, we have two sources.

@BugenZhao (Member) left a review comment:

The plans look great!

@@ -119,9 +119,10 @@ impl VirtualNode {
        if let Ok(idx) = keys.iter().exactly_one()
            && let ArrayImpl::Serial(serial_array) = &**data_chunk.column_at(*idx)
        {
            let fallback_row_id = Serial::from(rand::random::<i64>());
Member:

Why do we need this? And also for compute_row?

@shanicky (Contributor, author) replied on Sep 20, 2023:

(stream graph screenshot)

In the scenario where there is no user-defined primary key, the user's insert operation enters through the DML node, undergoes a shuffle before entering the union, and only then reaches the row id generator node. Therefore, at the time of the shuffle, the row id column is still empty. Previously this was handled with unwrap_or_default, but that shuffled all of the traffic to vnode 0, resulting in severe traffic skew. We've therefore adopted the random method to spread it out a bit: specifically, each chunk randomizes one vnode.

Indeed, we need to add support for compute_row; I'm going to make some modifications. On second thought, it might not be necessary. 🤔
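For illustration, here is a minimal, self-contained sketch of the per-chunk random fallback being discussed (the function name and types are assumptions made for the example; only the random fallback row id mirrors the diff above):

// Sketch only, not RisingWave's actual code: one random fallback id is drawn per
// chunk, so rows that do not have a row id yet all map to a single randomly chosen
// vnode, instead of always degenerating to vnode 0 as with unwrap_or_default.
fn chunk_vnodes(row_ids: &[Option<i64>], vnode_count: u64) -> Vec<u64> {
    // Requires the `rand` crate; mirrors `Serial::from(rand::random::<i64>())` above.
    let fallback_row_id: i64 = rand::random();
    row_ids
        .iter()
        .map(|id| (id.unwrap_or(fallback_row_id) as u64) % vnode_count)
        .collect()
}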

Member:

This looks too hacky to me. 🥵 It could be surprising that this is not a pure function. Since the motivation here is to avoid data skew, I guess using the hash code of the row is much better, as it at least guarantees stability.

Can we add another RowIdGen here as well? We just need to ensure that there's no overlapping with the latter one, which can be done by allocating a flag of one bit for this information I guess.

Contributor (author):

Makes sense, let’s use a hash of the entire row as a fallback when there’s no row id. 👍
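A rough, self-contained illustration of that alternative (standard-library hashing is used purely for the example; the PR itself hashes with Crc32FastBuilder, as seen in a later snippet):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Sketch only: when a row has no row id yet, derive its vnode from a hash of the
// whole row, so the same data always maps to the same vnode (a pure function),
// unlike the random per-chunk fallback above.
fn row_vnode_fallback(row: &[Option<i64>], vnode_count: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    row.hash(&mut hasher);
    hasher.finish() % vnode_count
}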

Contributor:

> Can we add another RowIdGen here as well? We just need to ensure that there's no overlapping with the latter one, which can be done by allocating a flag of one bit for this information I guess.

A little strange to me, because we cannot know which data source will give more records.

@@ -37,7 +37,6 @@ impl StreamUnion {
    pub fn new(logical: generic::Union<PlanRef>) -> Self {
        let inputs = &logical.inputs;
        let dist = inputs[0].distribution().clone();
        assert!(inputs.iter().all(|input| *input.distribution() == dist));
Member:

Any violation of this assertion? 👀

Contributor (author):

(stream graph screenshot)

The distribution here may be different 🤔

Member:

I'm wondering what the distribution will be in this case. 🤔 SomeShard?

@shanicky (Contributor, author) replied on Nov 8, 2023:

Yeah, it's okay here; I'll add the assert back in.
Actually, it doesn't seem to work. When executing the following statement, one side is SomeShard and the other side is HashShard.

CREATE TABLE msrc (v INT) WITH (
    connector = 'datagen',
    fields.v.kind = 'sequence',
    fields.v.start = '1',
    fields.v.end  = '10',
    datagen.rows.per.second='15',
    datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON;
input StreamExchange [no_shuffle] { dist: SomeShard } -> [SomeShard]
input StreamExchange { dist: HashShard(_row_id) } -> [1]

Contributor (author):

SomeShard and HashShard should be compatible here, resolving to SomeShard. I've handled it, and the assert will still be performed for typical unions.
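A toy illustration of that compatibility rule (the enum and function names are invented for the example; RisingWave's actual Distribution type is richer, and ordinary unions keep the strict assert):

#[derive(Clone, PartialEq, Debug)]
enum Dist {
    SomeShard,
    HashShard(Vec<usize>),
}

// For the table's source/DML union, mismatched input distributions such as
// SomeShard vs. HashShard(_row_id) degrade to SomeShard instead of panicking.
fn union_dist(inputs: &[Dist]) -> Dist {
    if inputs.windows(2).all(|pair| pair[0] == pair[1]) {
        inputs[0].clone()
    } else {
        Dist::SomeShard
    }
}

fn main() {
    let inputs = [Dist::SomeShard, Dist::HashShard(vec![1])];
    assert_eq!(union_dist(&inputs), Dist::SomeShard);
}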

@BugenZhao (Member) commented:

I guess it's time to utilize the BarrierRecv executor! 😄

#8595

@shanicky force-pushed the peng/new-table-dag branch 2 times, most recently from 44e3ba4 to 692035c on October 23, 2023 06:56
@shanicky force-pushed the peng/new-table-dag branch 2 times, most recently from 2e994db to d965f44 on November 7, 2023 05:39
@stdrc (Member) left a review comment:

Could you please elaborate more on the background and motivation? It may also be better to have a more descriptive PR title.

@shanicky force-pushed the peng/new-table-dag branch 3 times, most recently from 531c202 to 2911174 on November 13, 2023 07:04
Comment on lines 147 to 149
if project.is_empty() {
    return project.row().hash(Crc32FastBuilder).into();
}
Contributor:

Passing the full indices here would be better 🤔
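A small, self-contained sketch of what that suggestion could mean (the function and names are hypothetical; standard-library hashing stands in for Crc32FastBuilder):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Instead of special-casing an empty projection inside the hashing routine, the
// caller passes the full set of column indices, so "hash the whole row" goes
// through the same code path as any other projection.
fn hash_projected(row: &[i64], indices: &[usize]) -> u64 {
    let mut hasher = DefaultHasher::new();
    for &i in indices {
        row[i].hash(&mut hasher);
    }
    hasher.finish()
}

fn main() {
    let row = [42i64, 7, 9];
    let full_indices: Vec<usize> = (0..row.len()).collect();
    println!("{}", hash_projected(&row, &full_indices));
}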

}

impl StreamRowIdGen {
    pub fn new(input: PlanRef, row_id_index: usize) -> Self {
        let distribution = if input.append_only() {
Contributor:

I guess we can't do the check in the new function; instead, the flag needs to be passed in. DmlExecutor is not append-only.

…amSource, StreamDml, StreamCdcTableScan & StreamWatermarkFilter
@st1page requested a review from BugenZhao on November 14, 2023 09:14
@shanicky (Contributor, author) commented:

For chunks where the row id is none, hashing each row individually can break up the chunk and create a time difference in the calculation of data within the same chunk, leading to unexpected results (like ‘distinct on’ without ‘order by’). For most of our end-to-end tests, this might produce random results. Some potential solutions we can consider are:

  1. Creating a random vnode for chunks where the row id is none and transferring it.
  2. Inserting a row id generator at the entry point of each union and then centrally regenerating it after the union.
  3. Adding ‘order by’ and ‘row sort’ to all necessary end-to-end tests. 🥵

@BugenZhao (Member) replied:

> For chunks where the row id is none, hashing each row individually can break up the chunk

That's true. 🥵 But why does the random vnode approach not encounter this problem? Did we use the same vnode for all rows in a single chunk?

@BugenZhao (Member) left a review comment:

Generally LGTM!

@BugenZhao (Member) commented:

> I guess it's time to utilize the BarrierRecv executor! 😄

> #8595

BTW, I realize that BarrierRecv can be a backend-only executor. Every executor that does not have an input should be attached with a BarrierRecv as its input. So it seems not to be a breaking change.

@shanicky (Contributor, author) replied:

> For chunks where the row id is none, hashing each row individually can break up the chunk

> That's true. 🥵 But why does the random vnode approach not encounter this problem? Did we use the same vnode for all rows in a single chunk?

Oh, I think the reason could be that many of our tests run on freshly created tables, which haven't been scaled or undergone vnode redistribution. According to our current implementation, when the parallelism of the entire graph is the same, the vnode allocation is also sequential, like (1,2,3), (4,5,6), etc. In the previous implementation, after the DML came in, each row id generator would assign row ids according to its own allocated vnodes. Hence, the row ids assigned to an incoming data chunk were also (1,2,3,1,2,3,1,2,3), and after being passed through the serial type, the rows were distributed to the downstream (1,2,3) operator. This kept these rows together.

But if we change it to calculate the hash based on the entire row, it would lead to a very scattered distribution of hashes, so they would be randomly assigned to the subsequent operators.

The data in our e2e tests is quite limited, so we didn't encounter this problem before. 🥵
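A toy model of the coincidence described above (all numbers and names are illustrative, not RisingWave's real vnode mapping):

// Each parallel unit owns a contiguous block of vnodes, and the old row id
// generator cycled through its own vnodes, so every row of a DML chunk was routed
// to the same, predictable downstream unit and kept its relative order. Hashing
// whole rows instead scatters the chunk across all units.
fn owner(vnode: usize, vnodes_per_unit: usize) -> usize {
    vnode / vnodes_per_unit
}

fn main() {
    let vnodes_per_unit = 3; // unit 0 owns {0,1,2}, unit 1 owns {3,4,5}, ...
    // Row ids produced by the generator owning vnodes {0,1,2}: their vnode part cycles.
    let row_id_vnodes = [0, 1, 2, 0, 1, 2, 0, 1, 2];
    let routed: Vec<usize> = row_id_vnodes.iter().map(|&v| owner(v, vnodes_per_unit)).collect();
    println!("{routed:?}"); // every row of the chunk lands on unit 0, in order
}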

@BugenZhao (Member) replied:

> The data in our e2e tests is quite limited, so we didn't encounter this problem before. 🥵

That's interesting. If I understand you correctly, records fewer than 256/#pu were kept in order. Such a coincidence!

Signed-off-by: Shanicky Chen <peng@risingwave-labs.com>
@shanicky (Contributor, author) commented:

retest this pr's compatibility with the current main branch

run on the main branch

dev=> create table t_no_pk (v1 int)  with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
CREATE_TABLE
dev=> create table t_ap (v1 int) append only  with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
NOTICE:  APPEND ONLY TABLE is currently an experimental feature.
CREATE_TABLE
dev=> create table t_pk (v1 int primary key) with (
    connector = 'datagen',
    fields.v1.kind = 'sequence',
    fields.v1.start = '1',
    fields.v1.end = '1000',
    datagen.split.num = '5',
) FORMAT PLAIN ENCODE JSON ;
CREATE_TABLE
dev=> create table tt_no_pk (v1 int);
CREATE_TABLE
dev=> create table tt_ap (v1 int) append only;
NOTICE:  APPEND ONLY TABLE is currently an experimental feature.
CREATE_TABLE
dev=> create table tt_pk (v1 int primary key);
CREATE_TABLE
dev=> insert into tt_no_pk select * from generate_series(0, 9999, 1);
INSERT 0 10000
dev=> select count(*) from tt_no_pk;
 count
-------
 10000
(1 row)

dev=> insert into tt_ap select * from generate_series(0, 9999, 1);
INSERT 0 10000
dev=> insert into tt_pk select * from generate_series(0, 9999, 1);
INSERT 0 10000

restart on the current branch

dev=> select count(*) from t_no_pk;
 count
-------
  1000
(1 row)

dev=> alter table t_no_pk add column v2 int;
ALTER_TABLE

dev=> select count(*) from t_no_pk;
 count
-------
  1000
(1 row)

dev=> select count(*) from t_ap;
 count
-------
  1000
(1 row)

dev=> alter table t_ap add column v2 int;
ALTER_TABLE
dev=> select count(*) from t_ap;
 count
-------
  1000
(1 row)

dev=> alter table t_pk add column v2 int;
ALTER_TABLE
dev=> select count(*) from t_pk;
 count
-------
  1000
(1 row)

dev=> alter table tt_pk add column v2 int;
ALTER_TABLE

dev=> select count(*) from tt_pk;
 count
-------
 10000
(1 row)

dev=> alter table tt_no_pk add column v2 int;
ALTER_TABLE
dev=> select count(*) from tt_no_pk;
 count
-------
 10000
(1 row)

dev=> alter table tt_ap add column v2 int;
ALTER_TABLE
dev=> select count(*) from tt_ap;
 count
-------
 10000
(1 row)

dev=> insert into t_ap values (100000000);
INSERT 0 1
dev=> select count(*) from t_ap;
 count
-------
  1001
(1 row)

@shanicky added this pull request to the merge queue on Nov 15, 2023
Merged via the queue into main with commit 48bf62e on Nov 15, 2023
27 checks passed
@shanicky deleted the peng/new-table-dag branch on November 15, 2023 08:37
@BugenZhao (Member) commented:

> retest this pr's compatibility with the current main branch

Cool! Is it possible to add it to CI compatibility test? cc @kwannoel

@kwannoel (Contributor) replied:

> retest this pr's compatibility with the current main branch

> Cool! Is it possible to add it to CI compatibility test? cc @kwannoel

#13403 will require testing against the 4 latest versions. That will be a prerequisite for testing this PR.

What additional things need to be tested? I saw some ALTER etc... @shanicky you may add these to the backwards compat test.

If it's just normal DML, DQL, and DDL, the backwards compat test already uses nexmark and tpch queries for testing.
