Skip to content

Commit

Permalink
refactor(beatree)!: include value hash in overflow cells
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Dec 6, 2024
1 parent 07cff2b commit ed25760
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 41 deletions.
4 changes: 2 additions & 2 deletions nomt/src/beatree/leaf/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/// padding: [u8] // empty space between cell_pointers and cells
/// cells: [Cell; n]
/// value cell: [u8]
/// overflow cell: (u64, [NodePointer]) | semantically, (value_size, [NodePointer]).
/// overflow cell: (u64, u256, [NodePointer]) | semantically, (value_size, value_hash, [NodePointer]).
/// ```
///
/// | n | [(key ++ offset); n] | ---- | [[u8]; n] |
Expand Down Expand Up @@ -41,7 +41,7 @@ pub const MAX_LEAF_VALUE_SIZE: usize = (LEAF_NODE_BODY_SIZE / 3) - 32;
/// The maximum number of node pointers which may appear directly in an overflow cell.
///
/// Note that this gives an overflow value cell maximum size of 100 bytes.
pub const MAX_OVERFLOW_CELL_NODE_POINTERS: usize = 23;
pub const MAX_OVERFLOW_CELL_NODE_POINTERS: usize = 15;

/// We use the high bit to encode whether a cell is an overflow cell.
const OVERFLOW_BIT: u16 = 1 << 15;
Expand Down
21 changes: 12 additions & 9 deletions nomt/src/beatree/leaf/overflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,24 +89,27 @@ pub fn chunk(
}

/// Decode an overflow cell, returning the size of the value, the value hash, and the page numbers within the cell.
pub fn decode_cell<'a>(raw: &'a [u8]) -> (usize, impl Iterator<Item = PageNumber> + 'a) {
assert!(raw.len() >= 12);
pub fn decode_cell<'a>(raw: &'a [u8]) -> (usize, [u8; 32], impl Iterator<Item = PageNumber> + 'a) {
// the minimum legal size is the length plus one page pointer plus the value hash.
assert!(raw.len() >= 8 + 4 + 32);
assert_eq!(raw.len() % 4, 0);

let value_size = u64::from_le_bytes(raw[0..8].try_into().unwrap());
let value_hash: [u8; 32] = raw[8..40].try_into().unwrap();

let iter = raw[8..]
let iter = raw[40..]
.chunks(4)
.map(|slice| PageNumber(u32::from_le_bytes(slice.try_into().unwrap())));

(value_size as usize, iter)
(value_size as usize, value_hash, iter)
}

