diff --git a/nomt/src/beatree/branch/node.rs b/nomt/src/beatree/branch/node.rs index 69678b4c..7e1d4df9 100644 --- a/nomt/src/beatree/branch/node.rs +++ b/nomt/src/beatree/branch/node.rs @@ -5,6 +5,7 @@ use bitvec::prelude::*; use super::BRANCH_NODE_SIZE; use crate::{ beatree::{ + allocator::PageNumber, ops::{bit_ops::bitwise_memcpy, get_key}, Key, }, @@ -360,6 +361,11 @@ impl BranchNodeBuilder { } } + // Returns the number of separators already pushed. + pub fn n_pushed(&self) -> usize { + self.index + } + pub fn push(&mut self, key: Key, mut separator_len: usize, pn: u32) { assert!(self.index < self.branch.n() as usize); @@ -390,8 +396,17 @@ impl BranchNodeBuilder { } // Copy the given chunk of separators from the provided base to the new node. - // Only compressed separators are expected in the specified range. - pub fn push_chunk(&mut self, base: &BranchNode, from: usize, to: usize) { + // Only compressed separators are expected in the specified range `from..to`. + // + // `updated` is a list of new page numbers, each alongside the index of the + // separator to update, relative to the start of the specified range. + pub fn push_chunk( + &mut self, + base: &BranchNode, + from: usize, + to: usize, + updated: impl Iterator<Item = (usize, PageNumber)>, + ) { let n_items = to - from; assert!(self.index + n_items <= self.branch.prefix_compressed() as usize); @@ -439,6 +454,11 @@ impl BranchNodeBuilder { self.branch.node_pointers_mut()[self.index..self.index + n_items] .copy_from_slice(base_node_pointers); + // update page numbers of modified separators + for (i, new_pn) in updated { + self.branch.set_node_pointer(self.index + i, new_pn.0) + } + // 3. copy and shift separators if bit_prefix_len_difference == 0 { // fast path, copy and shift all compressed separators at once @@ -625,7 +645,7 @@ mod test { 4, /*prefix_compressed*/ 9, /*prefix_len*/ ); - builder.push_chunk(&base_branch, 1, 4); + builder.push_chunk(&base_branch, 1, 4, [].into_iter()); let mut key255 = [0; 32]; key255[0] = 0x11; key255[1] = 255; @@ -648,7 +668,7 @@ mod test { 4, /*prefix_compressed*/ 7, /*prefix_len*/ ); - builder.push_chunk(&base_branch, 1, 4); + builder.push_chunk(&base_branch, 1, 4, [].into_iter()); builder.push(key255, separator_len(&key255), 10); // prefix: 0001000 // key 1: 1_10000000_00000001 // cell_poiter: 17 diff --git a/nomt/src/beatree/ops/mod.rs b/nomt/src/beatree/ops/mod.rs index 228e4a97..faa1257f 100644 --- a/nomt/src/beatree/ops/mod.rs +++ b/nomt/src/beatree/ops/mod.rs @@ -63,38 +63,50 @@ pub fn lookup( /// Binary search a branch node for the child node containing the key. This returns the last child /// node pointer whose separator is less than or equal to the given key. fn search_branch(branch: &BranchNode, key: Key) -> Option<(usize, PageNumber)> { + let (found, pos) = find_key_pos(branch, &key, None); + + if found { + return Some((pos, branch.node_pointer(pos).into())); + } else if pos == 0 { + return None; + } else { + // the first key greater than the one we are looking for has been returned, + // thus the correct child is the previous one + return Some((pos - 1, branch.node_pointer(pos - 1).into())); + } +} + +// Binary search for a key within a branch node. +// Accepts an optional lower bound to override the starting point of the binary search. +// It returns true and the index of the specified key if it is present, +// or false and the index of the first key greater than the specified one.
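For intuition, the `(found, position)` contract documented above can be sketched over a plain sorted slice — an illustrative stand-in, not part of the patch, which ignores prefix handling and the `BranchNode` layout:

```rust
use std::cmp::Ordering;

// Illustrative only: the same `(found, position)` contract as `find_key_pos`,
// but over a plain sorted slice, with an optional lower bound for the search.
fn find_pos<T: Ord>(items: &[T], target: &T, low: Option<usize>) -> (bool, usize) {
    let mut low = low.unwrap_or(0);
    let mut high = items.len();
    while low < high {
        let mid = low + (high - low) / 2;
        match target.cmp(&items[mid]) {
            Ordering::Equal => return (true, mid),
            Ordering::Less => high = mid,
            Ordering::Greater => low = mid + 1,
        }
    }
    // not found: `high` is the index of the first item greater than `target`.
    (false, high)
}

fn main() {
    let keys = [10, 20, 30, 40];
    assert_eq!(find_pos(&keys, &30, None), (true, 2));
    assert_eq!(find_pos(&keys, &25, None), (false, 2)); // first key greater than 25
    assert_eq!(find_pos(&keys, &5, None), (false, 0));  // a miss at position 0
}
```

`search_branch` in the hunk above then maps a miss at position 0 to `None` and any other miss to the child at `pos - 1`.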
+pub fn find_key_pos(branch: &BranchNode, key: &Key, low: Option<usize>) -> (bool, usize) { let prefix = branch.prefix(); let n = branch.n() as usize; let prefix_compressed = branch.prefix_compressed() as usize; match key.view_bits::<Msb0>()[..prefix.len()].cmp(prefix) { - Ordering::Less => return None, - Ordering::Greater if n == prefix_compressed => { - let i = n - 1; - return Some((i, branch.node_pointer(i).into())); - } + Ordering::Less => return (false, 0), + Ordering::Greater if n == prefix_compressed => return (false, n), Ordering::Equal | Ordering::Greater => {} } - let mut low = 0; + let mut low = low.unwrap_or(0); let mut high = branch.n() as usize; while low < high { let mid = low + (high - low) / 2; - if key < get_key(branch, mid) { - high = mid; - } else { - low = mid + 1; + match key.cmp(&get_key(branch, mid)) { + Ordering::Equal => { + return (true, mid); + } + Ordering::Less => high = mid, + Ordering::Greater => low = mid + 1, } } - // sanity: this only happens if `key` is less than separator 0. - if high == 0 { - return None; - } - let node_pointer = branch.node_pointer(high - 1); - Some((high - 1, node_pointer.into())) + (false, high) } // Extract the key at a given index from a BranchNode, taking into account prefix compression. diff --git a/nomt/src/beatree/ops/update/branch_stage.rs b/nomt/src/beatree/ops/update/branch_stage.rs index 1b3b3f74..dc9f7abb 100644 --- a/nomt/src/beatree/ops/update/branch_stage.rs +++ b/nomt/src/beatree/ops/update/branch_stage.rs @@ -231,7 +231,7 @@ fn reset_branch_base( } if let Some((_, node, next_separator)) = branches_tracker.pending_base.take() { - let base = BaseBranch { node, iter_pos: 0 }; + let base = BaseBranch::new(node); branch_updater.reset_base(Some(base), next_separator); } else { if let Some(separator) = branches_tracker @@ -266,10 +266,7 @@ fn reset_branch_base_fresh( branches_tracker.delete(separator, branch.bbn_pn().into(), cutoff); - let base = BaseBranch { - node: branch, - iter_pos: 0, - }; + let base = BaseBranch::new(branch); branch_updater.reset_base(Some(base), cutoff); } diff --git a/nomt/src/beatree/ops/update/branch_updater.rs b/nomt/src/beatree/ops/update/branch_updater.rs index 2133653f..002addc2 100644 --- a/nomt/src/beatree/ops/update/branch_updater.rs +++ b/nomt/src/beatree/ops/update/branch_updater.rs @@ -1,10 +1,12 @@ -use std::cmp::Ordering; -use std::sync::Arc; +use std::{ops::Range, sync::Arc}; use crate::beatree::{ allocator::PageNumber, branch::{self as branch_node, BranchNode, BranchNodeBuilder, BRANCH_NODE_BODY_SIZE}, - ops::bit_ops::{prefix_len, separator_len}, + ops::{ + bit_ops::{prefix_len, separator_len}, + find_key_pos, + }, Key, }; use crate::io::PagePool; @@ -14,16 +16,39 @@ use super::{ }; pub struct BaseBranch { - pub node: Arc<BranchNode>, - pub iter_pos: usize, + node: Arc<BranchNode>, + low: usize, } impl BaseBranch { - fn next_key(&self) -> Option<Key> { - if self.iter_pos >= self.node.n() as usize { - None + pub fn new(node: Arc<BranchNode>) -> Self { + BaseBranch { node, low: 0 } + } + + // Try to find the given key, starting from `self.low` up to the end of the node. + // Returns None if `self.low` is already at the end, or if every remaining key is + // bigger than the specified one (there is nothing left to skip). + // Otherwise, it returns the index of the specified key with the boolean set to true, + // or the index of the first key bigger than the specified one with the boolean set to false.
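A simplified model of the `low` cursor documented above, using a sorted `Vec<u32>` in place of the branch node (illustrative sketch, not part of the patch):

```rust
// Illustrative model of the `low` cursor: successive lookups with increasing
// keys never re-scan the part of the node that has already been passed.
struct Cursor {
    keys: Vec<u32>,
    low: usize,
}

impl Cursor {
    fn find_key(&mut self, key: u32) -> Option<(bool, usize)> {
        if self.low == self.keys.len() {
            return None; // already at the end of the node
        }
        // index of the first key >= `key`, never before `self.low`
        let pos = self.low + self.keys[self.low..].partition_point(|k| *k < key);
        if pos < self.keys.len() && self.keys[pos] == key {
            self.low = pos + 1; // found: point right after the key
            Some((true, pos))
        } else if pos == self.low {
            None // the first remaining key is already bigger: nothing was skipped
        } else {
            self.low = pos; // not found: point at the smallest bigger key
            Some((false, pos))
        }
    }
}

fn main() {
    let mut cursor = Cursor { keys: vec![10, 20, 30, 40], low: 0 };
    assert_eq!(cursor.find_key(20), Some((true, 1)));  // low is now 2
    assert_eq!(cursor.find_key(25), None);             // 30 is already bigger, nothing skipped
    assert_eq!(cursor.find_key(35), Some((false, 3))); // low is now 3, pointing at 40
}
```

The real `find_key` below delegates the search to `find_key_pos` and handles prefix-compressed keys, but the cursor-advancing rules are the same.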
+ fn find_key(&mut self, key: &Key) -> Option<(bool, usize)> { + if self.low == self.node.n() as usize { + return None; + } + + let (found, pos) = find_key_pos(&self.node, key, Some(self.low)); + + if found { + // the key was present; return its index and point `low` right after it + self.low = pos + 1; + return Some((true, pos)); + } else if pos == self.low { + // every remaining key is bigger than the specified one; there is nothing to skip + return None; } else { - Some(self.key(self.iter_pos)) + // the key was not present; return the index of the smallest bigger key and point `low` to it + self.low = pos; + return Some((false, pos)); } } @@ -34,15 +59,20 @@ impl BaseBranch { fn key_value(&self, i: usize) -> (Key, PageNumber) { (self.key(i), self.node.node_pointer(i).into()) } - - fn advance_iter(&mut self) { - self.iter_pos += 1; - } } +// BranchOp is used to create a new node, optionally starting from a base node. +// +// `Update` and `KeepChunk` refer only to compressed separators within the base node. enum BranchOp { + // Separator and its page number Insert(Key, PageNumber), - Keep(usize, usize), + // Contains the position at which the separator is stored in the base, + // along with the updated page number + Update(usize, PageNumber), + // Contains a range of separators that will be transferred unchanged + // from the base into the new node + KeepChunk(usize, usize), } pub enum DigestResult { @@ -84,10 +114,24 @@ impl BranchUpdater { /// Ingest a key and page number into the branch updater. pub fn ingest(&mut self, key: Key, pn: Option<PageNumber>) { - self.keep_up_to(Some(&key)); - if let Some(pn) = pn { - self.ops.push(BranchOp::Insert(key, pn)); - self.bulk_split_step(self.ops.len() - 1); + // keep all the elements that are skipped while looking for `key` + let res = self.keep_up_to(Some(&key)); + + match (res, pn) { + // UNWRAP: if the item has been found, the base node must exist + (Some(pos), Some(pn)) + if pos < self.base.as_ref().unwrap().node.prefix_compressed() as usize => + { + // a compressed separator has been changed + self.ops.push(BranchOp::Update(pos, pn)); + self.bulk_split_step(self.ops.len() - 1); + } + (_, Some(pn)) => { + // a new key is being inserted, or a previously uncompressed separator is being updated + self.ops.push(BranchOp::Insert(key, pn)); + self.bulk_split_step(self.ops.len() - 1); + } + _ => (), } } @@ -111,7 +155,7 @@ impl BranchUpdater { self.split(new_branches) } else if self.gauge.body_size() >= BRANCH_MERGE_THRESHOLD || self.cutoff.is_none() { let node = self.build_branch(&self.ops[last_ops_start..], &self.gauge); - let separator = self.op_key(&self.ops[last_ops_start]); + let separator = self.op_first_key(&self.ops[last_ops_start]); new_branches.handle_new_branch(separator, node, self.cutoff); self.ops.clear(); @@ -137,66 +181,147 @@ impl BranchUpdater { self.cutoff = None; } - fn keep_up_to(&mut self, up_to: Option<&Key>) { - while let Some(next_key) = self.base.as_ref().and_then(|b| b.next_key()) { - let Some(ref mut base_node) = self.base else { - return; + // Advance the base looking for `up_to`, stopping if a bigger key is found or the end is reached. + // Collect in `self.ops` all the separators that are skipped over. + // Returns the index at which `up_to` was found, or None otherwise.
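As a rough sketch of what `keep_up_to` (below) pushes into `self.ops`: skipped compressed separators collapse into a single `KeepChunk`, while skipped uncompressed separators are re-inserted individually. The toy types here are hypothetical stand-ins, and the real code additionally feeds every op through `bulk_split_step`:

```rust
// Toy model of the op collection: positions below `compressed_end` stand for
// prefix-compressed separators in the base, the rest for uncompressed ones.
#[derive(Debug, PartialEq)]
enum Op {
    Insert(u32), // key only; page numbers omitted for brevity
    KeepChunk(usize, usize),
}

fn collect_ops(from: usize, to: usize, compressed_end: usize, keys: &[u32]) -> Vec<Op> {
    let mut ops = Vec::new();
    let chunk_end = to.min(compressed_end);
    if from < chunk_end {
        // skipped compressed separators are kept as a single chunk
        ops.push(Op::KeepChunk(from, chunk_end));
    }
    for i in chunk_end..to {
        // skipped uncompressed separators are re-inserted one by one
        ops.push(Op::Insert(keys[i]));
    }
    ops
}

fn main() {
    let keys = [1, 2, 3, 4, 5, 6];
    // skip positions 1..5 of a base whose first 4 separators are prefix-compressed
    assert_eq!(
        collect_ops(1, 5, 4, &keys),
        vec![Op::KeepChunk(1, 4), Op::Insert(5)]
    );
}
```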
+ fn keep_up_to(&mut self, up_to: Option<&Key>) -> Option<usize> { + if self.base.is_none() { + // empty db + return None; + } + + let (from, base_compressed_end, to, found) = { + // UNWRAP: self.base is not None + let base = self.base.as_mut().unwrap(); + + let from = base.low; + let base_n = base.node.n() as usize; + + let (found, to) = match up_to { + // Nothing more to do, the end has already been reached + None if from == base_n => return None, + // Jump directly to the end of the base node and update `base.low` accordingly + None => { + base.low = base_n; + (false, base_n) + } + Some(up_to) => match base.find_key(up_to) { + Some(res) => res, + // already at the end + None => return None, + }, }; - let order = up_to - .map(|up_to| up_to.cmp(&next_key)) - .unwrap_or(Ordering::Greater); - if order == Ordering::Less { - break; + if from == to { + // nothing to keep + return None; } - if order == Ordering::Greater { - let separator_len = separator_len(&next_key); - self.ops - .push(BranchOp::Keep(base_node.iter_pos, separator_len)); + let base_compressed_end = std::cmp::min(to, base.node.prefix_compressed() as usize); - base_node.advance_iter(); - self.bulk_split_step(self.ops.len() - 1); - } else { - base_node.advance_iter(); - break; - } + (from, base_compressed_end, to, found) + }; + + // push compressed chunk + self.ops + .push(BranchOp::KeepChunk(from, base_compressed_end)); + self.bulk_split_step(self.ops.len() - 1); + + // convert every kept uncompressed separator into an Insert operation + for i in base_compressed_end..to { + // UNWRAP: self.base is not None + let (key, pn) = self.base.as_ref().unwrap().key_value(i); + self.ops.push(BranchOp::Insert(key, pn)); + self.bulk_split_step(self.ops.len() - 1); + } + + if found { + Some(to) + } else { + None } } // check whether bulk split needs to start, and if so, start it. - // if ongoing, check if we need to cut off. - fn bulk_split_step(&mut self, op_index: usize) { - // UNWRAP: `Keep` ops require separator len to exist. - let (key, separator_len) = match self.ops[op_index] { - BranchOp::Keep(i, separator_len) => (self.base.as_ref().unwrap().key(i), separator_len), - BranchOp::Insert(k, _) => (k, separator_len(&k)), + // If ongoing, check if we need to cut off.
+ // Returns the number of operations consumed for the bulk split. + fn bulk_split_step(&mut self, op_index: usize) -> usize { + // UNWRAPs: if the operation is Update or KeepChunk, then a base node to take the separators from must exist + let body_size_after = match self.ops[op_index] { + BranchOp::KeepChunk(from, to) => { + self.gauge + .body_size_after_chunk(self.base.as_ref().unwrap(), from, to) + } + BranchOp::Update(pos, _) => { + let key = self.base.as_ref().unwrap().key(pos); + self.gauge.body_size_after(key, separator_len(&key)) + } + BranchOp::Insert(key, _) => self.gauge.body_size_after(key, separator_len(&key)), }; - let body_size_after = self.gauge.body_size_after(key, separator_len); match self.bulk_split { None if body_size_after >= BRANCH_BULK_SPLIT_THRESHOLD => { self.bulk_split = Some(BranchBulkSplitter::default()); self.gauge = BranchGauge::new(); - for i in 0..=op_index { - self.bulk_split_step(i); + let mut idx = 0; + while idx < self.ops.len() { + let consumed = self.bulk_split_step(idx); + idx += consumed; } + idx } Some(ref mut bulk_splitter) if body_size_after >= BRANCH_BULK_SPLIT_TARGET => { + let mut recursive_on_split = false; + let accept_item = body_size_after <= BRANCH_NODE_BODY_SIZE || { - if self.gauge.body_size() < BRANCH_MERGE_THRESHOLD { - // rare case: body was artifically small due to long shared prefix. - // start applying items without prefix compression. we assume items are less - // than half the body size, so the next item should apply cleanly. - self.gauge.stop_prefix_compression(); - true - } else { - false + match self.ops[op_index] { + BranchOp::Insert(..) | BranchOp::Update(..) => { + if self.gauge.body_size() < BRANCH_MERGE_THRESHOLD { + // rare case: body was artificially small due to long shared prefix. + // start applying items without prefix compression. we assume items are less + // than half the body size, so the next item should apply cleanly. + self.gauge.stop_prefix_compression(); + true + } else { + false + } + } + BranchOp::KeepChunk(from, to) => { + let left_n_items = split_keep_chunk( + self.base.as_ref().unwrap(), + &self.gauge, + &mut self.ops, + op_index, + BRANCH_NODE_BODY_SIZE, + BRANCH_NODE_BODY_SIZE, + ); + + if left_n_items > 0 { + // KeepChunk has been successfully split, the left chunk is now able to fit + recursive_on_split = true; + true + } else { + // Not even a single element was able to fit. + if self.gauge.body_size() < BRANCH_MERGE_THRESHOLD { + // We can stop prefix compression and separate the first + // element of the keep_chunk into its own.
+ self.ops[op_index] = BranchOp::KeepChunk(from + 1, to); + let (key, pn) = self.base.as_ref().unwrap().key_value(from); + self.ops.insert(op_index, BranchOp::Insert(key, pn)); + + self.gauge.stop_prefix_compression(); + recursive_on_split = true; + true + } else { + false + } + } + } } }; let n = if accept_item { - self.gauge.ingest(key, separator_len); + self.gauge.ingest_branch_op(&self.base, &self.ops[op_index]); op_index + 1 - bulk_splitter.total_count } else { op_index - bulk_splitter.total_count @@ -207,10 +332,19 @@ impl BranchUpdater { bulk_splitter.push(n, last_gauge); if !accept_item { - self.gauge.ingest(key, separator_len); + self.gauge.ingest_branch_op(&self.base, &self.ops[op_index]); } + + if recursive_on_split { + 1 + self.bulk_split_step(op_index + 1) + } else { + 1 + } + } + _ => { + self.gauge.ingest_branch_op(&self.base, &self.ops[op_index]); + 1 } - _ => self.gauge.ingest(key, separator_len), } } @@ -222,7 +356,7 @@ impl BranchUpdater { let mut start = 0; for (item_count, gauge) in splitter.items { let branch_ops = &self.ops[start..][..item_count]; - let separator = self.op_key(&self.ops[start]); + let separator = self.op_first_key(&self.ops[start]); let new_node = self.build_branch(branch_ops, &gauge); new_branches.handle_new_branch(separator, new_node, self.cutoff); @@ -239,52 +373,72 @@ impl BranchUpdater { let mut left_gauge = BranchGauge::new(); while left_gauge.body_size() < midpoint { - let (key, separator_len) = match self.ops[split_point] { - BranchOp::Keep(i, separator_len) => { - // UNWRAP: keep ops require base to exist. - let k = self.base.as_ref().unwrap().key(i); - (k, separator_len) + match *&self.ops[split_point] { + BranchOp::Insert(key, _) => { + if left_gauge.body_size_after(key, separator_len(&key)) > BRANCH_NODE_BODY_SIZE + { + if left_gauge.body_size() < BRANCH_MERGE_THRESHOLD { + // rare case: body was artifically small due to long shared prefix. + // start applying items without prefix compression. we assume items are less + // than half the body size, so the next item should apply cleanly. + left_gauge.stop_prefix_compression(); + } else { + break; + } + } } - BranchOp::Insert(k, _) => (k, separator_len(&k)), - }; - - if left_gauge.body_size_after(key, separator_len) > BRANCH_NODE_BODY_SIZE { - if left_gauge.body_size() < BRANCH_MERGE_THRESHOLD { - // rare case: body was artifically small due to long shared prefix. - // start applying items without prefix compression. we assume items are less - // than half the body size, so the next item should apply cleanly. 
- left_gauge.stop_prefix_compression(); - } else { - break; + BranchOp::Update(pos, _) => { + let key = self.base.as_ref().unwrap().key(pos); + if left_gauge.body_size_after(key, separator_len(&key)) > BRANCH_NODE_BODY_SIZE + { + if left_gauge.body_size() < BRANCH_MERGE_THRESHOLD { + // same case as above + left_gauge.stop_prefix_compression(); + } else { + break; + } + } } - } + BranchOp::KeepChunk(from, to) => { + // try to split the chunk to make it fit into the available space + let n_items = split_keep_chunk( + self.base.as_ref().unwrap(), + &left_gauge, + &mut self.ops, + split_point, + midpoint, + BRANCH_NODE_BODY_SIZE, + ); + + if n_items == 0 { + // if no item from the chunk is capable to fit then + // try to extract the first element from the chunk and storing it + // back as an Insert to check if `stop_prefix_compression` is required + self.ops[split_point] = BranchOp::KeepChunk(from + 1, to); + let (key, pn) = self.base.as_ref().unwrap().key_value(from); + self.ops.insert(split_point, BranchOp::Insert(key, pn)); + continue; + } + } + }; - left_gauge.ingest(key, separator_len); + left_gauge.ingest_branch_op(&self.base, &self.ops[split_point]); split_point += 1; } let left_ops = &self.ops[..split_point]; let right_ops = &self.ops[split_point..]; - let left_separator = self.op_key(&self.ops[0]); - let right_separator = self.op_key(&self.ops[split_point]); + let left_separator = self.op_first_key(&self.ops[0]); + let right_separator = self.op_first_key(&self.ops[split_point]); let left_node = self.build_branch(left_ops, &left_gauge); new_branches.handle_new_branch(left_separator, left_node, Some(right_separator)); let mut right_gauge = BranchGauge::new(); - for op in &self.ops[split_point..] { - let (key, separator_len) = match op { - BranchOp::Keep(i, separator_len) => { - // UNWRAP: keep ops require base to exist. - let k = self.base.as_ref().unwrap().key(*i); - (k, *separator_len) - } - BranchOp::Insert(k, _) => (*k, separator_len(&k)), - }; - - right_gauge.ingest(key, separator_len); + for op in right_ops { + right_gauge.ingest_branch_op(&self.base, op); } if right_gauge.body_size() > BRANCH_NODE_BODY_SIZE { @@ -337,63 +491,125 @@ impl BranchUpdater { return builder.finish(); }; - let base_prefix_compressed = base.node.prefix_compressed() as usize; - let mut pending_chunk = None; - for i in 0..gauge.prefix_compressed_items() { - match (&ops[i], &mut pending_chunk) { - // start a chunk - (BranchOp::Keep(pos, _), None) if *pos < base_prefix_compressed => { - pending_chunk = Some((*pos, pos + 1)) + // This second phase of joining Update and KeepChunk into a unique update chunk is performed + // for two reasons: + // + // 1. It could often happen that the sequence of KeepChunk are interleaved by Update with only a change + // in the node pointers + // 2. 
To avoid keeping all the update information within the BranchOp::KeepChunk because it would require + // further allocations + let apply_chunk = + |builder: &mut BranchNodeBuilder, base_range: Range<usize>, ops_range: Range<usize>| { + let n_compressed_left = gauge + .prefix_compressed_items() + .saturating_sub(builder.n_pushed()); + + let compressed_end = + std::cmp::min(base_range.start + n_compressed_left, base_range.end); + + builder.push_chunk( + &base.node, + base_range.start, + compressed_end, + ops[ops_range] + .iter() + .filter_map(|op| { + if let BranchOp::Update(pos, pn) = op { + Some((pos - base_range.start, *pn)) + } else { + None + } + }) + .into_iter(), + ); + + for pos in compressed_end..base_range.end { + let (key, pn) = self.base.as_ref().unwrap().key_value(pos); + builder.push(key, separator_len(&key), pn.0); } - // keep a non-compressed separator - (BranchOp::Keep(pos, separator_len), None) => { - let (k, pn) = base.key_value(*pos); - builder.push(k, *separator_len, pn.0); + }; + + let mut pending_keep_chunk = None; + // contains a range within `ops` which defines the `pending_keep_chunk` + let mut pending_ops_range = None; + let mut i = 0; + while i < ops.len() { + // Check if the chunk could grow. + // If yes, then update it and restart the loop on the next operation. + // Otherwise, apply the pending chunk and let the same operation be re-evaluated. + if pending_keep_chunk.is_some() { + match &ops[i] { + // found an insert, apply the pending chunk + BranchOp::Insert(_, _) => { + apply_chunk( + &mut builder, + pending_keep_chunk.take().unwrap(), + pending_ops_range.take().unwrap(), + ); + } + BranchOp::KeepChunk(from, to) => { + // UNWRAP: pending_keep_chunk has just been checked to be Some + let range = pending_keep_chunk.as_mut().unwrap(); + let ops_range = pending_ops_range.as_mut().unwrap(); + if range.end == *from { + // KeepChunk that follows the pending chunk + range.end = *to; + ops_range.end += 1; + i += 1; + continue; + } else { + // KeepChunk that doesn't follow the pending chunk + apply_chunk( + &mut builder, + pending_keep_chunk.take().unwrap(), + pending_ops_range.take().unwrap(), + ); + } + } + BranchOp::Update(pos, _) => { + // UNWRAP: pending_keep_chunk has just been checked to be Some + let range = pending_keep_chunk.as_mut().unwrap(); + let ops_range = pending_ops_range.as_mut().unwrap(); + if range.end == *pos { + // Update that follows the pending chunk + range.end += 1; + ops_range.end += 1; + i += 1; + continue; + } else { + // Update that doesn't follow the pending chunk + apply_chunk( + &mut builder, + pending_keep_chunk.take().unwrap(), + pending_ops_range.take().unwrap(), + ); + } + } } - // chunk grows - (BranchOp::Keep(pos, _), Some((_from, to))) - if *pos == *to && *pos < base_prefix_compressed => - { - *to += 1 + } + + match &ops[i] { + BranchOp::Insert(key, pn) => { + builder.push(*key, separator_len(key), pn.0); + i += 1; } - // chunk ends - (BranchOp::Keep(pos, separator_len), Some((from, to))) => { - // apply pending chunk - builder.push_chunk(&base.node, *from, *to); - pending_chunk = if *pos < base_prefix_compressed { - // start a new chunk - Some((*pos, pos + 1)) - } else { - // apply non-compressed - let (k, pn) = base.key_value(*pos); - builder.push(k, *separator_len, pn.0); - None - }; + BranchOp::KeepChunk(from, to) => { + pending_keep_chunk = Some(*from..*to); + pending_ops_range = Some(i..i + 1); + i += 1; } - // chunk ends, and insert new leaf - (BranchOp::Insert(k, pn), Some((from, to))) => { - builder.push_chunk(&base.node, *from, *to); -
pending_chunk = None; + builder.push(*k, separator_len(k), pn.0); } - (BranchOp::Insert(k, pn), None) => builder.push(*k, separator_len(k), pn.0), - } + BranchOp::KeepChunk(from, to) => { + pending_keep_chunk = Some(*from..*to); + pending_ops_range = Some(i..i + 1); + i += 1; } - // apply possible pending chunk - if let Some((from, to)) = pending_chunk { - builder.push_chunk(&base.node, from, to); + BranchOp::Update(pos, _) => { + pending_keep_chunk = Some(*pos..*pos + 1); + pending_ops_range = Some(i..i + 1); + i += 1; } - for i in gauge.prefix_compressed_items()..ops.len() { - match &ops[i] { - BranchOp::Insert(key, pn) => builder.push(*key, separator_len(key), pn.0), - BranchOp::Keep(pos, separator_len) => { - let (k, pn) = base.key_value(*pos); - builder.push(k, *separator_len, pn.0); - } - } - } + }; } + if let (Some(range), Some(ops_range)) = (pending_keep_chunk, pending_ops_range) { + apply_chunk(&mut builder, range, ops_range); } builder.finish() } @@ -402,23 +618,87 @@ impl BranchUpdater { let Some(ref base) = self.base else { return }; - // then replace `Keep` ops with pure key-value ops, preparing for the base to be changed. - for op in self.ops.iter_mut() { - let BranchOp::Keep(i, _) = *op else { continue }; - let (k, pn) = base.key_value(i); - *op = BranchOp::Insert(k, pn); + // replace `KeepChunk` and `Update` ops with pure key-value ops, + // preparing for the base to be changed. + let mut new_insert = 0; + for i in 0..self.ops.len() { + match self.ops[new_insert + i] { + BranchOp::Insert(_, _) => (), + BranchOp::Update(pos, new_pn) => { + self.ops[new_insert + i] = BranchOp::Insert(base.key(pos), new_pn); + } + BranchOp::KeepChunk(from, to) => { + self.ops.remove(new_insert + i); + + for pos in (from..to).into_iter().rev() { + let (key, pn) = base.key_value(pos); + self.ops.insert(new_insert + i, BranchOp::Insert(key, pn)); + } + new_insert += to - from - 1; + } + } } } - fn op_key(&self, op: &BranchOp) -> Key { - // UNWRAP: `Keep` ops require base to exist. - match op { + fn op_first_key(&self, branch_op: &BranchOp) -> Key { + // UNWRAP: `KeepChunk` and `Update` ops only exist when base is `Some`. + match branch_op { BranchOp::Insert(k, _) => *k, - BranchOp::Keep(i, _) => self.base.as_ref().unwrap().key(*i), + BranchOp::Update(pos, _) => self.base.as_ref().unwrap().key(*pos), + BranchOp::KeepChunk(from, _) => self.base.as_ref().unwrap().key(*from), + } + } +} + +// Given a vector of `BranchOp`, try to split the `index` operation, +// which is expected to be KeepChunk, into two halves, +// targeting a `target` size and not exceeding a `limit`. +// +// `target` and `limit` are required to decide when to accept a split +// with a final size smaller than the target. Constraining the split to always +// be bigger than the target would cause the update algorithm to frequently +// oscillate between underfull and overfull scenarios.
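The `target`/`limit` acceptance rule just described can be sketched with plain item sizes — a toy model, not the gauge arithmetic the real function below uses:

```rust
// Illustrative only: items are taken until the running size reaches `target`;
// the item that crosses `target` is kept only if it does not also exceed `limit`.
fn take_left_chunk(sizes: &[usize], target: usize, limit: usize) -> usize {
    let mut taken = 0;
    let mut total = 0;
    for &size in sizes {
        taken += 1;
        total += size;
        if total >= target {
            if total > limit {
                taken -= 1; // the crossing item overshoots the limit: leave it out
            }
            break;
        }
    }
    taken
}

fn main() {
    // with target 10 and limit 12, the third item (total 11) is accepted...
    assert_eq!(take_left_chunk(&[4, 4, 3, 4], 10, 12), 3);
    // ...but with a bigger third item (total 14 > limit) it is rejected.
    assert_eq!(take_left_chunk(&[4, 4, 6, 4], 10, 12), 2);
}
```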
+fn split_keep_chunk( + base: &BaseBranch, + gauge: &BranchGauge, + ops: &mut Vec<BranchOp>, + index: usize, + target: usize, + limit: usize, +) -> usize { + let BranchOp::KeepChunk(from, to) = ops[index] else { + panic!("Attempted to split non `BranchOp::KeepChunk` operation"); + }; + + let mut left_chunk_n_items = 0; + let mut gauge = gauge.clone(); + for i in from..to { + left_chunk_n_items += 1; + + let key = get_key(&base.node, i); + let separator_len = separator_len(&key); + let body_size_after = gauge.body_size_after(key, separator_len); + + if body_size_after >= target { + // if an item jumps from below the target to bigger than the limit, do not use it + if body_size_after > limit { + left_chunk_n_items -= 1; + } + break; } + gauge.ingest_key(key, separator_len); + } + + // if none or all elements are taken then nothing needs to be changed + if left_chunk_n_items != 0 && to - from != left_chunk_n_items { + ops.insert(index, BranchOp::KeepChunk(from, from + left_chunk_n_items)); + ops[index + 1] = BranchOp::KeepChunk(from + left_chunk_n_items, to); } + + left_chunk_n_items } +#[derive(Clone)] struct BranchGauge { // key and length of the first separator if any first_separator: Option<(Key, usize)>, @@ -442,7 +722,7 @@ impl BranchGauge { } } - fn ingest(&mut self, key: Key, len: usize) { + fn ingest_key(&mut self, key: Key, len: usize) { let Some((ref first, _)) = self.first_separator else { self.first_separator = Some((key, len)); self.prefix_len = len; @@ -458,6 +738,25 @@ impl BranchGauge { self.n += 1; } + fn ingest_branch_op(&mut self, base: &Option<BaseBranch>, op: &BranchOp) { + // UNWRAPs: if op is Update or KeepChunk, then a base must exist + match op { + BranchOp::Update(pos, _) => { + let key = get_key(&base.as_ref().unwrap().node, *pos); + self.ingest_key(key, separator_len(&key)); + } + BranchOp::KeepChunk(from, to) => { + for i in *from..*to { + let key = get_key(&base.as_ref().unwrap().node, i); + self.ingest_key(key, separator_len(&key)); + } + } + BranchOp::Insert(key, _) => { + self.ingest_key(*key, separator_len(key)); + } + } + } + fn stop_prefix_compression(&mut self) { assert!(self.prefix_compressed.is_none()); self.prefix_compressed = Some((self.n, self.sum_separator_lengths)); @@ -507,6 +806,84 @@ impl BranchGauge { branch_node::body_size(p, t, self.n + 1) } + fn body_size_after_chunk(&self, base: &BaseBranch, from: usize, to: usize) -> usize { + let n_items = to - from; + + // chunks do not interact with uncompressed items, thus expect to copy them in the + // compressed separators + // update the prefix len based on the new chunk and evaluate its new length + if self.prefix_compressed.is_some() { + let mut base_separators_len = 0; + + for i in from..to { + let key = get_key(&base.node, i); + let separator_len = separator_len(&key); + base_separators_len += separator_len; + } + + let t = self.total_separator_lengths(self.prefix_len) + base_separators_len; + return branch_node::body_size(self.prefix_len, t, self.n + n_items); + } + + let p; + let t; + + if let Some((ref first, _)) = self.first_separator { + let mut first_separators_len = 0; + let mut base_separators_len = 0; + + let mut new_prefix_len = 0; + for i in from..from + n_items { + let key = get_key(&base.node, i); + new_prefix_len = prefix_len(first, &key); + let separator_len = separator_len(&key); + if i == from { + first_separators_len = separator_len; + } else { + base_separators_len += separator_len; + } + } + p = new_prefix_len; + + let base_compressed_separators_len = + first_separators_len.saturating_sub(p) +
base_separators_len - ((n_items - 1) * p); + + t = self.total_separator_lengths(p) + base_compressed_separators_len + } else { + let mut first_separators_len = 0; + let mut base_separators_len = 0; + + let first = get_key(&base.node, from); + let mut new_prefix_len = 0; + for i in from..from + n_items { + let key = get_key(&base.node, i); + new_prefix_len = prefix_len(&first, &key); + let separator_len = separator_len(&key); + if i == from { + first_separators_len = separator_len; + } else { + base_separators_len += separator_len; + } + } + p = if new_prefix_len == 256 { + separator_len(&first) + } else { + new_prefix_len + }; + + let base_compressed_separators_len = + first_separators_len.saturating_sub(p) + base_separators_len - ((n_items - 1) * p); + + t = self.total_separator_lengths(p) + base_compressed_separators_len + + // both prefix and total separatos bit len is inherited from the base + //p = base.node.prefix_len() as usize; + //t = base.node.view().raw_separators_data(from, to).bit_len; + } + + branch_node::body_size(p, t, self.n + n_items) + } + fn body_size(&self) -> usize { branch_node::body_size( self.prefix_len, @@ -558,7 +935,7 @@ pub mod tests { fn gauge_stop_uncompressed() { let mut gauge = BranchGauge::new(); - gauge.ingest([0; 32], 0); + gauge.ingest_key([0; 32], 0); // push items with a long (16-byte) shared prefix until just before the halfway point. let mut items: Vec = (1..1000u16) @@ -577,7 +954,7 @@ pub mod tests { break; } - gauge.ingest(item, len); + gauge.ingest_key(item, len); } assert!(gauge.body_size() < BRANCH_MERGE_THRESHOLD); @@ -637,7 +1014,7 @@ pub mod tests { break; } items.push((next_key, items.len())); - gauge.ingest(next_key, s_len); + gauge.ingest_key(next_key, s_len); } make_branch(items) @@ -666,7 +1043,7 @@ pub mod tests { } items.push((next_key, items.len())); - gauge.ingest(next_key, s_len); + gauge.ingest_key(next_key, s_len); } let mut branch_node = make_raw_branch(items); @@ -695,7 +1072,7 @@ pub mod tests { PAGE_POOL.clone(), Some(BaseBranch { node: branch, - iter_pos: 0, + low: 0, }), None, ); @@ -724,7 +1101,7 @@ pub mod tests { PAGE_POOL.clone(), Some(BaseBranch { node: branch, - iter_pos: 0, + low: 0, }), None, ); @@ -754,7 +1131,7 @@ pub mod tests { PAGE_POOL.clone(), Some(BaseBranch { node: branch, - iter_pos: 0, + low: 0, }), None, ); @@ -790,7 +1167,7 @@ pub mod tests { PAGE_POOL.clone(), Some(BaseBranch { node: branch, - iter_pos: 0, + low: 0, }), None, ); @@ -834,7 +1211,7 @@ pub mod tests { PAGE_POOL.clone(), Some(BaseBranch { node: branch, - iter_pos: 0, + low: 0, }), Some(key2(0)), ); @@ -851,7 +1228,7 @@ pub mod tests { updater.reset_base( Some(BaseBranch { node: branch2, - iter_pos: 0, + low: 0, }), None, ); @@ -876,7 +1253,7 @@ pub mod tests { PAGE_POOL.clone(), Some(BaseBranch { node: branch, - iter_pos: 0, + low: 0, }), None, ); @@ -901,7 +1278,7 @@ pub mod tests { PAGE_POOL.clone(), Some(BaseBranch { node: branch, - iter_pos: 0, + low: 0, }), None, ); @@ -945,7 +1322,7 @@ pub mod tests { PAGE_POOL.clone(), Some(BaseBranch { node: branch, - iter_pos: 0, + low: 0, }), Some(key2(0)), ); @@ -961,7 +1338,7 @@ pub mod tests { updater.reset_base( Some(BaseBranch { node: branch2, - iter_pos: 0, + low: 0, }), None, ); @@ -984,7 +1361,7 @@ pub mod tests { let mut gauge = BranchGauge::new(); for i in 0..new_branch_1.n() as usize { let key = get_key(&new_branch_1, i); - gauge.ingest(key, separator_len(&key)) + gauge.ingest_key(key, separator_len(&key)) } gauge.body_size() };
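As a closing illustration of the bookkeeping in `body_size_after_chunk`: once a shared prefix of `p` bits is fixed, each separator in a chunk is charged roughly its own length minus `p`. The helper below is an assumed simplification; the real code also special-cases the first separator and delegates the final size to `branch_node::body_size`:

```rust
// Toy model of the compressed-length bookkeeping: with a shared prefix of `p`
// bits, each separator in the chunk contributes roughly `len - p` bits.
fn compressed_separator_bits(separator_lens: &[usize], p: usize) -> usize {
    separator_lens.iter().map(|len| len.saturating_sub(p)).sum()
}

fn main() {
    // three separators of 30, 32 and 35 bits sharing a 24-bit prefix
    assert_eq!(compressed_separator_bits(&[30, 32, 35], 24), 6 + 8 + 11);
}
```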