diff --git a/nomt/src/beatree/leaf/node.rs b/nomt/src/beatree/leaf/node.rs
index 2e48180b..d69dfebb 100644
--- a/nomt/src/beatree/leaf/node.rs
+++ b/nomt/src/beatree/leaf/node.rs
@@ -6,7 +6,7 @@
 /// padding: [u8] // empty space between cell_pointers and cells
 /// cells: [Cell; n]
 /// value cell: [u8]
-/// overflow cell: (u64, [NodePointer]) | semantically, (value_size, [NodePointer]).
+/// overflow cell: (u64, u256, [NodePointer]) | semantically, (value_size, value_hash, [NodePointer]).
 /// ```
 ///
 /// | n | [(key ++ offset); n] | ---- | [[u8]; n] |
@@ -41,7 +41,7 @@ pub const MAX_LEAF_VALUE_SIZE: usize = (LEAF_NODE_BODY_SIZE / 3) - 32;
 /// The maximum number of node pointers which may appear directly in an overflow cell.
 ///
 /// Note that this gives an overflow value cell maximum size of 100 bytes.
-pub const MAX_OVERFLOW_CELL_NODE_POINTERS: usize = 23;
+pub const MAX_OVERFLOW_CELL_NODE_POINTERS: usize = 15;
 
 /// We use the high bit to encode whether a cell is an overflow cell.
 const OVERFLOW_BIT: u16 = 1 << 15;
diff --git a/nomt/src/beatree/leaf/overflow.rs b/nomt/src/beatree/leaf/overflow.rs
index d938e0c1..4bcdb068 100644
--- a/nomt/src/beatree/leaf/overflow.rs
+++ b/nomt/src/beatree/leaf/overflow.rs
@@ -89,24 +89,27 @@ pub fn chunk(
 }
 
 /// Decode an overflow cell, returning the size of the value plus the pages numbers within the cell.
-pub fn decode_cell<'a>(raw: &'a [u8]) -> (usize, impl Iterator<Item = PageNumber> + 'a) {
-    assert!(raw.len() >= 12);
+pub fn decode_cell<'a>(raw: &'a [u8]) -> (usize, [u8; 32], impl Iterator<Item = PageNumber> + 'a) {
+    // the minimum legal size is the length plus one page pointer plus the value hash.
+    assert!(raw.len() >= 8 + 4 + 32);
     assert_eq!(raw.len() % 4, 0);
 
     let value_size = u64::from_le_bytes(raw[0..8].try_into().unwrap());
+    let value_hash: [u8; 32] = raw[8..40].try_into().unwrap();
 
-    let iter = raw[8..]
+    let iter = raw[40..]
         .chunks(4)
         .map(|slice| PageNumber(u32::from_le_bytes(slice.try_into().unwrap())));
 
-    (value_size as usize, iter)
+    (value_size as usize, value_hash, iter)
 }
 
 /// Encode a list of page numbers into an overflow cell.
-pub fn encode_cell(value_size: usize, pages: &[PageNumber]) -> Vec<u8> {
-    let mut v = vec![0u8; 8 + pages.len() * 4];
+pub fn encode_cell(value_size: usize, value_hash: [u8; 32], pages: &[PageNumber]) -> Vec<u8> {
+    let mut v = vec![0u8; 8 + 32 + pages.len() * 4];
 
     v[0..8].copy_from_slice(&(value_size as u64).to_le_bytes());
+    v[8..40].copy_from_slice(&value_hash);
 
-    for (pn, slice) in pages.iter().zip(v[8..].chunks_mut(4)) {
+    for (pn, slice) in pages.iter().zip(v[40..].chunks_mut(4)) {
         slice.copy_from_slice(&pn.0.to_le_bytes());
     }
 
@@ -170,7 +173,7 @@ fn needed_pages(size: usize) -> usize {
 
 /// Read a large value from pages referenced by an overflow cell.
 pub fn read(cell: &[u8], leaf_reader: &StoreReader) -> Vec<u8> {
-    let (value_size, cell_pages) = decode_cell(cell);
+    let (value_size, _, cell_pages) = decode_cell(cell);
     let total_pages = total_needed_pages(value_size);
 
     let mut value = Vec::with_capacity(value_size);
@@ -193,7 +196,7 @@ pub fn read(cell: &[u8], leaf_reader: &StoreReader) -> Vec<u8> {
 
 /// Iterate all pages related to an overflow cell and push onto a free-list.
 pub fn delete(cell: &[u8], leaf_reader: &StoreReader, freed: &mut Vec<PageNumber>) {
-    let (value_size, cell_pages) = decode_cell(cell);
+    let (value_size, _, cell_pages) = decode_cell(cell);
     let total_pages = total_needed_pages(value_size);
 
     let start = freed.len();
diff --git a/nomt/src/beatree/mod.rs b/nomt/src/beatree/mod.rs
index 23501ab1..39eea9c9 100644
--- a/nomt/src/beatree/mod.rs
+++ b/nomt/src/beatree/mod.rs
@@ -2,6 +2,8 @@ use allocator::{PageNumber, Store, StoreReader, FREELIST_EMPTY};
 use anyhow::{Context, Result};
 use branch::BRANCH_NODE_SIZE;
 use crossbeam_channel::Receiver;
+use leaf::node::MAX_LEAF_VALUE_SIZE;
+use nomt_core::trie::ValueHash;
 use parking_lot::{ArcMutexGuard, Mutex, RwLock};
 use std::{collections::BTreeMap, fs::File, mem, path::Path, sync::Arc};
 use threadpool::ThreadPool;
@@ -38,10 +40,10 @@ struct Shared {
     leaf_store_rd: StoreReader,
     /// Primary staging collects changes that are committed but not synced yet. Upon sync, changes
     /// from here are moved to secondary staging.
-    primary_staging: BTreeMap<Key, Option<Vec<u8>>>,
+    primary_staging: BTreeMap<Key, ValueChange>,
     /// Secondary staging collects committed changes that are currently being synced. This is None
    /// if there is no sync in progress.
-    secondary_staging: Option<Arc<BTreeMap<Key, Option<Vec<u8>>>>>,
+    secondary_staging: Option<Arc<BTreeMap<Key, ValueChange>>>,
     leaf_cache: leaf_cache::LeafCache,
 }
 
@@ -53,7 +55,7 @@ struct Sync {
 }
 
 impl Shared {
-    fn take_staged_changeset(&mut self) -> Arc<BTreeMap<Key, Option<Vec<u8>>>> {
+    fn take_staged_changeset(&mut self) -> Arc<BTreeMap<Key, ValueChange>> {
         assert!(self.secondary_staging.is_none());
         let staged = Arc::new(mem::take(&mut self.primary_staging));
         self.secondary_staging = Some(staged.clone());
@@ -127,12 +129,12 @@ impl Tree {
 
         // First look up in the primary staging which contains the most recent changes.
         if let Some(val) = shared.primary_staging.get(&key) {
-            return val.clone();
+            return val.as_option().map(|v| v.to_vec());
         }
 
         // Then check the secondary staging which is a bit older, but fresher still than the btree.
         if let Some(val) = shared.secondary_staging.as_ref().and_then(|x| x.get(&key)) {
-            return val.clone();
+            return val.as_option().map(|v| v.to_vec());
         }
 
         // Finally, look up in the btree.
@@ -168,7 +170,7 @@ impl Tree {
     /// The changeset is applied atomically. If the changeset is empty, the btree is not modified.
     // There might be some temptation to unify this with prepare_sync, but this should not be done
     // because in the future sync and commit will be called on different threads at different times.
-    fn commit(shared: &Arc<RwLock<Shared>>, changeset: Vec<(Key, Option<Vec<u8>>)>) {
+    fn commit(shared: &Arc<RwLock<Shared>>, changeset: Vec<(Key, ValueChange)>) {
         if changeset.is_empty() {
             return;
         }
@@ -245,6 +247,46 @@ impl Tree {
     }
 }
 
+/// A change in the value associated with a key.
+#[derive(Clone)]
+pub enum ValueChange {
+    /// The key-value pair is deleted.
+    Delete,
+    /// A new value small enough to fit in a leaf is inserted.
+    Insert(Vec<u8>),
+    /// A new value which requires an overflow page is inserted.
+    InsertOverflow(Vec<u8>, ValueHash),
+}
+
+impl ValueChange {
+    /// Create a [`ValueChange`] from an option, determining whether to use the normal or overflow
+    /// variant based on size.
+    pub fn from_option<T: crate::ValueHasher>(maybe_value: Option<Vec<u8>>) -> Self {
+        match maybe_value {
+            None => ValueChange::Delete,
+            Some(v) => Self::insert::<T>(v),
+        }
+    }
+
+    /// Create an insertion, determining whether to use the normal or overflow variant based on size.
+    pub fn insert<T: crate::ValueHasher>(v: Vec<u8>) -> Self {
+        if v.len() > MAX_LEAF_VALUE_SIZE {
+            let value_hash = T::hash_value(&v);
+            ValueChange::InsertOverflow(v, value_hash)
+        } else {
+            ValueChange::Insert(v)
+        }
+    }
+
+    /// Get the value bytes, optionally.
+    pub fn as_option(&self) -> Option<&[u8]> {
+        match self {
+            ValueChange::Delete => None,
+            ValueChange::Insert(ref v) | ValueChange::InsertOverflow(ref v, _) => Some(&v[..]),
+        }
+    }
+}
+
 /// Data generated during update
 pub struct SyncData {
     pub ln_freelist_pn: u32,
@@ -307,7 +349,7 @@ impl SyncController {
     /// Accepts a list of changes to be committed to the btree.
     ///
     /// Non-blocking.
-    pub fn begin_sync(&mut self, changeset: Vec<(Key, Option<Vec<u8>>)>) {
+    pub fn begin_sync(&mut self, changeset: Vec<(Key, ValueChange)>) {
         let inner = self.inner.clone();
         self.inner.sync.tp.execute(move || {
             Tree::commit(&inner.shared, changeset);
diff --git a/nomt/src/beatree/ops/update/leaf_stage.rs b/nomt/src/beatree/ops/update/leaf_stage.rs
index 62e9c36a..009cd0fb 100644
--- a/nomt/src/beatree/ops/update/leaf_stage.rs
+++ b/nomt/src/beatree/ops/update/leaf_stage.rs
@@ -7,10 +7,7 @@ use threadpool::ThreadPool;
 use crate::beatree::{
     allocator::{PageNumber, StoreReader, SyncAllocator},
     index::Index,
-    leaf::{
-        node::{LeafNode, MAX_LEAF_VALUE_SIZE},
-        overflow,
-    },
+    leaf::{node::LeafNode, overflow},
     leaf_cache::LeafCache,
     ops::{
         search_branch,
@@ -23,7 +20,7 @@ use crate::beatree::{
             leaf_updater::{BaseLeaf, DigestResult as LeafDigestResult, LeafUpdater},
         },
     },
-    Key,
+    Key, ValueChange,
 };
 
 use crate::io::{IoCommand, IoHandle, IoKind, IoPool, PAGE_SIZE};
@@ -87,7 +84,7 @@ pub fn run(
     leaf_reader: StoreReader,
     leaf_writer: SyncAllocator,
     io_handle: IoHandle,
-    changeset: Arc<BTreeMap<Key, Option<Vec<u8>>>>,
+    changeset: Arc<BTreeMap<Key, ValueChange>>,
     thread_pool: ThreadPool,
     num_workers: usize,
 ) -> anyhow::Result<LeafStageOutput> {
@@ -101,16 +98,16 @@ pub fn run(
     let changeset = changeset
         .iter()
         .map(|(k, v)| match v {
-            Some(v) if v.len() <= MAX_LEAF_VALUE_SIZE => Ok((*k, Some((v.clone(), false)))),
-            Some(large_value) => {
+            ValueChange::Insert(v) => Ok((*k, Some((v.clone(), false)))),
+            ValueChange::InsertOverflow(large_value, value_hash) => {
                 let (pages, num_writes) =
                     overflow::chunk(&large_value, &leaf_writer, &page_pool, &io_handle)?;
                 overflow_io += num_writes;
 
-                let cell = overflow::encode_cell(large_value.len(), &pages);
+                let cell = overflow::encode_cell(large_value.len(), value_hash.clone(), &pages);
                 Ok((*k, Some((cell, true))))
             }
-            None => Ok((*k, None)),
+            ValueChange::Delete => Ok((*k, None)),
         })
         .collect::<anyhow::Result<Vec<_>>>()?;
diff --git a/nomt/src/beatree/ops/update/mod.rs b/nomt/src/beatree/ops/update/mod.rs
index 592a9c27..d475466d 100644
--- a/nomt/src/beatree/ops/update/mod.rs
+++ b/nomt/src/beatree/ops/update/mod.rs
@@ -11,7 +11,7 @@ use crate::beatree::{
     leaf::node::LEAF_NODE_BODY_SIZE,
     leaf_cache::LeafCache,
     ops::get_key,
-    Key, SyncData,
+    Key, SyncData, ValueChange,
 };
 
 use crate::io::{IoHandle, PagePool};
@@ -43,7 +43,7 @@ const LEAF_BULK_SPLIT_TARGET: usize = (LEAF_NODE_BODY_SIZE * 3) / 4;
 ///
 /// The changeset is a list of key value pairs to be added or removed from the btree.
 pub fn update(
-    changeset: Arc<BTreeMap<Key, Option<Vec<u8>>>>,
+    changeset: Arc<BTreeMap<Key, ValueChange>>,
     mut bbn_index: Index,
     leaf_cache: LeafCache,
     leaf_store: Store,
diff --git a/nomt/src/beatree/ops/update/tests.rs b/nomt/src/beatree/ops/update/tests.rs
index 9b5ebdb3..e09cff12 100644
--- a/nomt/src/beatree/ops/update/tests.rs
+++ b/nomt/src/beatree/ops/update/tests.rs
@@ -15,7 +15,7 @@ use crate::{
                 leaf_stage::LeafStageOutput, LEAF_MERGE_THRESHOLD,
             },
         },
-        Index,
+        Index, ValueChange,
     },
     io::{start_test_io_pool, IoPool, PagePool},
 };
@@ -130,7 +130,7 @@ fn init_beatree() -> TreeData {
         initial_items
             .clone()
             .into_iter()
-            .map(|(k, v)| (k, Some(v)))
+            .map(|(k, v)| (k, ValueChange::Insert(v)))
             .collect(),
         ),
         Index::default(),
@@ -194,7 +194,7 @@ fn leaf_page_numbers(
 // given a changeset execute the leaf stage on top of a pre-initialized nomt-db
 fn exec_leaf_stage(
     commit_concurrency: usize,
-    changeset: BTreeMap<[u8; 32], Option<Vec<u8>>>,
+    changeset: BTreeMap<[u8; 32], ValueChange>,
 ) -> (LeafStageOutput, BTreeSet<PageNumber>) {
     let leaf_store = TREE_DATA.leaf_store();
     let leaf_reader = StoreReader::new(leaf_store.clone(), PAGE_POOL.clone());
@@ -317,17 +317,17 @@ fn leaf_stage_inner(insertions: BTreeMap, deletions: Vec) -> Test
         .into_iter()
         // rescale deletions to contain indexes over only alredy present items in the db
         .map(|d| rescale(d, 0, KEYS.len()))
-        .map(|index| (KEYS[index], None))
+        .map(|index| (KEYS[index], ValueChange::Delete))
         .collect();
 
     let insertions: BTreeMap<_, _> = insertions
         .into_iter()
         // rescale raw_size to be between 0 and MAX_LEAF_VALUE_SIZE
         .map(|(k, raw_size)| (k, rescale(raw_size, 1, MAX_LEAF_VALUE_SIZE)))
-        .map(|(k, size)| (k.inner, Some(vec![170; size])))
+        .map(|(k, size)| (k.inner, ValueChange::Insert(vec![170; size])))
         .collect();
 
-    let mut changeset: BTreeMap<[u8; 32], Option<Vec<u8>>> = insertions.clone();
+    let mut changeset: BTreeMap<[u8; 32], ValueChange> = insertions.clone();
     changeset.extend(deletions.clone());
 
     let (leaf_stage_output, prior_leaf_page_numbers) = exec_leaf_stage(64, changeset);
@@ -338,7 +338,7 @@ fn leaf_stage_inner(insertions: BTreeMap, deletions: Vec) -> Test
         deletions.into_iter().map(|(k, _)| k).collect(),
         insertions
             .into_iter()
-            .map(|(k, v)| (k, v.unwrap()))
+            .map(|(k, v)| (k, v.as_option().unwrap().to_vec()))
             .collect(),
     ))
 }
diff --git a/nomt/src/lib.rs b/nomt/src/lib.rs
index b53d7ecc..74e0208e 100644
--- a/nomt/src/lib.rs
+++ b/nomt/src/lib.rs
@@ -348,7 +348,7 @@ impl Nomt {
         let mut tx = self.store.new_value_tx();
         for (path, read_write) in actuals {
             if let KeyReadWrite::Write(value) | KeyReadWrite::ReadThenWrite(_, value) = read_write {
-                tx.write_value(path, value);
+                tx.write_value::<T>(path, value);
             }
         }
 
diff --git a/nomt/src/store/mod.rs b/nomt/src/store/mod.rs
index 16df0950..ef60450c 100644
--- a/nomt/src/store/mod.rs
+++ b/nomt/src/store/mod.rs
@@ -10,6 +10,7 @@ use crate::{
     page_cache::PageCache,
     page_diff::PageDiff,
     rollback::Rollback,
+    ValueHasher,
 };
 use meta::Meta;
 use nomt_core::{page_id::PageId, trie::KeyPath};
@@ -260,13 +261,14 @@ impl Store {
 /// An atomic transaction on raw key/value pairs to be applied against the store
 /// with [`Store::commit`].
 pub struct ValueTransaction {
-    batch: Vec<(KeyPath, Option<Vec<u8>>)>,
+    batch: Vec<(KeyPath, beatree::ValueChange)>,
 }
 
 impl ValueTransaction {
     /// Write a value to flat storage.
-    pub fn write_value(&mut self, path: KeyPath, value: Option<Vec<u8>>) {
-        self.batch.push((path, value))
+    pub fn write_value<T: ValueHasher>(&mut self, path: KeyPath, value: Option<Vec<u8>>) {
+        self.batch
+            .push((path, beatree::ValueChange::from_option::<T>(value)))
     }
 }
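
For reference, a minimal standalone sketch of the new overflow cell layout introduced in leaf/overflow.rs above: an 8-byte little-endian value size, followed by the 32-byte value hash, followed by 4-byte little-endian page numbers. The names encode_cell_sketch and decode_cell_sketch are illustrative only and are not the crate's API; the real functions operate on PageNumber rather than raw u32 values.

// Hypothetical sketch of the overflow cell encoding, assuming the layout
// described in the diff: [value_size: u64 LE][value_hash: 32 bytes][page numbers: u32 LE...].
fn encode_cell_sketch(value_size: usize, value_hash: [u8; 32], pages: &[u32]) -> Vec<u8> {
    let mut v = vec![0u8; 8 + 32 + pages.len() * 4];
    v[0..8].copy_from_slice(&(value_size as u64).to_le_bytes());
    v[8..40].copy_from_slice(&value_hash);
    for (pn, slice) in pages.iter().zip(v[40..].chunks_mut(4)) {
        slice.copy_from_slice(&pn.to_le_bytes());
    }
    v
}

fn decode_cell_sketch(raw: &[u8]) -> (usize, [u8; 32], Vec<u32>) {
    // minimum legal size: value size (8) + value hash (32) + one page pointer (4)
    assert!(raw.len() >= 8 + 32 + 4);
    assert_eq!((raw.len() - 40) % 4, 0);
    let value_size = u64::from_le_bytes(raw[0..8].try_into().unwrap()) as usize;
    let value_hash: [u8; 32] = raw[8..40].try_into().unwrap();
    let pages = raw[40..]
        .chunks(4)
        .map(|s| u32::from_le_bytes(s.try_into().unwrap()))
        .collect();
    (value_size, value_hash, pages)
}

fn main() {
    // Round-trip a cell describing a 5000-byte value spread over three pages.
    let cell = encode_cell_sketch(5000, [0xab; 32], &[7, 8, 9]);
    assert_eq!(cell.len(), 8 + 32 + 3 * 4);
    let (size, hash, pages) = decode_cell_sketch(&cell);
    assert_eq!((size, hash, pages), (5000, [0xab; 32], vec![7, 8, 9]));
}

Note that this layout also explains the MAX_OVERFLOW_CELL_NODE_POINTERS change from 23 to 15 in node.rs: the cell budget of 100 bytes previously held 8 + 23 * 4 bytes and now holds 8 + 32 + 15 * 4 bytes.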