Skip to content

Commit

Permalink
chore: tuck rollback sync
Browse files Browse the repository at this point in the history
Closes THR-33
  • Loading branch information
pepyakin committed Nov 29, 2024
1 parent 3fe3792 commit afb1320
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 80 deletions.
100 changes: 92 additions & 8 deletions nomt/src/rollback/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{

use dashmap::DashMap;
use nomt_core::trie::KeyPath;
use parking_lot::Mutex;
use parking_lot::{Condvar, Mutex};
use threadpool::ThreadPool;

use self::delta::Delta;
Expand Down Expand Up @@ -49,6 +49,7 @@ struct InMemory {

struct Shared {
worker_tp: ThreadPool,
sync_tp: ThreadPool,
in_memory: Mutex<InMemory>,
seglog: Mutex<SegmentedLog>,
/// The number of items that we should keep in the log. Deltas that are past this limit are
Expand Down Expand Up @@ -112,6 +113,7 @@ impl Rollback {
)?;
let shared = Arc::new(Shared {
worker_tp: ThreadPool::new(rollback_tp_size),
sync_tp: ThreadPool::new(1),
in_memory: Mutex::new(in_memory),
seglog: Mutex::new(seglog),
max_rollback_log_len: max_rollback_log_len as usize,
Expand Down Expand Up @@ -199,8 +201,13 @@ impl Rollback {
Ok(Some(traceback))
}

/// Returns a controller for the sync process.
pub fn sync(&self) -> SyncController {
SyncController::new(self.clone())
}

/// Dumps the contents of the staging to the rollback.
pub fn writeout_start(&self) -> WriteoutData {
fn writeout_start(&self) -> WriteoutData {
let mut in_memory = self.shared.in_memory.lock();
let seglog = self.shared.seglog.lock();

Expand Down Expand Up @@ -239,7 +246,7 @@ impl Rollback {
///
/// We use this point in time to prune the rollback log, by removing the deltas that are no
/// longer needed.
pub fn writeout_end(
fn writeout_end(
&self,
new_start_live: Option<u64>,
new_end_live: Option<u64>,
Expand All @@ -256,13 +263,90 @@ impl Rollback {
}
}

pub struct WriteoutData {
pub rollback_start_live: u64,
pub rollback_end_live: u64,
pub struct SyncController {
rollback: Rollback,
wd: Arc<Mutex<Option<WriteoutData>>>,
wd_cv: Arc<Condvar>,
post_meta: Arc<Mutex<Option<anyhow::Result<()>>>>,
post_meta_cv: Arc<Condvar>,
}

impl SyncController {
fn new(rollback: Rollback) -> Self {
let wd = Arc::new(Mutex::new(None));
let wd_cv = Arc::new(Condvar::new());
let post_meta = Arc::new(Mutex::new(None));
let post_meta_cv = Arc::new(Condvar::new());
Self {
rollback,
wd,
wd_cv,
post_meta,
post_meta_cv,
}
}

/// Begins the sync process.
///
/// This function doesn't block.
pub fn begin_sync(&mut self) {
let tp = self.rollback.shared.sync_tp.clone();
let rollback = self.rollback.clone();
let wd = self.wd.clone();
let wd_cv = self.wd_cv.clone();
tp.execute(move || {
let writeout_data = rollback.writeout_start();
let _ = wd.lock().replace(writeout_data);
wd_cv.notify_one();
});
}

/// Wait for the rollback writeout to complete. Returns the new rollback live range
/// `(start_live, end_live)`.
///
/// This should be called by the sync thread. Blocking.
pub fn wait_pre_meta(&self) -> (u64, u64) {
let mut wd = self.wd.lock();
self.wd_cv.wait_while(&mut wd, |wd| wd.is_none());
// UNWRAP: we checked above that `wd` is not `None`.
let wd = wd.as_ref().unwrap();
(wd.rollback_start_live, wd.rollback_end_live)
}

/// This should be called after the meta has been updated.
///
/// This function doesn't block.
pub fn post_meta(&self) {
let tp = self.rollback.shared.sync_tp.clone();
let wd = self.wd.lock().take().unwrap();
let post_meta = self.post_meta.clone();
let post_meta_cv = self.post_meta_cv.clone();
let rollback = self.rollback.clone();
tp.execute(move || {
let result =
rollback.writeout_end(wd.prune_to_new_start_live, wd.prune_to_new_end_live);
let _ = post_meta.lock().replace(result);
post_meta_cv.notify_one();
});
}

/// Wait until the post-meta writeout completes.
pub fn wait_post_meta(&self) -> anyhow::Result<()> {
let mut post_meta = self.post_meta.lock();
self.post_meta_cv
.wait_while(&mut post_meta, |post_meta| post_meta.is_none());
// UNWRAP: we checked above that `post_meta` is not `None`.
post_meta.take().unwrap()
}
}

struct WriteoutData {
rollback_start_live: u64,
rollback_end_live: u64,
/// If this is `Some`, then the [`Rollback::writeout_end`] should be called with this value.
pub prune_to_new_start_live: Option<u64>,
prune_to_new_start_live: Option<u64>,
/// If this is `Some`, then the [`Rollback::writeout_end`] should be called with this value.
pub prune_to_new_end_live: Option<u64>,
prune_to_new_end_live: Option<u64>,
}

pub struct ReverseDeltaBuilder {
Expand Down
86 changes: 14 additions & 72 deletions nomt/src/store/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use super::{
};
use crate::{beatree, bitbox, merkle, page_cache::PageCache, rollback};

use crossbeam::channel::{self, Receiver};
use threadpool::ThreadPool;

pub struct Sync {
pub(crate) tp: ThreadPool,
pub(crate) sync_seqn: u32,
pub(crate) bitbox_num_pages: u32,
pub(crate) bitbox_seed: [u8; 16],
Expand All @@ -23,7 +19,6 @@ impl Sync {
panic_on_sync: bool,
) -> Self {
Self {
tp: ThreadPool::with_name("store-sync".into(), 6),
sync_seqn,
bitbox_num_pages,
bitbox_seed,
Expand All @@ -44,10 +39,9 @@ impl Sync {
self.sync_seqn += 1;
let sync_seqn = self.sync_seqn;

let rollback_writeout_wd_rx = spawn_rollback_writeout_start(&self.tp, &rollback);

let mut bitbox_sync = bitbox.sync();
let mut beatree_sync = beatree.sync();
let mut rollback_sync = rollback.map(|rollback| rollback.sync());

let merkle_tx = MerkleTransaction {
page_pool: shared.page_pool.clone(),
Expand All @@ -56,29 +50,17 @@ impl Sync {
};
bitbox_sync.begin_sync(page_cache, merkle_tx, page_diffs);
beatree_sync.begin_sync(value_tx.batch);

let rollback_writeout_wd = rollback_writeout_wd_rx
.map(|rollback_writeout_wd| rollback_writeout_wd.recv().unwrap());

let rollback_start_live;
let rollback_end_live;
let rollback_prune_to_new_start_live;
let rollback_prune_to_new_end_live;
if let Some(rollback_writeout_wd) = &rollback_writeout_wd {
rollback_start_live = rollback_writeout_wd.rollback_start_live;
rollback_end_live = rollback_writeout_wd.rollback_end_live;
rollback_prune_to_new_start_live = rollback_writeout_wd.prune_to_new_start_live;
rollback_prune_to_new_end_live = rollback_writeout_wd.prune_to_new_end_live;
} else {
rollback_start_live = 0;
rollback_end_live = 0;
rollback_prune_to_new_start_live = None;
rollback_prune_to_new_end_live = None;
if let Some(ref mut rollback) = rollback_sync {
rollback.begin_sync();
}

// TODO: comprehensive error handling is coming later.
bitbox_sync.wait_pre_meta().unwrap();
let beatree_meta_wd = beatree_sync.wait_pre_meta().unwrap();
let (rollback_start_live, rollback_end_live) = match rollback_sync {
Some(ref rollback) => rollback.wait_pre_meta(),
None => (0, 0),
};

let new_meta = Meta {
magic: meta::MAGIC,
Expand All @@ -99,58 +81,18 @@ impl Sync {
panic!("panic_on_sync is true");
}

// Spawn a task to finish off the rollback writeout, if required.
let rollback_writeout_end_rx = if let Some(rollback) = rollback {
spawn_rollback_writeout_end(
&self.tp,
&rollback,
rollback_prune_to_new_start_live,
rollback_prune_to_new_end_live,
)
} else {
let (tx, rx) = channel::bounded(1);
tx.send(()).unwrap();
rx
};
if let Some(ref rollback) = rollback_sync {
rollback.post_meta();
}

bitbox_sync.post_meta(shared.io_pool.make_handle())?;
beatree_sync.post_meta();

rollback_writeout_end_rx.recv().unwrap();
if let Some(ref rollback) = rollback_sync {
// TODO: comprehensive error handling is coming later.
rollback.wait_post_meta().unwrap();
}

Ok(())
}
}

fn spawn_rollback_writeout_start(
tp: &ThreadPool,
rollback: &Option<rollback::Rollback>,
) -> Option<Receiver<rollback::WriteoutData>> {
match rollback {
None => None,
Some(rollback) => {
let (result_tx, result_rx) = channel::bounded(1);
let rollback = rollback.clone();
tp.execute(move || {
let writeout_data = rollback.writeout_start();
let _ = result_tx.send(writeout_data);
});
Some(result_rx)
}
}
}

fn spawn_rollback_writeout_end(
tp: &ThreadPool,
rollback: &rollback::Rollback,
new_start_live: Option<u64>,
new_end_live: Option<u64>,
) -> Receiver<()> {
let (result_tx, result_rx) = channel::bounded(1);
let rollback = rollback.clone();
tp.execute(move || {
rollback.writeout_end(new_start_live, new_end_live).unwrap();
let _ = result_tx.send(());
});
result_rx
}

0 comments on commit afb1320

Please sign in to comment.