Skip to content

Commit

Permalink
[Bifrost] Support repair store in log-server
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Oct 15, 2024
1 parent edf3365 commit 2a31949
Showing 1 changed file with 194 additions and 6 deletions.
200 changes: 194 additions & 6 deletions crates/log-server/src/loglet_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures::future::OptionFuture;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::sync::mpsc;
use tracing::{debug, warn};
use tracing::{debug, trace, warn};

use restate_core::network::Incoming;
use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskHandle, TaskKind};
Expand Down Expand Up @@ -350,22 +350,43 @@ impl<S: LogStore> LogletWorker<S> {
}

// Are we writing an older record than local-tail, this must be from the sequencer.
if body.first_offset < next_ok_offset && peer != body.sequencer {
if body.first_offset < next_ok_offset
&& peer != body.sequencer
// not a repair store.
&& !body.flags.contains(StoreFlags::IgnoreSeal)
{
return (Status::SequencerMismatch, None);
}

if body.flags.contains(StoreFlags::IgnoreSeal) {
// store happens anyway.
todo!("repair stores are not implemented yet")
// We must be sealed (sanity check)
// Accept repair store only on sealed loglet.
if !self.loglet_state.is_sealed() {
warn!(
loglet_id = %self.loglet_id,
%peer,
first_offset = %body.first_offset,
"Ignoring repair store on unsealed loglet, repair should only happen on sealed loglets"
);
return (Status::Malformed, None);
}
trace!(
loglet_id = %self.loglet_id,
%peer,
first_offset = %body.first_offset,
"Admitting a repair store for sealed loglet to restore replication"
);
}

if body.first_offset > next_ok_offset {
// We can only accept writes coming in order. We don't support buffering out-of-order
// writes.
debug!(
loglet_id = %self.loglet_id,
"Can only accept writes coming in order, next_ok={} msg.first_offset={}",
next_ok_offset, body.first_offset
first_offset = %body.first_offset,
%peer,
"Can only accept writes coming in order, next_ok={}",
next_ok_offset,
);
return (Status::OutOfBounds, None);
}
Expand Down Expand Up @@ -867,6 +888,173 @@ mod tests {
Ok(())
}

#[test(tokio::test(start_paused = true))]
async fn test_repair_store() -> Result<()> {
const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1);
const PEER: GenerationalNodeId = GenerationalNodeId::new(2, 2);
const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new(1);

let (tc, log_store) = setup().await?;
let mut loglet_state_map = LogletStateMap::default();
let (net_tx, mut net_rx) = mpsc::channel(10);
let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx);

let (peer_net_tx, mut peer_net_rx) = mpsc::channel(10);
let repair_connection =
OwnedConnection::new_fake(PEER, CURRENT_PROTOCOL_VERSION, peer_net_tx);

let loglet_state = loglet_state_map.get_or_load(LOGLET, &log_store).await?;
let worker = LogletWorker::start(tc.clone(), LOGLET, log_store, loglet_state)?;

let payloads: Arc<[Record]> = vec![
Record::from("a sample record"),
Record::from("another record"),
]
.into();

// offsets 1, 2
let msg1 = Store {
header: LogServerRequestHeader::new(LOGLET, LogletOffset::INVALID),
timeout_at: None,
sequencer: SEQUENCER,
known_archived: LogletOffset::INVALID,
first_offset: LogletOffset::OLDEST,
flags: StoreFlags::empty(),
payloads: payloads.clone(),
};

// offsets 10, 11
let msg2 = Store {
header: LogServerRequestHeader::new(LOGLET, LogletOffset::new(10)),
timeout_at: None,
sequencer: SEQUENCER,
known_archived: LogletOffset::INVALID,
first_offset: LogletOffset::new(10),
flags: StoreFlags::empty(),
payloads: payloads.clone(),
};

let seal1 = Seal {
header: LogServerRequestHeader::new(LOGLET, LogletOffset::INVALID),
sequencer: SEQUENCER,
};

