Skip to content

Commit

Permalink
fix(recovery): wait_epoch should be called in recovery (close risingw…
Browse files Browse the repository at this point in the history
  • Loading branch information
soundOfDestiny authored Mar 10, 2023
1 parent 70f46f1 commit 25499e3
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 13 deletions.
23 changes: 14 additions & 9 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use risingwave_common::catalog::TableId;
use risingwave_common::hash::ActorMapping;
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_hummock_sdk::HummockEpoch;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use risingwave_pb::stream_plan::add_mutation::Dispatchers;
use risingwave_pb::stream_plan::barrier::Mutation;
Expand Down Expand Up @@ -493,6 +494,18 @@ where
Ok(())
}

/// Asks every worker node in `self.info.node_map` to wait until `epoch` is committed.
///
/// One `WaitEpochCommitRequest` is sent per worker node through a client obtained from
/// `self.client_pool`; the requests are awaited together.
///
/// # Errors
///
/// Returns an error as soon as obtaining a client or any of the requests fails.
pub async fn wait_epoch_commit(&self, epoch: HummockEpoch) -> MetaResult<()> {
    let workers = self.info.node_map.values();

    // Build one lazy request future per worker; nothing is sent until they are polled below.
    let pending_waits = workers.map(|worker_node| async {
        let request = WaitEpochCommitRequest { epoch };
        let client = self.client_pool.get(worker_node).await?;
        client.wait_epoch_commit(request).await
    });

    // The individual responses carry nothing we need; only success or failure matters.
    let _responses = try_join_all(pending_waits).await?;

    Ok(())
}

/// Do some work after barriers are collected and the new storage version is committed, for
/// the given command.
pub async fn post_collect(&self) -> MetaResult<()> {
Expand All @@ -504,15 +517,7 @@ where
// execution of the next command of `Update`, as some newly created operators may
// immediately initialize their states on that barrier.
Some(Mutation::Pause(..)) => {
let futures = self.info.node_map.values().map(|worker_node| async {
let client = self.client_pool.get(worker_node).await?;
let request = WaitEpochCommitRequest {
epoch: self.prev_epoch.0,
};
client.wait_epoch_commit(request).await
});

try_join_all(futures).await?;
self.wait_epoch_commit(self.prev_epoch.0).await?;
}

_ => {}
Expand Down
15 changes: 15 additions & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,21 @@ where
self.source_manager.clone(),
));

// Compiled in unless this is a test build with the `failpoints` feature enabled.
// Waits for the max committed epoch of the current hummock version via
// `command_ctx.wait_epoch_commit` before the barrier below is injected.
#[cfg(not(all(test, feature = "failpoints")))]
{
use risingwave_common::util::epoch::INVALID_EPOCH;

// Max committed epoch as reported by the hummock manager's current version.
let mce = self
.hummock_manager
.get_current_version()
.await
.max_committed_epoch;

// NOTE(review): presumably `INVALID_EPOCH` means nothing has been committed yet, so
// there is nothing to wait for — confirm against the epoch utilities.
if mce != INVALID_EPOCH {
command_ctx.wait_epoch_commit(mce).await?;
}
}

let (barrier_complete_tx, mut barrier_complete_rx) =
tokio::sync::mpsc::unbounded_channel();
self.inject_barrier(command_ctx.clone(), barrier_complete_tx)
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/stream/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ mod tests {
}

impl MockServices {
async fn start(host: &str, port: u16) -> MetaResult<Self> {
async fn start(host: &str, port: u16, enable_recovery: bool) -> MetaResult<Self> {
let addr = SocketAddr::new(host.parse().unwrap(), port);
let state = Arc::new(FakeFragmentState {
actor_streams: Mutex::new(HashMap::new()),
Expand All @@ -692,7 +692,7 @@ mod tests {

sleep(Duration::from_secs(1)).await;

let env = MetaSrvEnv::for_test_opts(Arc::new(MetaOpts::test(true))).await;
let env = MetaSrvEnv::for_test_opts(Arc::new(MetaOpts::test(enable_recovery))).await;
let system_params = env.system_params_manager().get_params().await;
let meta_metrics = Arc::new(MetaMetrics::new());
let cluster_manager =
Expand Down Expand Up @@ -868,7 +868,7 @@ mod tests {

#[tokio::test]
async fn test_drop_materialized_view() -> MetaResult<()> {
let services = MockServices::start("127.0.0.1", 12334).await?;
let services = MockServices::start("127.0.0.1", 12334, false).await?;

let table_id = TableId::new(0);
let actors = make_mview_stream_actors(&table_id, 4);
Expand Down Expand Up @@ -926,7 +926,7 @@ mod tests {
async fn test_failpoints_drop_mv_recovery() {
let inject_barrier_err = "inject_barrier_err";
let inject_barrier_err_success = "inject_barrier_err_success";
let services = MockServices::start("127.0.0.1", 12335).await.unwrap();
let services = MockServices::start("127.0.0.1", 12335, true).await.unwrap();

let table_id = TableId::new(0);
let actors = make_mview_stream_actors(&table_id, 4);
Expand Down

0 comments on commit 25499e3

Please sign in to comment.