diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 542178a830b73..30c4ee19002ad 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -45,6 +45,7 @@ use crate::optimizer::plan_node::{
     ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, StreamDedup,
     ToStreamContext,
 };
+use crate::optimizer::property::Distribution::HashShard;
 use crate::optimizer::property::{Distribution, Order, RequiredDist};
 use crate::utils::{ColIndexMapping, Condition, IndexRewriter};
 
@@ -574,7 +575,7 @@ impl ToStream for LogicalSource {
         if let Some(row_id_index) = self.core.row_id_index
             && self.core.gen_row_id
         {
-            plan = StreamRowIdGen::new(plan, row_id_index).into();
+            plan = StreamRowIdGen::new_with_dist(plan, row_id_index, HashShard(vec![row_id_index])).into();
         }
         Ok(plan)
     }
diff --git a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs
index b7634c652a34d..1f5be56477caa 100644
--- a/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_row_id_gen.rs
@@ -30,6 +30,10 @@ pub struct StreamRowIdGen {
 
 impl StreamRowIdGen {
     pub fn new(input: PlanRef, row_id_index: usize) -> Self {
+        // if input.append_only() {
+        //     return Self::new_with_dist(input, row_id_index, Distribution::HashShard(vec![row_id_index]));
+        // }
+
         let distribution = input.distribution().clone();
         Self::new_with_dist(input, row_id_index, distribution)
     }