diff --git a/src/stream/src/executor/backfill/snapshot_backfill.rs b/src/stream/src/executor/backfill/snapshot_backfill/executor.rs similarity index 72% rename from src/stream/src/executor/backfill/snapshot_backfill.rs rename to src/stream/src/executor/backfill/snapshot_backfill/executor.rs index 7d33ac89d403..bf01700608ad 100644 --- a/src/stream/src/executor/backfill/snapshot_backfill.rs +++ b/src/stream/src/executor/backfill/snapshot_backfill/executor.rs @@ -15,37 +15,40 @@ use std::cmp::min; use std::collections::VecDeque; use std::future::{pending, ready, Future}; -use std::mem::{replace, take}; +use std::mem::take; use std::sync::Arc; use anyhow::anyhow; -use futures::future::Either; -use futures::{pin_mut, Stream, TryStreamExt}; +use futures::future::{try_join_all, Either}; +use futures::{pin_mut, Stream, TryFutureExt, TryStreamExt}; use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::hash::VnodeBitmapExt; use risingwave_common::metrics::LabelGuardedIntCounter; use risingwave_common::row::OwnedRow; -use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_hummock_sdk::HummockReadEpoch; -use risingwave_storage::error::StorageResult; +use risingwave_storage::store::PrefetchOptions; use risingwave_storage::table::batch_table::storage_table::StorageTable; use risingwave_storage::table::ChangeLogRow; use risingwave_storage::StateStore; use tokio::select; use tokio::sync::mpsc::UnboundedReceiver; +use crate::executor::backfill::snapshot_backfill::state::BackfillState; +use crate::executor::backfill::snapshot_backfill::vnode_stream::VnodeStream; use crate::executor::backfill::utils::{create_builder, mapping_message}; use crate::executor::monitor::StreamingMetrics; use crate::executor::prelude::{try_stream, StreamExt}; use crate::executor::{ - expect_first_barrier, ActorContextRef, BackfillExecutor, Barrier, BoxedMessageStream, - DispatcherBarrier, DispatcherMessage, Execute, MergeExecutorInput, Message, - StreamExecutorError, StreamExecutorResult, + expect_first_barrier, ActorContextRef, Barrier, BoxedMessageStream, DispatcherBarrier, + DispatcherMessage, Execute, MergeExecutorInput, Message, StreamExecutorError, + StreamExecutorResult, }; use crate::task::CreateMviewProgressReporter; pub struct SnapshotBackfillExecutor { /// Upstream table upstream_table: StorageTable, + pk_in_output_indices: Vec, /// Upstream with the same schema with the upstream table. upstream: MergeExecutorInput, @@ -78,6 +81,14 @@ impl SnapshotBackfillExecutor { metrics: Arc, ) -> Self { assert_eq!(&upstream.info.schema, upstream_table.schema()); + let Some(pk_in_output_indices) = upstream_table.pk_in_output_indices() else { + panic!( + "storage table should include all pk columns in output: pk_indices: {:?}, output_indices: {:?}, schema: {:?}", + upstream_table.pk_indices(), + upstream_table.output_indices(), + upstream_table.schema() + ) + }; if let Some(rate_limit) = rate_limit { debug!( rate_limit, @@ -86,6 +97,7 @@ impl SnapshotBackfillExecutor { } Self { upstream_table, + pk_in_output_indices, upstream, output_indices, progress, @@ -105,6 +117,8 @@ impl SnapshotBackfillExecutor { let first_recv_barrier = receive_next_barrier(&mut self.barrier_rx).await?; debug!(epoch = ?first_recv_barrier.epoch, "get first inject barrier"); let should_backfill = first_barrier.epoch != first_recv_barrier.epoch; + let mut backfill_state = + BackfillState::new([], self.upstream_table.pk_serializer().clone()); let (mut barrier_epoch, mut need_report_finish) = { if should_backfill { @@ -134,11 +148,13 @@ impl SnapshotBackfillExecutor { ]); let snapshot_stream = make_consume_snapshot_stream( &self.upstream_table, + &self.pk_in_output_indices, first_barrier_epoch.prev, self.chunk_size, self.rate_limit, &mut self.barrier_rx, &mut self.progress, + &mut backfill_state, first_recv_barrier, ); @@ -195,15 +211,13 @@ impl SnapshotBackfillExecutor { // on the upstream. Otherwise, in `batch_iter_log_with_pk_bounds`, we may wait upstream epoch to be committed, // and the back-pressure may cause the upstream unable to consume the barrier and then cause deadlock. let stream = upstream_buffer - .run_future(self.upstream_table.batch_iter_log_with_pk_bounds( + .run_future(make_log_stream( + &self.upstream_table, barrier_epoch.prev, - HummockReadEpoch::Committed(barrier_epoch.prev), - false, + None, + self.chunk_size, )) .await?; - let data_types = self.upstream_table.schema().data_types(); - let builder = create_builder(None, self.chunk_size, data_types); - let stream = read_change_log(stream, builder); pin_mut!(stream); while let Some(chunk) = upstream_buffer.run_future(stream.try_next()).await? @@ -224,7 +238,22 @@ impl SnapshotBackfillExecutor { upstream_buffer.barrier_count(), ); + backfill_state.finish_epoch( + self.upstream_table.vnodes().iter_vnodes(), + barrier.epoch.prev, + ); + let uncommitted = backfill_state.uncommitted_state(); + // TODO: apply to progress state table + drop(uncommitted); + backfill_state.mark_committed(); + let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id); yield Message::Barrier(barrier); + if update_vnode_bitmap.is_some() { + return Err(anyhow!( + "should not update vnode bitmap during consuming log store" + ) + .into()); + } } } @@ -247,15 +276,37 @@ impl SnapshotBackfillExecutor { let mut upstream = self.upstream.into_executor(self.barrier_rx).execute(); // Phase 3: consume upstream while let Some(msg) = upstream.try_next().await? { - if let Message::Barrier(barrier) = &msg { - assert_eq!(barrier.epoch.prev, barrier_epoch.curr); - barrier_epoch = barrier.epoch; - if need_report_finish { - need_report_finish = false; - self.progress.finish_consuming_log_store(barrier_epoch); + match msg { + Message::Barrier(barrier) => { + assert_eq!(barrier.epoch.prev, barrier_epoch.curr); + backfill_state.finish_epoch( + self.upstream_table.vnodes().iter_vnodes(), + barrier.epoch.prev, + ); + barrier_epoch = barrier.epoch; + if need_report_finish { + need_report_finish = false; + self.progress.finish_consuming_log_store(barrier_epoch); + } + backfill_state.mark_committed(); + let uncommitted = backfill_state.uncommitted_state(); + drop(uncommitted); + let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_ctx.id); + yield Message::Barrier(barrier); + if let Some(new_vnode_bitmap) = update_vnode_bitmap { + backfill_state.update_vnode_bitmap( + new_vnode_bitmap + .iter_vnodes() + .map(|vnode| (vnode, barrier_epoch.prev, None)), + ); + let _prev_vnode_bitmap = + self.upstream_table.update_vnode_bitmap(new_vnode_bitmap); + } + } + msg => { + yield msg; } } - yield msg; } } } @@ -276,56 +327,6 @@ impl Execute for SnapshotBackfillExecutor { } } -#[try_stream(ok = StreamChunk, error = StreamExecutorError)] -async fn read_change_log( - stream: impl Stream>, - mut builder: DataChunkBuilder, -) { - let chunk_size = builder.batch_size(); - pin_mut!(stream); - let mut ops = Vec::with_capacity(chunk_size); - while let Some(change_log_row) = stream.try_next().await? { - let change_log_row: ChangeLogRow = change_log_row; - match change_log_row { - ChangeLogRow::Insert(row) => { - ops.push(Op::Insert); - if let Some(chunk) = builder.append_one_row(row) { - let ops = replace(&mut ops, Vec::with_capacity(chunk_size)); - yield StreamChunk::from_parts(ops, chunk); - } - } - ChangeLogRow::Update { - old_value, - new_value, - } => { - if !builder.can_append(2) { - if let Some(chunk) = builder.consume_all() { - let ops = replace(&mut ops, Vec::with_capacity(chunk_size)); - yield StreamChunk::from_parts(ops, chunk); - } - } - ops.extend([Op::UpdateDelete, Op::UpdateInsert]); - assert!(builder.append_one_row(old_value).is_none()); - if let Some(chunk) = builder.append_one_row(new_value) { - let ops = replace(&mut ops, Vec::with_capacity(chunk_size)); - yield StreamChunk::from_parts(ops, chunk); - } - } - ChangeLogRow::Delete(row) => { - ops.push(Op::Delete); - if let Some(chunk) = builder.append_one_row(row) { - let ops = replace(&mut ops, Vec::with_capacity(chunk_size)); - yield StreamChunk::from_parts(ops, chunk); - } - } - } - } - - if let Some(chunk) = builder.consume_all() { - yield StreamChunk::from_parts(ops, chunk); - } -} - struct ConsumingSnapshot; struct ConsumingLogStore; @@ -492,32 +493,82 @@ async fn receive_next_barrier( .ok_or_else(|| anyhow!("end of barrier receiver"))?) } -#[try_stream(ok = StreamChunk, error = StreamExecutorError)] -async fn make_snapshot_stream<'a>( - row_stream: impl Stream> + 'a, - mut builder: DataChunkBuilder, -) { - pin_mut!(row_stream); - while let Some(row) = row_stream.try_next().await? { - if let Some(data_chunk) = builder.append_one_row(row) { - let ops = vec![Op::Insert; data_chunk.capacity()]; - yield StreamChunk::from_parts(ops, data_chunk); - } - } - if let Some(data_chunk) = builder.consume_all() { - let ops = vec![Op::Insert; data_chunk.capacity()]; - yield StreamChunk::from_parts(ops, data_chunk); - } +async fn make_log_stream( + upstream_table: &StorageTable, + prev_epoch: u64, + start_pk: Option, + chunk_size: usize, +) -> StreamExecutorResult> { + let data_types = upstream_table.schema().data_types(); + let start_pk = start_pk.as_ref(); + let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| { + upstream_table + .batch_iter_vnode_log( + prev_epoch, + HummockReadEpoch::Committed(prev_epoch), + start_pk, + vnode, + ) + .map_ok(move |stream| { + let stream = stream.map(|result| { + Ok(match result? { + ChangeLogRow::Insert(row) => ((Op::Insert, row), None), + ChangeLogRow::Update { + new_value, + old_value, + } => ( + (Op::UpdateDelete, old_value), + Some((Op::UpdateInsert, new_value)), + ), + ChangeLogRow::Delete(row) => ((Op::Delete, row), None), + }) + }); + (vnode, stream) + }) + })) + .await?; + let builder = create_builder(None, chunk_size, data_types.clone()); + Ok(VnodeStream::new(vnode_streams, builder)) } +async fn make_snapshot_stream( + upstream_table: &StorageTable, + snapshot_epoch: u64, + start_pk: Option, + rate_limit: Option, + chunk_size: usize, +) -> StreamExecutorResult> { + let data_types = upstream_table.schema().data_types(); + let start_pk = start_pk.as_ref(); + let vnode_streams = try_join_all(upstream_table.vnodes().iter_vnodes().map(move |vnode| { + upstream_table + .batch_iter_vnode( + HummockReadEpoch::Committed(snapshot_epoch), + start_pk, + vnode, + PrefetchOptions::prefetch_for_large_range_scan(), + ) + .map_ok(move |stream| { + let stream = stream.map(|result| Ok(((Op::Insert, result?), None))); + (vnode, stream) + }) + })) + .await?; + let builder = create_builder(rate_limit, chunk_size, data_types.clone()); + Ok(VnodeStream::new(vnode_streams, builder)) +} + +#[expect(clippy::too_many_arguments)] #[try_stream(ok = Message, error = StreamExecutorError)] async fn make_consume_snapshot_stream<'a, S: StateStore>( upstream_table: &'a StorageTable, + pk_in_output_indices: &'a [usize], snapshot_epoch: u64, chunk_size: usize, rate_limit: Option, barrier_rx: &'a mut UnboundedReceiver, progress: &'a mut CreateMviewProgressReporter, + backfill_state: &'a mut BackfillState, first_recv_barrier: Barrier, ) { let mut barrier_epoch = first_recv_barrier.epoch; @@ -530,15 +581,8 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( ); // start consume upstream snapshot - let snapshot_row_stream = BackfillExecutor::snapshot_read( - upstream_table, - HummockReadEpoch::Committed(snapshot_epoch), - None, - ); - let data_types = upstream_table.schema().data_types(); - let builder = create_builder(rate_limit, chunk_size, data_types.clone()); - let snapshot_stream = make_snapshot_stream(snapshot_row_stream, builder); - pin_mut!(snapshot_stream); + let mut snapshot_stream = + make_snapshot_stream(upstream_table, snapshot_epoch, None, rate_limit, chunk_size).await?; async fn select_barrier_and_snapshot_stream( barrier_rx: &mut UnboundedReceiver, @@ -576,9 +620,28 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( if barrier_epoch.curr >= snapshot_epoch { return Err(anyhow!("should not receive barrier with epoch {barrier_epoch:?} later than snapshot epoch {snapshot_epoch}").into()); } - debug!(?barrier_epoch, count, "update progress"); + if let Some(chunk) = snapshot_stream.consume_builder() { + count += chunk.cardinality(); + epoch_row_count += chunk.cardinality(); + yield Message::Chunk(chunk); + } + snapshot_stream + .for_vnode_pk_progress(pk_in_output_indices, |vnode, pk_progress| { + if let Some(pk) = pk_progress { + backfill_state.update_epoch_progress(vnode, snapshot_epoch, pk); + } else { + backfill_state.finish_epoch([vnode], snapshot_epoch); + } + }) + .await?; + let uncommitted = backfill_state.uncommitted_state(); + // TODO: apply to progress state table + drop(uncommitted); + backfill_state.mark_committed(); + debug!(?barrier_epoch, count, epoch_row_count, "update progress"); progress.update(barrier_epoch, barrier_epoch.prev, count as _); epoch_row_count = 0; + yield Message::Barrier(barrier); } Either::Right(Some(chunk)) => { @@ -597,6 +660,16 @@ async fn make_consume_snapshot_stream<'a, S: StateStore>( assert_eq!(barrier_to_report_finish.epoch.prev, barrier_epoch.curr); barrier_epoch = barrier_to_report_finish.epoch; info!(?barrier_epoch, count, "report finish"); + snapshot_stream + .for_vnode_pk_progress(pk_in_output_indices, |vnode, pk_progress| { + assert_eq!(pk_progress, None); + backfill_state.finish_epoch([vnode], snapshot_epoch); + }) + .await?; + let uncommitted = backfill_state.uncommitted_state(); + // TODO: apply to progress state table + drop(uncommitted); + backfill_state.mark_committed(); progress.finish(barrier_epoch, count as _); yield Message::Barrier(barrier_to_report_finish); diff --git a/src/stream/src/executor/backfill/snapshot_backfill/mod.rs b/src/stream/src/executor/backfill/snapshot_backfill/mod.rs new file mode 100644 index 000000000000..905dcf761569 --- /dev/null +++ b/src/stream/src/executor/backfill/snapshot_backfill/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod executor; +mod state; +mod vnode_stream; + +pub use executor::SnapshotBackfillExecutor; diff --git a/src/stream/src/executor/backfill/snapshot_backfill/state.rs b/src/stream/src/executor/backfill/snapshot_backfill/state.rs new file mode 100644 index 000000000000..ee44070872c0 --- /dev/null +++ b/src/stream/src/executor/backfill/snapshot_backfill/state.rs @@ -0,0 +1,337 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::mem::replace; + +use risingwave_common::hash::VirtualNode; +use risingwave_common::must_match; +use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::types::{DataType, ScalarImpl}; +use risingwave_common::util::row_serde::OrderedRowSerde; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub(super) enum EpochBackfillProgress { + Consuming { latest_pk: OwnedRow }, + Consumed, +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub(super) struct VnodeBackfillProgress { + pub(super) epoch: u64, + pub(super) progress: EpochBackfillProgress, +} + +/// `vnode`, `epoch`, `is_finished` +const EXTRA_COLUMN_TYPES: [DataType; 3] = [DataType::Int16, DataType::Int64, DataType::Boolean]; + +impl VnodeBackfillProgress { + fn from_row(row: &OwnedRow, pk_serde: &OrderedRowSerde) -> (VirtualNode, Self) { + assert_eq!( + row.len(), + pk_serde.get_data_types().len() + EXTRA_COLUMN_TYPES.len() + ); + let vnode = must_match!(&row[0], Some(ScalarImpl::Int16(vnode)) => { + VirtualNode::from_scalar(*vnode) + }); + let epoch = must_match!(&row[1], Some(ScalarImpl::Int64(epoch)) => { + *epoch as u64 + }); + let is_finished = must_match!(&row[2], Some(ScalarImpl::Bool(is_finished)) => { + *is_finished + }); + ( + vnode, + Self { + epoch, + progress: if is_finished { + EpochBackfillProgress::Consuming { + latest_pk: row.slice(EXTRA_COLUMN_TYPES.len()..).to_owned_row(), + } + } else { + row.slice(EXTRA_COLUMN_TYPES.len()..) + .iter() + .enumerate() + .for_each(|(i, datum)| { + if datum.is_some() { + if cfg!(debug_assertions) { + panic!("get non-empty pk row: {:?}", row); + } else { + warn!( + ?vnode, + i, + row = ?row, + "get non-empty pk row. will be ignore" + ); + } + } + }); + EpochBackfillProgress::Consumed + }, + }, + ) + } +} + +enum VnodeBackfillState { + New(VnodeBackfillProgress), + Update { + latest: VnodeBackfillProgress, + committed: VnodeBackfillProgress, + }, + Committed(VnodeBackfillProgress), +} + +impl VnodeBackfillState { + fn update_inner(&mut self, latest_progress: VnodeBackfillProgress) { + let temp_place_holder = Self::temp_place_holder(); + let prev_state = replace(self, temp_place_holder); + *self = match prev_state { + VnodeBackfillState::New(_) => VnodeBackfillState::New(latest_progress), + VnodeBackfillState::Update { committed, .. } => VnodeBackfillState::Update { + latest: latest_progress, + committed, + }, + VnodeBackfillState::Committed(committed) => VnodeBackfillState::Update { + latest: latest_progress, + committed, + }, + }; + } + + fn mark_committed(&mut self) { + *self = VnodeBackfillState::Committed(match replace(self, Self::temp_place_holder()) { + VnodeBackfillState::New(progress) => progress, + VnodeBackfillState::Update { latest, .. } => latest, + VnodeBackfillState::Committed(progress) => progress, + }); + } + + fn latest_progress(&self) -> &VnodeBackfillProgress { + match self { + VnodeBackfillState::New(progress) => progress, + VnodeBackfillState::Update { latest, .. } => latest, + VnodeBackfillState::Committed(progress) => progress, + } + } + + fn temp_place_holder() -> Self { + Self::New(VnodeBackfillProgress { + epoch: 0, + progress: EpochBackfillProgress::Consumed, + }) + } +} + +mod progress_row { + use risingwave_common::hash::VirtualNode; + use risingwave_common::row::{OwnedRow, Row, RowExt}; + use risingwave_common::types::ScalarImpl; + + use crate::executor::backfill::snapshot_backfill::state::{ + EpochBackfillProgress, VnodeBackfillProgress, + }; + + pub(in super::super) type BackfillProgressRow<'a> = impl Row + 'a; + + impl VnodeBackfillProgress { + pub(super) fn build_row<'a>( + &'a self, + vnode: VirtualNode, + consumed_pk_rows: &'a OwnedRow, + ) -> BackfillProgressRow<'a> { + let (is_finished, pk) = match &self.progress { + EpochBackfillProgress::Consuming { latest_pk } => { + assert_eq!(latest_pk.len(), consumed_pk_rows.len()); + (false, latest_pk) + } + EpochBackfillProgress::Consumed => (true, consumed_pk_rows), + }; + [ + Some(ScalarImpl::Int16(vnode.to_scalar())), + Some(ScalarImpl::Int64(self.epoch as _)), + Some(ScalarImpl::Bool(is_finished)), + ] + .chain(pk) + } + } +} + +pub(super) use progress_row::*; + +pub(super) struct BackfillState { + vnode_state: HashMap, + pk_serde: OrderedRowSerde, + consumed_pk_rows: OwnedRow, +} + +impl BackfillState { + pub(super) fn new<'a>( + committed_progress: impl IntoIterator, + pk_serde: OrderedRowSerde, + ) -> Self { + let mut vnode_state = HashMap::new(); + for (vnode, progress) in committed_progress.into_iter().map(|(vnode, row)| { + let (row_vnode, progress) = VnodeBackfillProgress::from_row(row, &pk_serde); + assert_eq!(row_vnode, vnode); + (vnode, progress) + }) { + assert!(vnode_state + .insert(vnode, VnodeBackfillState::Committed(progress)) + .is_none()); + } + let consumed_pk_rows = OwnedRow::new(vec![None; pk_serde.get_data_types().len()]); + Self { + vnode_state, + pk_serde, + consumed_pk_rows, + } + } + + fn update_progress(&mut self, vnode: VirtualNode, progress: VnodeBackfillProgress) { + match self.vnode_state.entry(vnode) { + Entry::Occupied(entry) => { + let state = entry.into_mut(); + let prev_progress = state.latest_progress(); + if prev_progress == &progress { + // ignore if no update + return; + } + // sanity check + { + match &prev_progress.progress { + EpochBackfillProgress::Consuming { latest_pk: prev_pk } => { + assert_eq!(prev_progress.epoch, progress.epoch); + if let EpochBackfillProgress::Consuming { latest_pk: pk } = + &progress.progress + { + assert_eq!(pk.len(), self.pk_serde.get_data_types().len()); + if cfg!(debug_assertions) { + let mut prev_buf = vec![]; + self.pk_serde.serialize(prev_pk, &mut prev_buf); + let mut buf = vec![]; + self.pk_serde.serialize(pk, &mut buf); + assert!( + buf > prev_buf, + "new pk progress: {:?} not exceed prev pk progress: {:?}", + pk, + prev_pk + ); + } + } + } + EpochBackfillProgress::Consumed => { + assert!(prev_progress.epoch < progress.epoch); + } + } + } + state.update_inner(progress); + } + Entry::Vacant(entry) => { + entry.insert(VnodeBackfillState::New(progress)); + } + } + } + + pub(super) fn update_epoch_progress(&mut self, vnode: VirtualNode, epoch: u64, pk: OwnedRow) { + self.update_progress( + vnode, + VnodeBackfillProgress { + epoch, + progress: EpochBackfillProgress::Consuming { latest_pk: pk }, + }, + ) + } + + pub(super) fn finish_epoch( + &mut self, + vnodes: impl IntoIterator, + epoch: u64, + ) { + for vnode in vnodes { + self.update_progress( + vnode, + VnodeBackfillProgress { + epoch, + progress: EpochBackfillProgress::Consumed, + }, + ) + } + } + + #[expect(dead_code)] + pub(super) fn latest_progress(&self, vnode: VirtualNode) -> Option<&VnodeBackfillProgress> { + self.vnode_state + .get(&vnode) + .map(VnodeBackfillState::latest_progress) + } + + pub(super) fn uncommitted_state( + &self, + ) -> impl Iterator< + Item = ( + VirtualNode, + Option>, + BackfillProgressRow<'_>, + ), + > + '_ { + self.vnode_state + .iter() + .filter_map(|(vnode, state)| match state { + VnodeBackfillState::New(progress) => Some(( + *vnode, + None, + progress.build_row(*vnode, &self.consumed_pk_rows), + )), + VnodeBackfillState::Update { latest, committed } => Some(( + *vnode, + Some(committed.build_row(*vnode, &self.consumed_pk_rows)), + latest.build_row(*vnode, &self.consumed_pk_rows), + )), + VnodeBackfillState::Committed(_) => None, + }) + } + + pub(super) fn mark_committed(&mut self) { + self.vnode_state + .values_mut() + .for_each(VnodeBackfillState::mark_committed) + } + + pub(super) fn update_vnode_bitmap( + &mut self, + new_vnodes: impl Iterator)>, + ) { + let mut new_state = HashMap::new(); + for (vnode, epoch, pk) in new_vnodes { + let progress = VnodeBackfillProgress { + epoch, + progress: pk + .map(|latest_pk| EpochBackfillProgress::Consuming { latest_pk }) + .unwrap_or(EpochBackfillProgress::Consumed), + }; + if let Some(prev_progress) = self.vnode_state.get(&vnode) { + let prev_progress = must_match!(prev_progress, VnodeBackfillState::Committed(prev_progress) => { + prev_progress + }); + assert_eq!(prev_progress, &progress); + } + assert!(new_state + .insert(vnode, VnodeBackfillState::Committed(progress)) + .is_none()); + } + self.vnode_state = new_state; + } +} diff --git a/src/stream/src/executor/backfill/snapshot_backfill/vnode_stream.rs b/src/stream/src/executor/backfill/snapshot_backfill/vnode_stream.rs new file mode 100644 index 000000000000..71fde5771e3a --- /dev/null +++ b/src/stream/src/executor/backfill/snapshot_backfill/vnode_stream.rs @@ -0,0 +1,184 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{HashMap, HashSet}; +use std::mem::{replace, take}; +use std::pin::Pin; +use std::task::{ready, Context, Poll}; + +use futures::stream::Peekable; +use futures::{Stream, StreamExt, TryStreamExt}; +use risingwave_common::array::{Op, StreamChunk}; +use risingwave_common::hash::VirtualNode; +use risingwave_common::row::{OwnedRow, Row, RowExt}; +use risingwave_common::util::chunk_coalesce::DataChunkBuilder; + +use crate::executor::StreamExecutorResult; + +pub(super) type BackfillRowItem = ((Op, OwnedRow), Option<(Op, OwnedRow)>); +pub(super) trait BackfillRowStream = + Stream> + Sized + 'static; + +pub(super) struct VnodeStream { + streams: HashMap>>>, + finished_vnode: HashSet, + data_chunk_builder: DataChunkBuilder, + ops: Vec, +} + +impl VnodeStream { + pub(super) fn new( + vnode_streams: impl IntoIterator, + data_chunk_builder: DataChunkBuilder, + ) -> Self { + assert!(data_chunk_builder.is_empty()); + assert!(data_chunk_builder.batch_size() >= 2); + let mut streams = HashMap::new(); + for (vnode, stream) in vnode_streams { + let stream = Box::pin(stream.peekable()); + assert!(streams.insert(vnode, stream).is_none()); + } + let ops = Vec::with_capacity(data_chunk_builder.batch_size()); + Self { + streams, + finished_vnode: HashSet::new(), + data_chunk_builder, + ops, + } + } +} + +impl VnodeStream { + fn poll_next_row( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + 'outer: loop { + if self.streams.is_empty() { + break Poll::Ready(Ok(None)); + } + for (vnode, stream) in &mut self.streams { + match stream.poll_next_unpin(cx) { + Poll::Ready(Some(Err(e))) => { + break 'outer Poll::Ready(Err(e)); + } + Poll::Ready(None) => { + let vnode = *vnode; + let _stream = self.streams.remove(&vnode).expect("should exist"); + self.finished_vnode.insert(vnode); + continue 'outer; + } + Poll::Ready(Some(Ok(row))) => { + break 'outer Poll::Ready(Ok(Some(row))); + } + Poll::Pending => { + continue; + } + } + } + break Poll::Pending; + } + } + + pub(super) fn consume_builder(&mut self) -> Option { + self.data_chunk_builder.consume_all().map(|chunk| { + let ops = replace( + &mut self.ops, + Vec::with_capacity(self.data_chunk_builder.batch_size()), + ); + StreamChunk::from_parts(ops, chunk) + }) + } + + pub(super) async fn for_vnode_pk_progress( + &mut self, + pk_indices: &[usize], + mut on_vnode_progress: impl FnMut(VirtualNode, Option), + ) -> StreamExecutorResult<()> { + assert!(self.data_chunk_builder.is_empty()); + for vnode in &self.finished_vnode { + on_vnode_progress(*vnode, None); + } + for (vnode, stream) in &mut self.streams { + match stream.as_mut().peek().await { + Some(Ok(((_, row), extra))) => { + let pk = row.project(pk_indices).to_owned_row(); + if cfg!(debug_assertions) + && let Some((_, extra_row)) = extra + { + assert_eq!(pk, extra_row.project(pk_indices).to_owned_row()); + } + on_vnode_progress(*vnode, Some(pk)); + } + Some(Err(_)) => { + return Err(stream.try_next().await.expect_err("checked Err")); + } + None => { + on_vnode_progress(*vnode, None); + } + } + } + Ok(()) + } +} + +impl Stream for VnodeStream { + type Item = StreamExecutorResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let capacity = this.data_chunk_builder.batch_size(); + loop { + match ready!(this.poll_next_row(cx)) { + Ok(Some(((op, row), extra))) => { + let may_chunk = if let Some((extra_op, extra_row)) = extra { + if this.data_chunk_builder.can_append(2) { + this.ops.extend([op, extra_op]); + assert!(this.data_chunk_builder.append_one_row(row).is_none()); + this.data_chunk_builder.append_one_row(extra_row) + } else { + let chunk = this + .data_chunk_builder + .consume_all() + .expect("should be Some when not can_append"); + let ops = replace(&mut this.ops, Vec::with_capacity(capacity)); + this.ops.extend([op, extra_op]); + assert!(this.data_chunk_builder.append_one_row(row).is_none()); + assert!(this.data_chunk_builder.append_one_row(extra_row).is_none()); + break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk)))); + } + } else { + this.ops.push(op); + this.data_chunk_builder.append_one_row(row) + }; + if let Some(chunk) = may_chunk { + let ops = replace(&mut this.ops, Vec::with_capacity(capacity)); + break Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk)))); + } + } + Ok(None) => { + break if let Some(chunk) = this.data_chunk_builder.consume_all() { + let ops = take(&mut this.ops); + Poll::Ready(Some(Ok(StreamChunk::from_parts(ops, chunk)))) + } else { + Poll::Ready(None) + }; + } + Err(e) => { + break Poll::Ready(Some(Err(e))); + } + } + } + } +}