// 5, 6
let repair_message_before_local_tail = Store {
header: LogServerRequestHeader::new(LOGLET, LogletOffset::new(10)),
timeout_at: None,
sequencer: SEQUENCER,
known_archived: LogletOffset::INVALID,
first_offset: LogletOffset::new(5),
flags: StoreFlags::IgnoreSeal,
payloads: payloads.clone(),
};

// 16, 17
let repair_message_after_local_tail = Store {
header: LogServerRequestHeader::new(LOGLET, LogletOffset::new(16)),
timeout_at: None,
sequencer: SEQUENCER,
known_archived: LogletOffset::INVALID,
first_offset: LogletOffset::new(16),
flags: StoreFlags::IgnoreSeal,
payloads: payloads.clone(),
};

let msg1 = Incoming::for_testing(connection.downgrade(), msg1, None);
let msg2 = Incoming::for_testing(connection.downgrade(), msg2, None);
let repair1 = Incoming::for_testing(
repair_connection.downgrade(),
repair_message_before_local_tail,
None,
);
let repair2 = Incoming::for_testing(
repair_connection.downgrade(),
repair_message_after_local_tail,
None,
);
let seal1 = Incoming::for_testing(connection.downgrade(), seal1, None);

worker.enqueue_store(msg1).unwrap();
worker.enqueue_store(msg2).unwrap();
// first store is successful
let response = net_rx.recv().await.unwrap();
let stored: Stored = response
.body
.unwrap()
.try_decode(connection.protocol_version())?;
assert_that!(stored.status, eq(Status::Ok));
assert_that!(stored.sealed, eq(false));
assert_that!(stored.local_tail, eq(LogletOffset::new(3)));

// 10, 11
let response = net_rx.recv().await.unwrap();
let stored: Stored = response
.body
.unwrap()
.try_decode(connection.protocol_version())?;
assert_that!(stored.status, eq(Status::Ok));
assert_that!(stored.sealed, eq(false));
assert_that!(stored.local_tail, eq(LogletOffset::new(12)));

worker.enqueue_seal(seal1).unwrap();
// seal responses can come at any order, but we'll consume waiters queue before we process
// store messages.
// sealed
let response = net_rx.recv().await.unwrap();
let sealed: Sealed = response
.body
.unwrap()
.try_decode(connection.protocol_version())?;
assert_that!(sealed.status, eq(Status::Ok));
assert_that!(sealed.local_tail, eq(LogletOffset::new(12)));

// repair store (before local tail, local tail won't move)
worker.enqueue_store(repair1).unwrap();
let response = peer_net_rx.recv().await.unwrap();
let stored: Stored = response
.body
.unwrap()
.try_decode(connection.protocol_version())?;
assert_that!(stored.status, eq(Status::Ok));
assert_that!(stored.local_tail, eq(LogletOffset::new(12)));

worker.enqueue_store(repair2).unwrap();
let response = peer_net_rx.recv().await.unwrap();
let stored: Stored = response
.body
.unwrap()
.try_decode(connection.protocol_version())?;
assert_that!(stored.status, eq(Status::Ok));
assert_that!(stored.local_tail, eq(LogletOffset::new(18)));

// GetLogletInfo
// offsets 3, 4
let msg = GetLogletInfo {
header: LogServerRequestHeader::new(LOGLET, LogletOffset::INVALID),
};
let msg = Incoming::for_testing(connection.downgrade(), msg, None);
let msg_id = msg.msg_id();
worker.enqueue_get_loglet_info(msg).unwrap();

let response = net_rx.recv().await.unwrap();
let header = response.header.unwrap();
assert_that!(header.in_response_to(), eq(msg_id));
let info: LogletInfo = response
.body
.unwrap()
.try_decode(connection.protocol_version())?;
assert_that!(info.status, eq(Status::Ok));
assert_that!(info.local_tail, eq(LogletOffset::new(18)));
assert_that!(info.trim_point, eq(LogletOffset::INVALID));
assert_that!(info.sealed, eq(true));

tc.shutdown_node("test completed", 0).await;
RocksDbManager::get().shutdown().await;

Ok(())
}

#[test(tokio::test(start_paused = true))]
async fn test_simple_get_records_flow() -> Result<()> {
const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1);
Expand Down

0 comments on commit 2a31949

Please sign in to comment.