From 2a31949cb78be4a82e9beb1c37de45e3e9a82aba Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Tue, 15 Oct 2024 15:01:57 +0100 Subject: [PATCH] [Bifrost] Support repair store in log-server --- crates/log-server/src/loglet_worker.rs | 200 ++++++++++++++++++++++++- 1 file changed, 194 insertions(+), 6 deletions(-) diff --git a/crates/log-server/src/loglet_worker.rs b/crates/log-server/src/loglet_worker.rs index 5fbef0ec2..a96db7099 100644 --- a/crates/log-server/src/loglet_worker.rs +++ b/crates/log-server/src/loglet_worker.rs @@ -12,7 +12,7 @@ use futures::future::OptionFuture; use futures::stream::FuturesUnordered; use futures::StreamExt; use tokio::sync::mpsc; -use tracing::{debug, warn}; +use tracing::{debug, trace, warn}; use restate_core::network::Incoming; use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskHandle, TaskKind}; @@ -350,13 +350,32 @@ impl LogletWorker { } // Are we writing an older record than local-tail, this must be from the sequencer. - if body.first_offset < next_ok_offset && peer != body.sequencer { + if body.first_offset < next_ok_offset + && peer != body.sequencer + // stores flagged IgnoreSeal (repair stores) skip the SequencerMismatch check. + && !body.flags.contains(StoreFlags::IgnoreSeal) + { return (Status::SequencerMismatch, None); } if body.flags.contains(StoreFlags::IgnoreSeal) { - // store happens anyway. - todo!("repair stores are not implemented yet") + // We must be sealed (sanity check) + // Accept repair store only on sealed loglet. + if !self.loglet_state.is_sealed() { + warn!( + loglet_id = %self.loglet_id, + %peer, + first_offset = %body.first_offset, + "Ignoring repair store on unsealed loglet, repair should only happen on sealed loglets" + ); + return (Status::Malformed, None); + } + trace!( + loglet_id = %self.loglet_id, + %peer, + first_offset = %body.first_offset, + "Admitting a repair store for sealed loglet to restore replication" + ); } if body.first_offset > next_ok_offset { @@ -364,8 +383,10 @@ impl LogletWorker { // writes. 
debug!( loglet_id = %self.loglet_id, - "Can only accept writes coming in order, next_ok={} msg.first_offset={}", - next_ok_offset, body.first_offset + first_offset = %body.first_offset, + %peer, + "Can only accept writes coming in order, next_ok={}", + next_ok_offset, ); return (Status::OutOfBounds, None); } @@ -867,6 +888,173 @@ mod tests { Ok(()) } + #[test(tokio::test(start_paused = true))] + async fn test_repair_store() -> Result<()> { + const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); + const PEER: GenerationalNodeId = GenerationalNodeId::new(2, 2); + const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new(1); + + let (tc, log_store) = setup().await?; + let mut loglet_state_map = LogletStateMap::default(); + let (net_tx, mut net_rx) = mpsc::channel(10); + let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); + + let (peer_net_tx, mut peer_net_rx) = mpsc::channel(10); + let repair_connection = + OwnedConnection::new_fake(PEER, CURRENT_PROTOCOL_VERSION, peer_net_tx); + + let loglet_state = loglet_state_map.get_or_load(LOGLET, &log_store).await?; + let worker = LogletWorker::start(tc.clone(), LOGLET, log_store, loglet_state)?; + + let payloads: Arc<[Record]> = vec![ + Record::from("a sample record"), + Record::from("another record"), + ] + .into(); + + // offsets 1, 2 + let msg1 = Store { + header: LogServerRequestHeader::new(LOGLET, LogletOffset::INVALID), + timeout_at: None, + sequencer: SEQUENCER, + known_archived: LogletOffset::INVALID, + first_offset: LogletOffset::OLDEST, + flags: StoreFlags::empty(), + payloads: payloads.clone(), + }; + + // offsets 10, 11 + let msg2 = Store { + header: LogServerRequestHeader::new(LOGLET, LogletOffset::new(10)), + timeout_at: None, + sequencer: SEQUENCER, + known_archived: LogletOffset::INVALID, + first_offset: LogletOffset::new(10), + flags: StoreFlags::empty(), + payloads: payloads.clone(), + }; + + let seal1 = Seal { + header: LogServerRequestHeader::new(LOGLET, 
LogletOffset::INVALID), + sequencer: SEQUENCER, + }; + + // repair store for offsets 5, 6 + let repair_message_before_local_tail = Store { + header: LogServerRequestHeader::new(LOGLET, LogletOffset::new(10)), + timeout_at: None, + sequencer: SEQUENCER, + known_archived: LogletOffset::INVALID, + first_offset: LogletOffset::new(5), + flags: StoreFlags::IgnoreSeal, + payloads: payloads.clone(), + }; + + // repair store for offsets 16, 17 + let repair_message_after_local_tail = Store { + header: LogServerRequestHeader::new(LOGLET, LogletOffset::new(16)), + timeout_at: None, + sequencer: SEQUENCER, + known_archived: LogletOffset::INVALID, + first_offset: LogletOffset::new(16), + flags: StoreFlags::IgnoreSeal, + payloads: payloads.clone(), + }; + + let msg1 = Incoming::for_testing(connection.downgrade(), msg1, None); + let msg2 = Incoming::for_testing(connection.downgrade(), msg2, None); + let repair1 = Incoming::for_testing( + repair_connection.downgrade(), + repair_message_before_local_tail, + None, + ); + let repair2 = Incoming::for_testing( + repair_connection.downgrade(), + repair_message_after_local_tail, + None, + ); + let seal1 = Incoming::for_testing(connection.downgrade(), seal1, None); + + worker.enqueue_store(msg1).unwrap(); + worker.enqueue_store(msg2).unwrap(); + // first store (offsets 1, 2) is successful + let response = net_rx.recv().await.unwrap(); + let stored: Stored = response + .body + .unwrap() + .try_decode(connection.protocol_version())?; + assert_that!(stored.status, eq(Status::Ok)); + assert_that!(stored.sealed, eq(false)); + assert_that!(stored.local_tail, eq(LogletOffset::new(3))); + + // second store (offsets 10, 11) is successful + let response = net_rx.recv().await.unwrap(); + let stored: Stored = response + .body + .unwrap() + .try_decode(connection.protocol_version())?; + assert_that!(stored.status, eq(Status::Ok)); + assert_that!(stored.sealed, eq(false)); + assert_that!(stored.local_tail, eq(LogletOffset::new(12))); + + 
worker.enqueue_seal(seal1).unwrap(); + // seal responses can come in any order, but we'll consume waiters queue 
before we process + // store messages. + // sealed + let response = net_rx.recv().await.unwrap(); + let sealed: Sealed = response + .body + .unwrap() + .try_decode(connection.protocol_version())?; + assert_that!(sealed.status, eq(Status::Ok)); + assert_that!(sealed.local_tail, eq(LogletOffset::new(12))); + + // repair store (before local tail, local tail won't move) + worker.enqueue_store(repair1).unwrap(); + let response = peer_net_rx.recv().await.unwrap(); + let stored: Stored = response + .body + .unwrap() + .try_decode(connection.protocol_version())?; + assert_that!(stored.status, eq(Status::Ok)); + assert_that!(stored.local_tail, eq(LogletOffset::new(12))); + + worker.enqueue_store(repair2).unwrap(); + let response = peer_net_rx.recv().await.unwrap(); + let stored: Stored = response + .body + .unwrap() + .try_decode(connection.protocol_version())?; + assert_that!(stored.status, eq(Status::Ok)); + assert_that!(stored.local_tail, eq(LogletOffset::new(18))); + + // GetLogletInfo + // expect local tail at 18 (after the repair stores) and the loglet still sealed + let msg = GetLogletInfo { + header: LogServerRequestHeader::new(LOGLET, LogletOffset::INVALID), + }; + let msg = Incoming::for_testing(connection.downgrade(), msg, None); + let msg_id = msg.msg_id(); + worker.enqueue_get_loglet_info(msg).unwrap(); + + let response = net_rx.recv().await.unwrap(); + let header = response.header.unwrap(); + assert_that!(header.in_response_to(), eq(msg_id)); + let info: LogletInfo = response + .body + .unwrap() + .try_decode(connection.protocol_version())?; + assert_that!(info.status, eq(Status::Ok)); + assert_that!(info.local_tail, eq(LogletOffset::new(18))); + assert_that!(info.trim_point, eq(LogletOffset::INVALID)); + assert_that!(info.sealed, eq(true)); + + tc.shutdown_node("test completed", 0).await; + RocksDbManager::get().shutdown().await; + + Ok(()) + } + #[test(tokio::test(start_paused = true))] async fn test_simple_get_records_flow() -> Result<()> { const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1);