Skip to content

Commit

Permalink
opt: remove preload_leaves in favor of a preload_and_prepare
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Dec 6, 2024
1 parent 4f5f0ce commit 898499c
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 87 deletions.
17 changes: 0 additions & 17 deletions nomt/src/beatree/leaf_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,6 @@ impl LeafCache {
shard.cache.get(&page_number).map(|x| x.clone())
}

/// Check whether the cache contains a key without updating the LRU state.
pub fn contains_key(&self, page_number: PageNumber) -> bool {
let shard = self.inner.shard_for(page_number);

shard.cache.contains(&page_number)
}

/// Insert a cache entry. This does not evict anything.
pub fn insert(&self, page_number: PageNumber, node: Arc<LeafNode>) {
let mut shard = self.inner.shard_for(page_number);
Expand All @@ -62,16 +55,6 @@ impl LeafCache {
}
}
}

#[cfg(test)]
pub fn all_page_numbers(&self) -> std::collections::BTreeSet<PageNumber> {
let mut set = std::collections::BTreeSet::new();
for shard in &self.inner.shards {
let shard = shard.lock();
set.extend(shard.cache.iter().map(|(pn, _)| *pn));
}
set
}
}

struct Shared {
Expand Down
127 changes: 126 additions & 1 deletion nomt/src/beatree/ops/update/leaf_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::beatree::{
},
Key,
};
use crate::io::{IoCommand, IoHandle, IoKind, PAGE_SIZE};
use crate::io::{IoCommand, IoHandle, IoKind, IoPool, PAGE_SIZE};

