cleanup beatree & bitbox
Specifically, reuse the threadpool between runs of sync.
This also locks beatree for the entire duration of sync.
pepyakin committed Nov 25, 2024
1 parent d51f7bf commit 9f00583
Showing 2 changed files with 59 additions and 65 deletions.
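
The "locks beatree for the entire duration of sync" part works because `parking_lot`'s `lock_arc` returns an `ArcMutexGuard` that owns an `Arc` of the mutex, so the guard can be stored inside the `SyncController` instead of borrowing the tree. Below is a minimal sketch of that pattern, with illustrative names rather than the actual NOMT types, and assuming parking_lot is built with its `arc_lock` feature:

```rust
// Minimal sketch of the locking pattern; not the actual NOMT types.
// Assumes parking_lot with its `arc_lock` feature enabled.
use parking_lot::{ArcMutexGuard, Mutex, RawMutex};
use std::sync::Arc;

struct SyncState {
    runs: u64,
}

struct SyncController {
    // The guard owns an `Arc` of the mutex, so it is not tied to a borrow of
    // the tree and can live inside the controller for the whole sync.
    _sync: ArcMutexGuard<RawMutex, SyncState>,
}

fn begin_sync(sync: &Arc<Mutex<SyncState>>) -> SyncController {
    // Blocks here if a previous sync is still holding the lock.
    let mut guard = sync.lock_arc();
    guard.runs += 1;
    SyncController { _sync: guard }
}

fn main() {
    let sync = Arc::new(Mutex::new(SyncState { runs: 0 }));
    let controller = begin_sync(&sync);
    // ... perform the sync while the lock is held ...
    drop(controller); // releases the lock; the next sync may begin
}
```

Because the guard is released only when the controller is dropped, a second call to `sync()` blocks until the previous sync has fully completed.
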
98 changes: 48 additions & 50 deletions nomt/src/beatree/mod.rs
@@ -1,8 +1,8 @@
use allocator::{PageNumber, Store, StoreReader, FREELIST_EMPTY};
use anyhow::{Context, Result};
use branch::BRANCH_NODE_SIZE;
use parking_lot::{Mutex, RwLock};
use std::{collections::BTreeMap, fs::File, mem, ops::DerefMut, path::Path, sync::Arc};
use parking_lot::{ArcMutexGuard, Mutex, RwLock};
use std::{collections::BTreeMap, fs::File, mem, path::Path, sync::Arc};
use threadpool::ThreadPool;

use crate::io::{fsyncer::Fsyncer, IoHandle, IoPool, PagePool};
@@ -25,9 +25,6 @@ pub type Key = [u8; 32];
pub struct Tree {
shared: Arc<RwLock<Shared>>,
sync: Arc<Mutex<Sync>>,
bbn_fsync: Arc<Fsyncer>,
ln_fsync: Arc<Fsyncer>,
sync_tp: ThreadPool,
}

struct Shared {
@@ -48,6 +45,8 @@ struct Shared {
struct Sync {
tp: ThreadPool,
commit_concurrency: usize,
bbn_fsync: Arc<Fsyncer>,
ln_fsync: Arc<Fsyncer>,
}

impl Shared {
@@ -115,16 +114,16 @@ impl Tree {
};

let sync = Sync {
tp: ThreadPool::with_name("beatree-sync".into(), commit_concurrency),
// +1 for the begin_sync task.
tp: ThreadPool::with_name("beatree-sync".into(), commit_concurrency + 1),
commit_concurrency,
bbn_fsync: Arc::new(Fsyncer::new("bbn", bbn_file)),
ln_fsync: Arc::new(Fsyncer::new("ln", ln_file)),
};

Ok(Tree {
sync_tp: ThreadPool::with_name("beatree-sync".into(), 1),
shared: Arc::new(RwLock::new(shared)),
sync: Arc::new(Mutex::new(sync)),
bbn_fsync: Arc::new(Fsyncer::new("bbn", bbn_file)),
ln_fsync: Arc::new(Fsyncer::new("ln", ln_file)),
})
}

@@ -148,11 +147,17 @@ impl Tree {

/// Returns a controller for the sync process.
pub fn sync(&self) -> SyncController {
// Take the sync lock.
//
// That will exclude any other syncs from happening. This is a long running operation.
let sync = self.sync.lock_arc();
SyncController {
tree: self.clone(),
tp: self.sync_tp.clone(),
sync_data: Arc::new(Mutex::new(None)),
bbn_index: Arc::new(Mutex::new(None)),
inner: Arc::new(SharedSyncController {
sync,
shared: self.shared.clone(),
sync_data: Mutex::new(None),
bbn_index: Mutex::new(None),
}),
}
}

@@ -162,11 +167,11 @@ impl Tree {
/// The changeset is applied atomically. If the changeset is empty, the btree is not modified.
// There might be some temptation to unify this with prepare_sync, but this should not be done
// because in the future sync and commit will be called on different threads at different times.
fn commit(&self, changeset: Vec<(Key, Option<Vec<u8>>)>) {
fn commit(shared: &Arc<RwLock<Shared>>, changeset: Vec<(Key, Option<Vec<u8>>)>) {
if changeset.is_empty() {
return;
}
let mut inner = self.shared.write();
let mut inner = shared.write();
let staging = &mut inner.primary_staging;
for (key, value) in changeset {
staging.insert(key, value);
@@ -176,14 +181,7 @@ impl Tree {
/// Dump all changes performed by commits to the underlying storage medium.
///
/// Either blocks or panics if another sync is inflight.
fn prepare_sync(&self) -> (SyncData, Index) {
// Take the sync lock.
//
// That will exclude any other syncs from happening. This is a long running operation.
//
// Note the ordering of taking locks is important.
let mut sync = self.sync.lock();

fn prepare_sync(sync: &Sync, shared: &Arc<RwLock<Shared>>) -> (SyncData, Index) {
// Take the shared lock. Briefly.
let staged_changeset;
let bbn_index;
@@ -192,7 +190,7 @@ impl Tree {
let bbn_store;
let io_handle;
{
let mut shared = self.shared.write();
let mut shared = shared.write();
staged_changeset = shared.take_staged_changeset();
bbn_index = shared.bbn_index.clone();
page_pool = shared.page_pool.clone();
@@ -202,8 +200,6 @@ impl Tree {
}

{
let sync = sync.deref_mut();

// Update will modify the index in a CoW manner.
//
// Nodes that need to be modified are not modified in place but they are
@@ -235,11 +231,11 @@ impl Tree {
}
}

fn finish_sync(&self, bbn_index: Index) {
fn finish_sync(shared: &Arc<RwLock<Shared>>, bbn_index: Index) {
// Take the shared lock again to complete the update to the new shared state
let mut inner = self.shared.write();
inner.secondary_staging = None;
inner.bbn_index = bbn_index;
let mut shared = shared.write();
shared.secondary_staging = None;
shared.bbn_index = bbn_index;
}
}

@@ -288,10 +284,14 @@ pub fn create(db_dir: impl AsRef<Path>) -> anyhow::Result<()> {
/// should be discarded.
// TODO: error handling is coming in a follow up.
pub struct SyncController {
tree: Tree,
tp: ThreadPool,
sync_data: Arc<Mutex<Option<SyncData>>>,
bbn_index: Arc<Mutex<Option<Index>>>,
inner: Arc<SharedSyncController>,
}

struct SharedSyncController {
sync: ArcMutexGuard<parking_lot::RawMutex, Sync>,
shared: Arc<RwLock<Shared>>,
sync_data: Mutex<Option<SyncData>>,
bbn_index: Mutex<Option<Index>>,
}

impl SyncController {
@@ -301,23 +301,21 @@ impl SyncController {
///
/// Non-blocking.
pub fn begin_sync(&mut self, changeset: Vec<(Key, Option<Vec<u8>>)>) {
let beatree = self.tree.clone();
let sync_data = self.sync_data.clone();
let bbn_index = self.bbn_index.clone();
self.tp.execute(move || {
beatree.commit(changeset);
let (out_meta, out_bbn_index) = beatree.prepare_sync();

let mut sync_data = sync_data.lock();
let inner = self.inner.clone();
self.inner.sync.tp.execute(move || {
Tree::commit(&inner.shared, changeset);
let (out_meta, out_bbn_index) = Tree::prepare_sync(&inner.sync, &inner.shared);

let mut sync_data = inner.sync_data.lock();
*sync_data = Some(out_meta);
drop(sync_data);

let mut bbn_index = bbn_index.lock();
let mut bbn_index = inner.bbn_index.lock();
*bbn_index = Some(out_bbn_index);
drop(bbn_index);

beatree.bbn_fsync.fsync();
beatree.ln_fsync.fsync();
inner.sync.bbn_fsync.fsync();
inner.sync.ln_fsync.fsync();
});
}

@@ -326,11 +324,11 @@ impl SyncController {
///
/// This must be called after [`Self::begin_sync`].
pub fn wait_pre_meta(&mut self) -> anyhow::Result<SyncData> {
self.tree.bbn_fsync.wait()?;
self.tree.ln_fsync.wait()?;
self.inner.sync.bbn_fsync.wait()?;
self.inner.sync.ln_fsync.wait()?;

// UNWRAP: fsync of bbn and ln above ensures that sync_data is Some.
let sync_data = self.sync_data.lock().take().unwrap();
let sync_data = self.inner.sync_data.lock().take().unwrap();
Ok(sync_data)
}

@@ -339,7 +337,7 @@ impl SyncController {
/// Has to be called after the manifest is updated. Must be invoked by the sync
/// thread. Blocking.
pub fn post_meta(&mut self) {
let bbn_index = self.bbn_index.lock().take().unwrap();
self.tree.finish_sync(bbn_index);
let bbn_index = self.inner.bbn_index.lock().take().unwrap();
Tree::finish_sync(&self.inner.shared, bbn_index);
}
}
26 changes: 11 additions & 15 deletions nomt/src/bitbox/mod.rs
@@ -49,6 +49,7 @@ pub struct Shared {
occupied_buckets: AtomicUsize,
wal_fd: File,
ht_fd: File,
sync_tp: ThreadPool,
}

impl DB {
@@ -84,6 +85,7 @@ impl DB {
occupied_buckets: AtomicUsize::new(occupied_buckets),
wal_fd,
ht_fd,
sync_tp: ThreadPool::with_name("bitbox-sync".into(), 2),
}),
})
}
@@ -98,7 +100,7 @@ impl DB {
}

pub fn sync(&self) -> SyncController {
SyncController::new(self.clone(), ThreadPool::with_name("bitbox-sync".into(), 6))
SyncController::new(self.clone())
}

fn prepare_sync(
@@ -182,7 +184,6 @@ impl DB {

pub struct SyncController {
db: DB,
tp: ThreadPool,
/// The channel to send the result of the WAL writeout. Option is to allow `take`.
wal_result_tx: Option<Sender<anyhow::Result<()>>>,
/// The channel to receive the result of the WAL writeout.
@@ -192,10 +193,9 @@ pub struct SyncController {
}

impl SyncController {
fn new(db: DB, tp: ThreadPool) -> Self {
fn new(db: DB) -> Self {
let (wal_result_tx, wal_result_rx) = crossbeam_channel::bounded(1);
Self {
tp,
db,
wal_result_tx: Some(wal_result_tx),
wal_result_rx,
@@ -215,19 +215,18 @@ impl SyncController {
let page_pool = self.db.shared.page_pool.clone();
let bitbox = self.db.clone();
let ht_to_write = self.ht_to_write.clone();
let tp = self.tp.clone();
let wal_blob_builder = self.db.shared.wal_blob_builder.clone();
// UNWRAP: safe because begin_sync is called only once.
let wal_result_tx = self.wal_result_tx.take().unwrap();
self.tp.execute(move || {
self.db.shared.sync_tp.execute(move || {
page_cache.prepare_transaction(page_diffs.into_iter(), &mut merkle_tx);

let mut wal_blob_builder = wal_blob_builder.lock();
let ht_pages =
bitbox.prepare_sync(&page_pool, merkle_tx.new_pages, &mut *wal_blob_builder);
drop(wal_blob_builder);

Self::spawn_wal_writeout(wal_result_tx, &tp, &bitbox.shared);
Self::spawn_wal_writeout(wal_result_tx, bitbox);

let mut ht_to_write = ht_to_write.lock();
*ht_to_write = Some(ht_pages);
@@ -237,16 +236,13 @@ impl SyncController {
});
}

fn spawn_wal_writeout(
wal_result_tx: Sender<anyhow::Result<()>>,
tp: &ThreadPool,
shared: &Arc<Shared>,
) {
let shared = shared.clone();
fn spawn_wal_writeout(wal_result_tx: Sender<anyhow::Result<()>>, bitbox: DB) {
let bitbox = bitbox.clone();
let tp = bitbox.shared.sync_tp.clone();
tp.execute(move || {
let wal_blob_builder = shared.wal_blob_builder.lock();
let wal_blob_builder = bitbox.shared.wal_blob_builder.lock();
let wal_slice = wal_blob_builder.as_slice();
let wal_result = writeout::write_wal(&shared.wal_fd, wal_slice);
let wal_result = writeout::write_wal(&bitbox.shared.wal_fd, wal_slice);
let _ = wal_result_tx.send(wal_result);
});
}
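
On the bitbox side, the "reuse the threadpool between runs of sync" part is achieved by moving the `ThreadPool` into `Shared`, so every call to `sync()` schedules onto the same workers instead of building a fresh pool. A minimal sketch of that reuse pattern, again with illustrative names rather than the actual bitbox types:

```rust
// Minimal sketch of the reuse pattern; names are illustrative, not the
// actual bitbox types. The point is that the pool is created once and
// every sync dispatches onto the same workers.
use std::sync::Arc;
use threadpool::ThreadPool;

struct Shared {
    // Lives as long as the DB, so worker threads are reused across syncs.
    sync_tp: ThreadPool,
}

struct Db {
    shared: Arc<Shared>,
}

impl Db {
    fn open() -> Self {
        Db {
            shared: Arc::new(Shared {
                // Two workers, mirroring the diff (presumably the main sync
                // task plus the WAL writeout).
                sync_tp: ThreadPool::with_name("bitbox-sync".into(), 2),
            }),
        }
    }

    fn sync(&self, round: u32) {
        // No new pool here: each sync just schedules jobs on the shared one.
        self.shared.sync_tp.execute(move || {
            // ... hash-table and WAL writeout for this round would go here ...
            println!("sync round {round} running on a reused worker");
        });
    }
}

fn main() {
    let db = Db::open();
    for round in 0..3 {
        db.sync(round);
    }
    // Wait for any outstanding jobs before the process exits.
    db.shared.sync_tp.join();
}
```

The pool is sized once when the DB is opened and outlives every individual sync, which is what lets the worker threads be reused between runs.
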