/// Encode a value size, a value hash, and a list of page numbers into an overflow cell.
pub fn encode_cell(value_size: usize, pages: &[PageNumber]) -> Vec<u8> {
let mut v = vec![0u8; 8 + pages.len() * 4];
pub fn encode_cell(value_size: usize, value_hash: [u8; 32], pages: &[PageNumber]) -> Vec<u8> {
let mut v = vec![0u8; 8 + 32 + pages.len() * 4];
v[0..8].copy_from_slice(&(value_size as u64).to_le_bytes());
for (pn, slice) in pages.iter().zip(v[8..].chunks_mut(4)) {
v[8..40].copy_from_slice(&value_hash);
for (pn, slice) in pages.iter().zip(v[40..].chunks_mut(4)) {
slice.copy_from_slice(&pn.0.to_le_bytes());
}

Expand Down Expand Up @@ -170,7 +173,7 @@ fn needed_pages(size: usize) -> usize {

/// Read a large value from pages referenced by an overflow cell.
pub fn read(cell: &[u8], leaf_reader: &StoreReader) -> Vec<u8> {
let (value_size, cell_pages) = decode_cell(cell);
let (value_size, _, cell_pages) = decode_cell(cell);
let total_pages = total_needed_pages(value_size);

let mut value = Vec::with_capacity(value_size);
Expand All @@ -193,7 +196,7 @@ pub fn read(cell: &[u8], leaf_reader: &StoreReader) -> Vec<u8> {

/// Iterate all pages related to an overflow cell and push onto a free-list.
pub fn delete(cell: &[u8], leaf_reader: &StoreReader, freed: &mut Vec<PageNumber>) {
let (value_size, cell_pages) = decode_cell(cell);
let (value_size, _, cell_pages) = decode_cell(cell);
let total_pages = total_needed_pages(value_size);

let start = freed.len();
Expand Down
56 changes: 49 additions & 7 deletions nomt/src/beatree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use allocator::{PageNumber, Store, StoreReader, FREELIST_EMPTY};
use anyhow::{Context, Result};
use branch::BRANCH_NODE_SIZE;
use crossbeam_channel::Receiver;
use leaf::node::MAX_LEAF_VALUE_SIZE;
use nomt_core::trie::ValueHash;
use parking_lot::{ArcMutexGuard, Mutex, RwLock};
use std::{collections::BTreeMap, fs::File, mem, path::Path, sync::Arc};
use threadpool::ThreadPool;
Expand Down Expand Up @@ -38,10 +40,10 @@ struct Shared {
leaf_store_rd: StoreReader,
/// Primary staging collects changes that are committed but not synced yet. Upon sync, changes
/// from here are moved to secondary staging.
primary_staging: BTreeMap<Key, Option<Vec<u8>>>,
primary_staging: BTreeMap<Key, ValueChange>,
/// Secondary staging collects committed changes that are currently being synced. This is None
/// if there is no sync in progress.
secondary_staging: Option<Arc<BTreeMap<Key, Option<Vec<u8>>>>>,
secondary_staging: Option<Arc<BTreeMap<Key, ValueChange>>>,
leaf_cache: leaf_cache::LeafCache,
}

Expand All @@ -53,7 +55,7 @@ struct Sync {
}

impl Shared {
fn take_staged_changeset(&mut self) -> Arc<BTreeMap<Key, Option<Vec<u8>>>> {
fn take_staged_changeset(&mut self) -> Arc<BTreeMap<Key, ValueChange>> {
assert!(self.secondary_staging.is_none());
let staged = Arc::new(mem::take(&mut self.primary_staging));
self.secondary_staging = Some(staged.clone());
Expand Down Expand Up @@ -127,12 +129,12 @@ impl Tree {

// First look up in the primary staging which contains the most recent changes.
if let Some(val) = shared.primary_staging.get(&key) {
return val.clone();
return val.as_option().map(|v| v.to_vec());
}

// Then check the secondary staging which is a bit older, but fresher still than the btree.
if let Some(val) = shared.secondary_staging.as_ref().and_then(|x| x.get(&key)) {
return val.clone();
return val.as_option().map(|v| v.to_vec());
}

// Finally, look up in the btree.
Expand Down Expand Up @@ -168,7 +170,7 @@ impl Tree {
/// The changeset is applied atomically. If the changeset is empty, the btree is not modified.
// There might be some temptation to unify this with prepare_sync, but this should not be done
// because in the future sync and commit will be called on different threads at different times.
fn commit(shared: &Arc<RwLock<Shared>>, changeset: Vec<(Key, Option<Vec<u8>>)>) {
fn commit(shared: &Arc<RwLock<Shared>>, changeset: Vec<(Key, ValueChange)>) {
if changeset.is_empty() {
return;
}
Expand Down Expand Up @@ -245,6 +247,46 @@ impl Tree {
}
}

/// A change in the value associated with a key.
///
/// Use [`ValueChange::insert`] or [`ValueChange::from_option`] to pick between `Insert` and
/// `InsertOverflow` based on the value's length. The leaf stage trusts the variant and does not
/// re-check the length, so constructing a variant directly bypasses that size check.
#[derive(Clone)]
pub enum ValueChange {
/// The key-value pair is deleted.
Delete,
/// A new value small enough to fit in a leaf is inserted.
///
/// Holds the raw value bytes.
Insert(Vec<u8>),
/// A new value which requires an overflow page is inserted.
///
/// Holds the raw value bytes together with the hash of the value; the hash is written into
/// the overflow cell alongside the value size and page numbers.
InsertOverflow(Vec<u8>, ValueHash),
}

impl ValueChange {
    /// Build a [`ValueChange`] from an optional value.
    ///
    /// `None` becomes [`ValueChange::Delete`]; `Some(v)` is forwarded to
    /// [`ValueChange::insert`], which chooses the normal or overflow variant from the length
    /// of the value.
    pub fn from_option<T: crate::ValueHasher>(maybe_value: Option<Vec<u8>>) -> Self {
        maybe_value.map_or(ValueChange::Delete, Self::insert::<T>)
    }

    /// Build an insertion for `v`.
    ///
    /// Values of at most `MAX_LEAF_VALUE_SIZE` bytes become [`ValueChange::Insert`]. Longer
    /// values are hashed with `T::hash_value` and become [`ValueChange::InsertOverflow`].
    pub fn insert<T: crate::ValueHasher>(v: Vec<u8>) -> Self {
        // Common case first: the value fits directly in a leaf and needs no hash.
        if v.len() <= MAX_LEAF_VALUE_SIZE {
            return ValueChange::Insert(v);
        }

        let value_hash = T::hash_value(&v);
        ValueChange::InsertOverflow(v, value_hash)
    }

    /// Borrow the value bytes carried by this change, or `None` for a deletion.
    pub fn as_option(&self) -> Option<&[u8]> {
        match self {
            ValueChange::Delete => None,
            ValueChange::Insert(bytes) | ValueChange::InsertOverflow(bytes, _) => {
                Some(bytes.as_slice())
            }
        }
    }
}

/// Data generated during update
pub struct SyncData {
pub ln_freelist_pn: u32,
Expand Down Expand Up @@ -307,7 +349,7 @@ impl SyncController {
/// Accepts a list of changes to be committed to the btree.
///
/// Non-blocking.
pub fn begin_sync(&mut self, changeset: Vec<(Key, Option<Vec<u8>>)>) {
pub fn begin_sync(&mut self, changeset: Vec<(Key, ValueChange)>) {
let inner = self.inner.clone();
self.inner.sync.tp.execute(move || {
Tree::commit(&inner.shared, changeset);
Expand Down
17 changes: 7 additions & 10 deletions nomt/src/beatree/ops/update/leaf_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use threadpool::ThreadPool;
use crate::beatree::{
allocator::{PageNumber, StoreReader, SyncAllocator},
index::Index,
leaf::{
node::{LeafNode, MAX_LEAF_VALUE_SIZE},
overflow,
},
leaf::{node::LeafNode, overflow},
leaf_cache::LeafCache,
ops::{
search_branch,
Expand All @@ -23,7 +20,7 @@ use crate::beatree::{
leaf_updater::{BaseLeaf, DigestResult as LeafDigestResult, LeafUpdater},
},
},
Key,
Key, ValueChange,
};
use crate::io::{IoCommand, IoHandle, IoKind, IoPool, PAGE_SIZE};

Expand Down Expand Up @@ -87,7 +84,7 @@ pub fn run(
leaf_reader: StoreReader,
leaf_writer: SyncAllocator,
io_handle: IoHandle,
changeset: Arc<BTreeMap<Key, Option<Vec<u8>>>>,
changeset: Arc<BTreeMap<Key, ValueChange>>,
thread_pool: ThreadPool,
num_workers: usize,
) -> anyhow::Result<LeafStageOutput> {
Expand All @@ -101,16 +98,16 @@ pub fn run(
let changeset = changeset
.iter()
.map(|(k, v)| match v {
Some(v) if v.len() <= MAX_LEAF_VALUE_SIZE => Ok((*k, Some((v.clone(), false)))),
Some(large_value) => {
ValueChange::Insert(v) => Ok((*k, Some((v.clone(), false)))),
ValueChange::InsertOverflow(large_value, value_hash) => {
let (pages, num_writes) =
overflow::chunk(&large_value, &leaf_writer, &page_pool, &io_handle)?;
overflow_io += num_writes;

let cell = overflow::encode_cell(large_value.len(), &pages);
let cell = overflow::encode_cell(large_value.len(), value_hash.clone(), &pages);
Ok((*k, Some((cell, true))))
}
None => Ok((*k, None)),
ValueChange::Delete => Ok((*k, None)),
})
.collect::<anyhow::Result<Vec<_>>>()?;

Expand Down
4 changes: 2 additions & 2 deletions nomt/src/beatree/ops/update/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::beatree::{
leaf::node::LEAF_NODE_BODY_SIZE,
leaf_cache::LeafCache,
ops::get_key,
Key, SyncData,
Key, SyncData, ValueChange,
};
use crate::io::{IoHandle, PagePool};

Expand Down Expand Up @@ -43,7 +43,7 @@ const LEAF_BULK_SPLIT_TARGET: usize = (LEAF_NODE_BODY_SIZE * 3) / 4;
///
/// The changeset is a list of key value pairs to be added or removed from the btree.
pub fn update(
changeset: Arc<BTreeMap<Key, Option<Vec<u8>>>>,
changeset: Arc<BTreeMap<Key, ValueChange>>,
mut bbn_index: Index,
leaf_cache: LeafCache,
leaf_store: Store,
Expand Down
14 changes: 7 additions & 7 deletions nomt/src/beatree/ops/update/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
leaf_stage::LeafStageOutput, LEAF_MERGE_THRESHOLD,
},
},
Index,
Index, ValueChange,
},
io::{start_test_io_pool, IoPool, PagePool},
};
Expand Down Expand Up @@ -130,7 +130,7 @@ fn init_beatree() -> TreeData {
initial_items
.clone()
.into_iter()
.map(|(k, v)| (k, Some(v)))
.map(|(k, v)| (k, ValueChange::Insert(v)))
.collect(),
),
Index::default(),
Expand Down Expand Up @@ -194,7 +194,7 @@ fn leaf_page_numbers(
// given a changeset execute the leaf stage on top of a pre-initialized nomt-db
fn exec_leaf_stage(
commit_concurrency: usize,
changeset: BTreeMap<[u8; 32], Option<Vec<u8>>>,
changeset: BTreeMap<[u8; 32], ValueChange>,
) -> (LeafStageOutput, BTreeSet<PageNumber>) {
let leaf_store = TREE_DATA.leaf_store();
let leaf_reader = StoreReader::new(leaf_store.clone(), PAGE_POOL.clone());
Expand Down Expand Up @@ -317,17 +317,17 @@ fn leaf_stage_inner(insertions: BTreeMap<Key, u16>, deletions: Vec<u16>) -> Test
.into_iter()
// rescale deletions so they index only items already present in the db
.map(|d| rescale(d, 0, KEYS.len()))
.map(|index| (KEYS[index], None))
.map(|index| (KEYS[index], ValueChange::Delete))
.collect();

let insertions: BTreeMap<_, _> = insertions
.into_iter()
// rescale raw_size into the range from 1 to MAX_LEAF_VALUE_SIZE
.map(|(k, raw_size)| (k, rescale(raw_size, 1, MAX_LEAF_VALUE_SIZE)))
.map(|(k, size)| (k.inner, Some(vec![170; size])))
.map(|(k, size)| (k.inner, ValueChange::Insert(vec![170; size])))
.collect();

let mut changeset: BTreeMap<[u8; 32], Option<Vec<u8>>> = insertions.clone();
let mut changeset: BTreeMap<[u8; 32], ValueChange> = insertions.clone();
changeset.extend(deletions.clone());

let (leaf_stage_output, prior_leaf_page_numbers) = exec_leaf_stage(64, changeset);
Expand All @@ -338,7 +338,7 @@ fn leaf_stage_inner(insertions: BTreeMap<Key, u16>, deletions: Vec<u16>) -> Test
deletions.into_iter().map(|(k, _)| k).collect(),
insertions
.into_iter()
.map(|(k, v)| (k, v.unwrap()))
.map(|(k, v)| (k, v.as_option().unwrap().to_vec()))
.collect(),
))
}
Expand Down
2 changes: 1 addition & 1 deletion nomt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ impl<T: HashAlgorithm> Nomt<T> {
let mut tx = self.store.new_value_tx();
for (path, read_write) in actuals {
if let KeyReadWrite::Write(value) | KeyReadWrite::ReadThenWrite(_, value) = read_write {
tx.write_value(path, value);
tx.write_value::<T>(path, value);
}
}

Expand Down
8 changes: 5 additions & 3 deletions nomt/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
page_cache::PageCache,
page_diff::PageDiff,
rollback::Rollback,
ValueHasher,
};
use meta::Meta;
use nomt_core::{page_id::PageId, trie::KeyPath};
Expand Down Expand Up @@ -260,13 +261,14 @@ impl Store {
/// An atomic transaction on raw key/value pairs to be applied against the store
/// with [`Store::commit`].
pub struct ValueTransaction {
batch: Vec<(KeyPath, Option<Vec<u8>>)>,
batch: Vec<(KeyPath, beatree::ValueChange)>,
}

impl ValueTransaction {
/// Write a value to flat storage.
pub fn write_value(&mut self, path: KeyPath, value: Option<Vec<u8>>) {
self.batch.push((path, value))
pub fn write_value<T: ValueHasher>(&mut self, path: KeyPath, value: Option<Vec<u8>>) {
self.batch
.push((path, beatree::ValueChange::from_option::<T>(value)))
}
}

Expand Down

0 comments on commit ed25760

Please sign in to comment.