bug: multiple reads in source's internal table #5590

Closed
tabVersion opened this issue Sep 27, 2022 · 11 comments · Fixed by #6081
tabVersion (Contributor) commented Sep 27, 2022

Describe the bug

The source writes one row to its state table, but select * from the internal table returns multiple identical rows.

This issue was introduced in #5433.

To Reproduce

  1. Start a cluster with Kafka (./risedev d full & ./scripts/source/prepare_ci_kafka.sh).
  2. Create a table with a Kafka source: create table s2 (v1 int, v2 varchar) with (connector = 'kafka', topic = 'kafka_2_partition_topic', properties.bootstrap.server = '127.0.0.1:29092', scan.startup.mode = 'earliest') row format json;
  3. Get the internal table name from the dashboard, e.g. __internal_s2_2_sourceinternaltable_1003.
  4. Run select * from __internal_s2_2_sourceinternaltable_1003;
dev=> select * from __internal_s2_2_sourceinternaltable_1003 ;
 partition_id |                                               offset
--------------+-----------------------------------------------------------------------------------------------------
 0            |                                                                                                    +
              | \x05kafka\x12U{"topic":"kafka_2_partition_topic","partition":0,"start_offset":3,"stop_offset":null}
 0            |                                                                                                    +
              | \x05kafka\x12U{"topic":"kafka_2_partition_topic","partition":0,"start_offset":3,"stop_offset":null}
 0            |                                                                                                    +
              | \x05kafka\x12U{"topic":"kafka_2_partition_topic","partition":0,"start_offset":3,"stop_offset":null}
 0            |                                                                                                    +
              | \x05kafka\x12U{"topic":"kafka_2_partition_topic","partition":0,"start_offset":3,"stop_offset":null}
(4 rows)
The commands used:
./risedev d full
./scripts/source/prepare_ci_kafka.sh
psql -h localhost -p 4566 -d dev -U root -c "create table s2 (v1 int, v2 varchar) with ( connector = 'kafka', topic = 'kafka_2_partition_topic', properties.bootstrap.server = '127.0.0.1:29092', scan.startup.mode = 'earliest') row format json;"

Expected behavior

1 row

Additional context

None

tabVersion added the type/bug label Sep 27, 2022
github-actions bot added this to the release-0.1.14 milestone Sep 27, 2022
st1page (Contributor) commented Sep 28, 2022

"parallelism": 4 in batch plan.
explain (distsql) select * from "__internal_s2_2_sourceinternaltable_1003";
 {
   "root_stage_id": 0,
   "stages": {
     "1": {
       "root": {
         "plan_node_id": 20,
         "plan_node_type": "BatchSeqScan",
         "schema": [
           {
             "dataType": {
               "typeName": "VARCHAR",
               "isNullable": true
             },
             "name": "__internal_s2_2_sourceinternaltable_1003.partition_id"
           },
           {
             "dataType": {
               "typeName": "VARCHAR",
               "isNullable": true
             },
             "name": "__internal_s2_2_sourceinternaltable_1003.offset"
           }
         ],
         "children": [],
         "source_stage_id": null
       },
       "parallelism": 4,
       "exchange_info": {
         "mode": "SINGLE"
       }
     },
     "0": {
       "root": {
         "plan_node_id": 21,
         "plan_node_type": "BatchExchange",
         "schema": [
           {
             "dataType": {
               "typeName": "VARCHAR",
               "isNullable": true
             },
             "name": "__internal_s2_2_sourceinternaltable_1003.partition_id"
           },
           {
             "dataType": {
               "typeName": "VARCHAR",
               "isNullable": true
             },
             "name": "__internal_s2_2_sourceinternaltable_1003.offset"
           }
         ],
         "children": [],
         "source_stage_id": 1
       },
       "parallelism": 1,
       "exchange_info": {
         "mode": "SINGLE"
       }
     }
   },
   "child_edges": {
     "1": [],
     "0": [
       1
     ]
   },
   "parent_edges": {
     "1": [
       0
     ],

BugenZhao (Member) commented
After #5907, we will panic with "The stage has single distribution, but contains a table scan node with multiple partitions." 😄

Actually, it's a little bit tricky to get the correct result. If we schedule multiple tasks, we'll get duplicated rows as there's no way to prune the distribution. If we schedule a single task, then it may behave strangely with #5850.
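To make the first case concrete, here is a minimal standalone sketch in plain Rust (not RisingWave code; the row contents are made up): every task runs an unpruned full scan of the state table and the single gather stage just concatenates their outputs, so the one stored row comes back once per task, matching the four identical rows above.

```rust
fn main() {
    // One row in the source's state table (contents are illustrative).
    let table = vec![("0", "kafka partition 0 offset state")];
    let parallelism: usize = 4;

    // Each scheduled task scans the whole table, because nothing prunes
    // the rows each task is responsible for.
    let gathered: Vec<_> = (0..parallelism)
        .flat_map(|_task| table.iter().cloned())
        .collect();

    // The SINGLE-mode gather stage just concatenates the task outputs:
    // 4 identical rows instead of 1.
    assert_eq!(gathered.len(), parallelism * table.len());
}
```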

tabVersion (Contributor, Author) commented
Reopening this issue because the case is still there.

Just checked on the latest main (25f6655)

[screenshot]

BugenZhao (Member) commented
There seems to be something wrong with the distribution in BatchSeqScan::to_local: we should make it a singleton for the source's internal tables. cc @kwannoel, would you please help take a look?
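A minimal sketch of that suggestion, under the assumption that the scan node can tell whether it targets a source's internal state table (the names and the Distribution enum below are illustrative, not the actual RisingWave types): report a singleton distribution for such tables so only one task is scheduled, and keep the shard-based distribution otherwise.

```rust
// Illustrative only; these names and types are not from the RisingWave codebase.
enum Distribution {
    Single,
    SomeShard,
}

fn scan_distribution_in_local_plan(is_source_internal_table: bool) -> Distribution {
    if is_source_internal_table {
        // One task scans the whole state table, so each row is read exactly once.
        Distribution::Single
    } else {
        // Regular tables keep a shard-based distribution so the scan can be
        // split across parallel tasks.
        Distribution::SomeShard
    }
}

fn main() {
    // A scan over a source's internal state table would be scheduled as a single task.
    assert!(matches!(
        scan_distribution_in_local_plan(true),
        Distribution::Single
    ));
}
```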

@kwannoel kwannoel self-assigned this Jan 30, 2023
kwannoel (Contributor) commented Jan 30, 2023

> There seems to be something wrong with the distribution in BatchSeqScan::to_local: we should make it a singleton for the source's internal tables. cc @kwannoel, would you please help take a look?

To provide some context, the distribution for BatchSeqScan on non-system tables is SomeShard after this PR: #7240

} else {
    // NOTE(kwannoel): This is a hack to force an exchange to always be inserted before scan.
    Distribution::SomeShard
};

By making the distribution SomeShard, an Exchange would be inserted.
This forces BatchSeqScan to be executed on the compute node instead of the frontend.
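A rough sketch of that mechanism with made-up names (not the real planner API): when a child's distribution does not satisfy what its parent requires, the planner inserts an Exchange between them, so a SomeShard scan under a Single root always ends up below an Exchange and therefore runs on the compute node.

```rust
// Illustrative only; not the actual RisingWave planner API.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Distribution {
    Single,
    SomeShard,
}

#[derive(Debug)]
enum Plan {
    Scan(Distribution),
    Exchange(Distribution, Box<Plan>),
}

// Wrap `child` in an Exchange whenever its distribution does not already
// satisfy the distribution required by the parent.
fn satisfy(required: Distribution, child: Plan) -> Plan {
    let child_dist = match &child {
        Plan::Scan(d) | Plan::Exchange(d, _) => *d,
    };
    if child_dist == required {
        child
    } else {
        Plan::Exchange(required, Box::new(child))
    }
}

fn main() {
    // The query root requires a Single distribution, so a SomeShard scan
    // always ends up under an Exchange.
    let plan = satisfy(Distribution::Single, Plan::Scan(Distribution::SomeShard));
    println!("{plan:?}"); // Exchange(Single, Scan(SomeShard))
}
```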


If BatchSeqScan uses SomeShard, it will just defer to table_scan_info to get the number of partitions and infer the parallelism from there:

if let Some(table_scan_info) = &table_scan_info {
    table_scan_info
        .partitions
        .as_ref()
        .map(|m| m.len())
        .unwrap_or(1)

Is it correct to assume that each worker thread should scan independent partitions?

And if so, there should be no duplicated results, even with Distribution::SomeShard? 🤔
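For what it's worth, a toy sketch of that expectation (data and names are illustrative, not the actual vnode mapping code): if every vnode is owned by exactly one task, concatenating the per-task scans returns each state row exactly once even with four tasks; duplicates appear only when several tasks are handed overlapping or full vnode sets.

```rust
use std::collections::HashMap;

fn main() {
    // One state row, keyed by the vnode it hashes to (values are illustrative).
    let rows: Vec<(u8, &str)> = vec![(0, "kafka partition 0 offset state")];

    // A correct mapping: each vnode is owned by exactly one of the four tasks.
    let owned_vnodes: HashMap<usize, Vec<u8>> =
        HashMap::from([(0, vec![0]), (1, vec![]), (2, vec![]), (3, vec![])]);

    // Each task scans only the vnodes it owns; the gather stage concatenates.
    let mut gathered = Vec::new();
    for vnodes in owned_vnodes.values() {
        for (vnode, row) in &rows {
            if vnodes.contains(vnode) {
                gathered.push(*row);
            }
        }
    }

    // With disjoint vnode ownership the row is read exactly once,
    // even though four tasks were scheduled.
    assert_eq!(gathered.len(), 1);
}
```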

liurenjie1024 changed the title from "multiple reads in source's internal table" to "bug: multiple reads in source's internal table" Feb 1, 2023
liurenjie1024 (Contributor) commented
Yes, I think this is caused by an incorrect vnode mapping for the internal table; I'll take a look at it.

tabVersion (Contributor, Author) commented
Any updates?

liurenjie1024 (Contributor) commented
I'll look into this soon.

fuyufjh (Member) commented Sep 11, 2023

Any updates?

fuyufjh modified the milestones: release-1.2, release-1.3 Sep 11, 2023
liurenjie1024 (Contributor) commented
Will look into it later.

liurenjie1024 modified the milestones: release-1.4, release-1.5 Nov 8, 2023
liurenjie1024 removed this from the release-1.5 milestone Dec 4, 2023
BugenZhao added this to the release-1.6 milestone Jan 2, 2024
liurenjie1024 modified the milestones: release-1.6, release-1.7 Jan 9, 2024
tabVersion (Contributor, Author) commented
Closing as there are no updates.

tabVersion closed this as not planned (won't fix, can't repro, duplicate, stale) Mar 6, 2024