/// Tracker of all changes that happen to leaves during an update
pub type LeavesTracker = super::NodesTracker<LeafNode>;
Expand Down Expand Up @@ -134,6 +134,20 @@ pub fn run(

let worker_result_tx = worker_result_tx.clone();
thread_pool.execute(move || {
// TODO: handle error.
let prepared_leaves = preload_and_prepare(
&leaf_cache,
&leaf_reader,
&bbn_index,
io_handle.io_pool(),
changeset[worker_params.op_range.clone()]
.iter()
.map(|(k, _)| *k),
)
.unwrap();

let mut prepared_leaves_iter = prepared_leaves.into_iter().peekable();

// passing the large `Arc` values by reference ensures that they are dropped at the
// end of this scope, not the end of `run_worker`.
let res = run_worker(
Expand All @@ -142,6 +156,7 @@ pub fn run(
leaf_reader,
leaf_writer,
io_handle,
&mut prepared_leaves_iter,
&*changeset,
worker_params,
);
Expand Down Expand Up @@ -294,6 +309,7 @@ fn reset_leaf_base(
leaves_tracker: &mut LeavesTracker,
leaf_updater: &mut LeafUpdater,
has_extended_range: bool,
prepared_leaves: &mut PreparedLeafIter,
mut key: Key,
) {
if !has_extended_range {
Expand All @@ -303,11 +319,16 @@ fn reset_leaf_base(
leaf_reader,
leaves_tracker,
leaf_updater,
prepared_leaves,
key,
);
return;
}

// by the time we extend our range, we should've consumed all relevant leaves to our initial
// changeset.
assert!(prepared_leaves.peek().is_none());

if let Some((separator, node, next_separator)) = leaves_tracker.pending_base.take() {
let base = BaseLeaf::new(node, separator);
leaf_updater.reset_base(Some(base), next_separator);
Expand All @@ -326,6 +347,7 @@ fn reset_leaf_base(
leaf_reader,
leaves_tracker,
leaf_updater,
prepared_leaves,
key,
)
} else {
Expand All @@ -343,8 +365,34 @@ fn reset_leaf_base_fresh(
leaf_reader: &StoreReader,
leaves_tracker: &mut LeavesTracker,
leaf_updater: &mut LeafUpdater,
prepared_leaves: &mut PreparedLeafIter,
key: Key,
) {
// fast path: this leaf was expected to be used and prepared. avoid heavy index lookup.
if prepared_leaves
.peek()
.map_or(false, |prepared| prepared.separator <= key)
{
// UNWRAP: we just checked.
// note that the separator < key condition above only fails if we need to merge with an
// unprepared leaf.
let prepared_leaf = prepared_leaves.next().unwrap();

// we intend to work on this leaf, therefore, we delete it.
// any new leaves produced by the updater will replace it.
leaves_tracker.delete(
prepared_leaf.separator,
prepared_leaf.page_number,
prepared_leaf.cutoff,
);

// UNWRAP: prepared leaves always have a `Some` node.
let base = BaseLeaf::new(prepared_leaf.node.unwrap(), prepared_leaf.separator);
leaf_updater.reset_base(Some(base), prepared_leaf.cutoff);
return;
}

// slow path: unexpected leaf.
let Some((separator, cutoff, leaf_pn)) = indexed_leaf(bbn_index, key) else {
return;
};
Expand Down Expand Up @@ -376,6 +424,7 @@ fn run_worker(
leaf_reader: StoreReader,
leaf_writer: SyncAllocator,
io_handle: IoHandle,
prepared_leaves: &mut PreparedLeafIter,
changeset: &[(Key, Option<(Vec<u8>, bool)>)],
mut worker_params: WorkerParams<LeafNode>,
) -> LeafWorkerOutput {
Expand All @@ -399,6 +448,7 @@ fn run_worker(
&mut new_leaf_state.leaves_tracker,
&mut leaf_updater,
has_extended_range,
prepared_leaves,
changeset[worker_params.op_range.start].0,
);

Expand Down Expand Up @@ -437,6 +487,7 @@ fn run_worker(
&mut new_leaf_state.leaves_tracker,
&mut leaf_updater,
has_extended_range,
prepared_leaves,
k,
);
}
Expand Down Expand Up @@ -475,6 +526,7 @@ fn run_worker(
&mut new_leaf_state.leaves_tracker,
&mut leaf_updater,
has_extended_range,
prepared_leaves,
cutoff,
);
}
Expand Down Expand Up @@ -539,3 +591,76 @@ impl super::leaf_updater::HandleNewLeaf for NewLeafHandler {
self.leaves_tracker.insert(key, leaf, cutoff, page_number);
}
}

type PreparedLeafIter = std::iter::Peekable<std::vec::IntoIter<PreparedLeaf>>;

struct PreparedLeaf {
separator: Key,
// this is always `Some`.
node: Option<Arc<LeafNode>>,
cutoff: Option<Key>,
page_number: PageNumber,
}

fn preload_and_prepare(
leaf_cache: &LeafCache,
leaf_reader: &StoreReader,
bbn_index: &Index,
io_pool: &IoPool,
changeset: impl IntoIterator<Item = Key>,
) -> anyhow::Result<Vec<PreparedLeaf>> {
let io_handle = io_pool.make_handle();

let mut changeset_leaves: Vec<PreparedLeaf> = Vec::new();
let mut submissions = 0;
for key in changeset {
if let Some(ref last) = changeset_leaves.last() {
if last.cutoff.map_or(true, |c| key < c) {
continue;
}
}

match indexed_leaf(bbn_index, key) {
None => {
// this case only occurs when the DB is empty.
assert!(changeset_leaves.is_empty());
break;
}
Some((separator, cutoff, pn)) => {
if let Some(leaf) = leaf_cache.get(pn) {
changeset_leaves.push(PreparedLeaf {
separator,
node: Some(leaf),
cutoff,
page_number: pn,
});
} else {
// dispatch an I/O for the leaf and schedule the slot to be filled.

io_handle
.send(leaf_reader.io_command(pn, changeset_leaves.len() as u64))
.expect("I/O pool disconnected");

submissions += 1;

changeset_leaves.push(PreparedLeaf {
separator,
node: None,
cutoff,
page_number: pn,
});
}
}
}
}

for _ in 0..submissions {
let completion = io_handle.recv().expect("I/O Pool Disconnected");
completion.result?;
let page = completion.command.kind.unwrap_buf();
let node = Arc::new(LeafNode { inner: page });
changeset_leaves[completion.command.user_data as usize].node = Some(node);
}

Ok(changeset_leaves)
}
54 changes: 1 addition & 53 deletions nomt/src/beatree/ops/update/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::beatree::{
allocator::{PageNumber, Store, StoreReader},
branch::BRANCH_NODE_BODY_SIZE,
index::Index,
leaf::node::{LeafNode, LEAF_NODE_BODY_SIZE},
leaf::node::LEAF_NODE_BODY_SIZE,
leaf_cache::LeafCache,
ops::get_key,
Key, SyncData,
Expand Down Expand Up @@ -57,14 +57,6 @@ pub fn update(
let (leaf_writer, leaf_finisher) = leaf_store.start_sync();
let (bbn_writer, bbn_finisher) = bbn_store.start_sync();

preload_leaves(
&leaf_cache,
&leaf_reader,
&bbn_index,
&io_handle,
changeset.keys().cloned(),
)?;

let leaf_stage_outputs = leaf_stage::run(
&bbn_index,
leaf_cache.clone(),
Expand Down Expand Up @@ -123,50 +115,6 @@ pub fn update(
))
}

fn preload_leaves(
leaf_cache: &LeafCache,
leaf_reader: &StoreReader,
bbn_index: &Index,
io_handle: &IoHandle,
keys: impl IntoIterator<Item = Key>,
) -> Result<()> {
let mut last_pn = None;

let mut submissions = 0;
for key in keys {
let Some((_, branch)) = bbn_index.lookup(key) else {
continue;
};
let Some((_, leaf_pn)) = super::search_branch(&branch, key) else {
continue;
};
if last_pn == Some(leaf_pn) {
continue;
}
last_pn = Some(leaf_pn);

if leaf_cache.contains_key(leaf_pn) {
continue;
}

io_handle
.send(leaf_reader.io_command(leaf_pn, leaf_pn.0 as u64))
.expect("I/O Pool Disconnected");

submissions += 1;
}

for _ in 0..submissions {
let completion = io_handle.recv().expect("I/O Pool Disconnected");
completion.result?;
let pn = PageNumber(completion.command.user_data as u32);
let page = completion.command.kind.unwrap_buf();
leaf_cache.insert(pn, Arc::new(LeafNode { inner: page }));
}

Ok(())
}

/// Container of possible changes made to a node
pub struct ChangedNodeEntry<Node> {
/// PageNumber of the Node that is being replaced by the current entry
Expand Down
32 changes: 22 additions & 10 deletions nomt/src/beatree/ops/update/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use crate::{
},
leaf_cache::LeafCache,
ops::{
self,
bit_ops::separate,
update::{
branch_stage::BranchStageOutput, branch_updater::tests::make_branch_until, get_key,
leaf_stage::LeafStageOutput, preload_leaves, LEAF_MERGE_THRESHOLD,
leaf_stage::LeafStageOutput, LEAF_MERGE_THRESHOLD,
},
},
Index,
Expand Down Expand Up @@ -171,6 +172,25 @@ impl Arbitrary for Key {
}
}

fn leaf_page_numbers(
bbn_index: &Index,
keys: impl IntoIterator<Item = [u8; 32]>,
) -> BTreeSet<PageNumber> {
let mut page_numbers = BTreeSet::new();
for key in keys {
let Some((_, branch)) = bbn_index.lookup(key) else {
continue;
};
let Some((_, leaf_pn)) = ops::search_branch(&branch, key) else {
continue;
};

page_numbers.insert(leaf_pn);
}

page_numbers
}

// given a changeset execute the leaf stage on top of a pre-initialized nomt-db
fn exec_leaf_stage(
commit_concurrency: usize,
Expand All @@ -182,15 +202,7 @@ fn exec_leaf_stage(

let bbn_index = &TREE_DATA.bbn_index;
let leaf_cache = LeafCache::new(commit_concurrency, 1024);
preload_leaves(
&leaf_cache,
&leaf_reader,
bbn_index,
&IO_POOL.make_handle(),
changeset.keys().cloned(),
)
.unwrap();
let leaf_page_numbers: BTreeSet<PageNumber> = leaf_cache.all_page_numbers();
let leaf_page_numbers = leaf_page_numbers(&bbn_index, changeset.keys().cloned());

let io_handle = IO_POOL.make_handle();
let leaf_stage_output = super::leaf_stage::run(
Expand Down
Loading

0 comments on commit 898499c

Please sign in to comment.