From d707976d5570b266459f4cd77049b3c4bc93e2ba Mon Sep 17 00:00:00 2001
From: gabriele-0201
Date: Wed, 11 Dec 2024 06:43:14 +0100
Subject: [PATCH] fix: adjust BranchOp handling

The changeset not only fixes how uncompressed separators are represented
through BranchOp, but also refactors the code to better encapsulate the
BranchOp update logic.
---
 nomt/src/beatree/ops/update/branch_updater.rs | 383 +++++++++---------
 1 file changed, 199 insertions(+), 184 deletions(-)

diff --git a/nomt/src/beatree/ops/update/branch_updater.rs b/nomt/src/beatree/ops/update/branch_updater.rs
index fb463d98..e6d54541 100644
--- a/nomt/src/beatree/ops/update/branch_updater.rs
+++ b/nomt/src/beatree/ops/update/branch_updater.rs
@@ -138,18 +138,23 @@ impl BranchUpdater {
         let res = self.keep_up_to(Some(&key));

         match (res, pn) {
+            (_, Some(pn)) if self.gauge.prefix_compressed.is_some() => {
+                // prefix compression has stopped, so each added element must be a BranchOp::Insert
+                self.ops.push(BranchOp::Insert(key, pn));
+                self.bulk_split_step();
+            }
             // UNWRAP: if the item has been found it must be a base node
             (Some(pos), Some(pn))
                 if pos < self.base.as_ref().unwrap().node.prefix_compressed() as usize =>
             {
                 // a compressed separator has been changed
                 self.ops.push(BranchOp::Update(pos, pn));
-                self.bulk_split_step(self.ops.len() - 1);
+                self.bulk_split_step();
             }
             (_, Some(pn)) => {
                 // a new key or a previous uncompressed separator has been updated
                 self.ops.push(BranchOp::Insert(key, pn));
-                self.bulk_split_step(self.ops.len() - 1);
+                self.bulk_split_step();
             }
             _ => (),
         }
@@ -184,6 +189,7 @@ impl BranchUpdater {
         } else {
             self.prepare_merge_ops(last_ops_start);

+            // UNWRAP: protected above.
             DigestResult::NeedsMerge(self.cutoff.unwrap())
         }
     }
@@ -234,7 +240,7 @@ impl BranchUpdater {

         if from == to {
             // nothing to keep
-            return None;
+            return if found { Some(to) } else { None };
         }

         let base_compressed_end = std::cmp::min(to, base.node.prefix_compressed() as usize);
@@ -264,7 +270,13 @@ impl BranchUpdater {
         // push compressed chunk
         if let Some(chunk) = maybe_chunk {
             self.ops.push(BranchOp::KeepChunk(chunk));
-            self.bulk_split_step(self.ops.len() - 1);
+            self.bulk_split_step();
+
+            if self.gauge.prefix_compressed.is_some() {
+                // prefix compression has stopped, so the KeepChunk must be replaced by multiple Inserts
+                let last_op = self.ops.len() - 1;
+                replace_with_insert(&mut self.ops, last_op, self.base.as_ref());
+            }
         }

         // convert every kept uncompressed separator into an Insert operation
@@ -272,7 +284,7 @@ impl BranchUpdater {
             // UNWRAP: self.base is not None
             let (key, pn) = self.base.as_ref().unwrap().key_value(i);
             self.ops.push(BranchOp::Insert(key, pn));
-            self.bulk_split_step(self.ops.len() - 1);
+            self.bulk_split_step();
         }

         found
@@ -280,109 +292,48 @@ impl BranchUpdater {
     }

     // check whether bulk split needs to start, and if so, start it.
     // If ongoing, check if we need to cut off.
-    // returns the amount of operations consumed for the bulk creation
-    fn bulk_split_step(&mut self, op_index: usize) -> usize {
-        // UNWRAPs: if the operation is Update or KeepChunk, then it must be a base to keep the chunk from
-        let body_size_after = match self.ops[op_index] {
+    fn bulk_split_step(&mut self) {
+        let Some(last_op) = self.ops.last() else {
+            panic!("attempted bulk_split_step with no BranchOp available");
+        };
+
+        // UNWRAPs: `KeepChunk` or `Update` ops only exist when base is Some.
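+        // Compute what the node body size would become after ingesting the last op;
+        // this determines whether a bulk split must start or continue.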
+        let body_size_after = match last_op {
             BranchOp::KeepChunk(ref chunk) => self
                 .gauge
                 .body_size_after_chunk(self.base.as_ref().unwrap(), &chunk),
             BranchOp::Update(pos, _) => {
-                let key = self.base.as_ref().unwrap().key(pos);
+                let key = self.base.as_ref().unwrap().key(*pos);
                 self.gauge.body_size_after(key, separator_len(&key))
             }
-            BranchOp::Insert(key, _) => self.gauge.body_size_after(key, separator_len(&key)),
+            BranchOp::Insert(key, _) => self.gauge.body_size_after(*key, separator_len(&key)),
         };

         match self.bulk_split {
             None if body_size_after >= BRANCH_BULK_SPLIT_THRESHOLD => {
-                self.bulk_split = Some(BranchBulkSplitter::default());
-                self.gauge = BranchGauge::new();
-                let mut idx = 0;
-                while idx < self.ops.len() {
-                    let consumed = self.bulk_split_step(idx);
-                    idx += consumed;
-                }
-                idx
+                self.bulk_split = Some(BranchBulkSplitter::default())
             }
-            Some(ref mut bulk_splitter) if body_size_after >= BRANCH_BULK_SPLIT_TARGET => {
-                let mut recursive_on_split = false;
-
-                let accept_item = body_size_after <= BRANCH_NODE_BODY_SIZE || {
-                    match self.ops[op_index] {
-                        BranchOp::Insert(..) | BranchOp::Update(..) => {
-                            if self.gauge.body_size() < BRANCH_MERGE_THRESHOLD {
-                                // rare case: body was artifically small due to long shared prefix.
-                                // start applying items without prefix compression. we assume items are less
-                                // than half the body size, so the next item should apply cleanly.
-                                self.gauge.stop_prefix_compression();
-                                true
-                            } else {
-                                false
-                            }
-                        }
-                        BranchOp::KeepChunk(..) => {
-                            let left_n_items = split_keep_chunk(
-                                self.base.as_ref().unwrap(),
-                                &self.gauge,
-                                &mut self.ops,
-                                op_index,
-                                BRANCH_NODE_BODY_SIZE,
-                                BRANCH_NODE_BODY_SIZE,
-                            );
-
-                            if left_n_items > 0 {
-                                // KeepChunk has been successfully split, the left chunk is now able to fit
-                                recursive_on_split = true;
-                                true
-                            } else {
-                                // Not even a single element was able to fit.
-                                if self.gauge.body_size() < BRANCH_MERGE_THRESHOLD {
-                                    // We can stop prefix compression and separate the first
-                                    // element of the keep_chunk into its own.
-                                    extract_insert_from_keep_chunk(
-                                        self.base.as_ref().unwrap(),
-                                        &mut self.ops,
-                                        op_index,
-                                    );
-
-                                    self.gauge.stop_prefix_compression();
-                                    recursive_on_split = true;
-
-                                    true
-                                } else {
-                                    false
-                                }
-                            }
-                        }
-                    }
-                };
-
-                let n = if accept_item {
-                    self.gauge.ingest_branch_op(&self.base, &self.ops[op_index]);
-                    op_index + 1 - bulk_splitter.total_count
-                } else {
-                    op_index - bulk_splitter.total_count
-                };
-
-                // push onto bulk splitter & restart gauge.
-                let last_gauge = std::mem::replace(&mut self.gauge, BranchGauge::new());
-                bulk_splitter.push(n, last_gauge);
+            Some(_) if body_size_after >= BRANCH_BULK_SPLIT_TARGET => (),
+            _ => {
+                self.gauge.ingest_branch_op(self.base.as_ref(), last_op);
+                return;
+            }
+        };

-                if !accept_item {
-                    self.gauge.ingest_branch_op(&self.base, &self.ops[op_index]);
+        // continue or start the bulk split
+        // UNWRAPs: bulk_split has just been checked to be Some or has just been set to Some
+        let mut from = self.bulk_split.as_ref().unwrap().total_count;
+        loop {
+            match self.consume_and_update_until(from, BRANCH_BULK_SPLIT_TARGET) {
+                Ok((item_count, gauge)) => {
+                    self.bulk_split.as_mut().unwrap().push(item_count, gauge);
+                    from += item_count;
                 }
-
-                if recursive_on_split {
-                    1 + self.bulk_split_step(op_index + 1)
-                } else {
-                    1
+                Err(gauge) => {
+                    self.gauge = gauge;
+                    break;
                 }
             }
-            _ => {
-                self.gauge.ingest_branch_op(&self.base, &self.ops[op_index]);
-                1
-            }
         }
     }
@@ -407,79 +358,41 @@ impl BranchUpdater {
     fn split(&mut self, new_branches: &mut impl HandleNewBranch) -> DigestResult {
         let midpoint = self.gauge.body_size() / 2;
-        let mut split_point = 0;
-
-        let mut left_gauge = BranchGauge::new();
-        while left_gauge.body_size() < midpoint {
-            match *&self.ops[split_point] {
-                BranchOp::Insert(key, _) => {
-                    if left_gauge.body_size_after(key, separator_len(&key)) > BRANCH_NODE_BODY_SIZE
-                    {
-                        if left_gauge.body_size() < BRANCH_MERGE_THRESHOLD {
-                            // rare case: body was artifically small due to long shared prefix.
-                            // start applying items without prefix compression. we assume items are less
-                            // than half the body size, so the next item should apply cleanly.
-                            left_gauge.stop_prefix_compression();
-                        } else {
-                            break;
-                        }
-                    }
-                }
-                BranchOp::Update(pos, _) => {
-                    let key = self.base.as_ref().unwrap().key(pos);
-                    if left_gauge.body_size_after(key, separator_len(&key)) > BRANCH_NODE_BODY_SIZE
-                    {
-                        if left_gauge.body_size() < BRANCH_MERGE_THRESHOLD {
-                            // same case as above
-                            left_gauge.stop_prefix_compression();
-                        } else {
-                            break;
-                        }
-                    }
-                }
-                BranchOp::KeepChunk(..) => {
-                    // try to split the chunk to make it fit into the available space
-                    let n_items = split_keep_chunk(
-                        self.base.as_ref().unwrap(),
-                        &left_gauge,
-                        &mut self.ops,
-                        split_point,
-                        midpoint,
-                        BRANCH_NODE_BODY_SIZE,
-                    );
-                    if n_items == 0 {
-                        // if no item from the chunk is capable to fit then
-                        // try to extract the first element from the chunk and storing it
-                        // back as an Insert to check if `stop_prefix_compression` is required
-                        extract_insert_from_keep_chunk(
-                            self.base.as_ref().unwrap(),
-                            &mut self.ops,
-                            split_point,
-                        );
+        let (split_point, left_gauge) = match self.consume_and_update_until(0, midpoint) {
+            Ok((split_point, left_gauge)) => (split_point, left_gauge),
+            // If the current ops cannot reach the target and there is a cutoff,
+            // return NeedsMerge with the corresponding cutoff.
+            Err(new_gauge) if self.cutoff.is_some() => {
+                self.gauge = new_gauge;
+                // UNWRAP: self.cutoff has just been checked to be Some.
+                return DigestResult::NeedsMerge(self.cutoff.unwrap());
+            }
+            // If there is no cutoff, then construct the branch node with all the available ops.
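+            // Without a cutoff there is nothing to merge with, so the node may be
+            // built underfull.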
+            Err(new_gauge) => (self.ops.len(), new_gauge),
+        };

-                        continue;
-                    }
-                }
-            };
+        let left_separator = self.op_first_key(&self.ops[0]);
+        let left_ops = &self.ops[..split_point];
+        let left_node = self.build_branch(left_ops, &left_gauge);

-            left_gauge.ingest_branch_op(&self.base, &self.ops[split_point]);
-            split_point += 1;
+        if split_point == self.ops.len() {
+            // It is possible, due to stopping prefix compression, that after
+            // `consume_and_update_until` all ops fit in a single node.
+            new_branches.handle_new_branch(left_separator, left_node, self.cutoff);
+            self.ops.clear();
+            self.gauge = BranchGauge::new();
+            return DigestResult::Finished;
         }

-        let left_ops = &self.ops[..split_point];
-        let right_ops = &self.ops[split_point..];
-
-        let left_separator = self.op_first_key(&self.ops[0]);
         let right_separator = self.op_first_key(&self.ops[split_point]);
-
-        let left_node = self.build_branch(left_ops, &left_gauge);
-
         new_branches.handle_new_branch(left_separator, left_node, Some(right_separator));

         let mut right_gauge = BranchGauge::new();
+        let right_ops = &self.ops[split_point..];
+
         for op in right_ops {
-            right_gauge.ingest_branch_op(&self.base, op);
+            right_gauge.ingest_branch_op(self.base.as_ref(), op);
         }

         if right_gauge.body_size() > BRANCH_NODE_BODY_SIZE {
@@ -510,6 +423,98 @@ impl BranchUpdater {
         }
     }

+    // Starting from the specified index `from` within `self.ops`, consume and possibly
+    // modify the operations to obtain a sequence of operations able to construct a
+    // branch node with the specified target size.
+    //
+    // If `stop_prefix_compression` has to be called, then the target becomes
+    // BRANCH_MERGE_THRESHOLD to minimize the number of uncompressed items inserted
+    // into the node.
+    //
+    // If reaching the target is not possible, then the gauge reflecting the operations
+    // consumed so far is returned as an error.
+    fn consume_and_update_until(
+        &mut self,
+        from: usize,
+        mut target: usize,
+    ) -> Result<(usize, BranchGauge), BranchGauge> {
+        let mut pos = from;
+        let mut gauge = BranchGauge::new();
+
+        while pos < self.ops.len() && gauge.body_size() < target {
+            match *&self.ops[pos] {
+                BranchOp::Insert(key, _) => {
+                    if gauge.body_size_after(key, separator_len(&key)) > BRANCH_NODE_BODY_SIZE {
+                        if gauge.body_size() < BRANCH_MERGE_THRESHOLD {
+                            // rare case: body was artificially small due to long shared prefix.
+                            // start applying items without prefix compression. we assume items are less
+                            // than half the body size, so the next item should apply cleanly.
+                            gauge.stop_prefix_compression();
+                            // change the target requirement to minimize the number of
+                            // non-compressed separators saved into one node
+                            target = BRANCH_MERGE_THRESHOLD;
+                        } else {
+                            break;
+                        }
+                    }
+                }
+                BranchOp::Update(update_pos, _) => {
+                    // UNWRAP: `Update` op only exists when base is Some.
+                    let key = self.base.as_ref().unwrap().key(update_pos);
+
+                    if gauge.body_size_after(key, separator_len(&key)) > BRANCH_NODE_BODY_SIZE {
+                        if gauge.body_size() < BRANCH_MERGE_THRESHOLD {
+                            // Replace the Update op and repeat the loop
+                            // to see if `stop_prefix_compression` is activated
+                            replace_with_insert(&mut self.ops, pos, self.base.as_ref());
+                            continue;
+                        } else {
+                            break;
+                        }
+                    }
+                }
+                BranchOp::KeepChunk(..) => {
+                    // UNWRAP: `KeepChunk` op only exists when base is Some.
+                    let base = self.base.as_ref().unwrap();
+
+                    // Try to split the chunk to make it fit into the available space.
+                    // `try_split_keep_chunk` works on the gauge, thus it accounts for a
+                    // possible stop of the prefix compression even when working on a
+                    // KeepChunk operation
+                    let left_n_items = try_split_keep_chunk(
+                        base,
+                        &gauge,
+                        &mut self.ops,
+                        pos,
+                        target,
+                        BRANCH_NODE_BODY_SIZE,
+                    );
+
+                    if left_n_items == 0 {
+                        // If no item from the chunk is capable of fitting,
+                        // then extract the first element from the chunk and repeat the loop
+                        // to see if `stop_prefix_compression` is activated
+                        extract_insert_from_keep_chunk(base, &mut self.ops, pos);
+                        continue;
+                    }
+                }
+            };

+            gauge.ingest_branch_op(self.base.as_ref(), &self.ops[pos]);
+            let n_ops = if gauge.prefix_compressed.is_some() {
+                // replace the current op with Inserts if the prefix compression was stopped
+                replace_with_insert(&mut self.ops, pos, self.base.as_ref())
+            } else {
+                1
+            };
+            pos += n_ops;
+        }
+
+        if gauge.body_size() >= target {
+            Ok((pos - from, gauge))
+        } else {
+            Err(gauge)
+        }
+    }
+
     fn build_branch(&self, ops: &[BranchOp], gauge: &BranchGauge) -> BranchNode {
         let branch = BranchNode::new_in(&self.page_pool);
@@ -564,6 +569,8 @@ impl BranchUpdater {
                 .into_iter(),
         );

+        // UNWRAP: apply_chunk works on an aggregation of `KeepChunk` and `Update` ops,
+        // and they only exist when the base is Some.
         for pos in compressed_end..base_range.end {
             let (key, pn) = self.base.as_ref().unwrap().key_value(pos);
             builder.push(key, separator_len(&key), pn.0);
@@ -579,6 +586,8 @@ impl BranchUpdater {
         // If yes, then update it and restart the loop on the next operation.
         // Otherwise, apply the pending chunk and let the same operation be re-evaluated.
         if pending_keep_chunk.is_some() {
+            // UNWRAPs: pending_keep_chunk has just been checked to be Some.
+            // If pending_keep_chunk is Some, then so is pending_ops_range.
             match &ops[i] {
                 // found an insert, apply pending chunk
                 BranchOp::Insert(_, _) => {
                     apply_chunk(
@@ -589,7 +598,6 @@ impl BranchUpdater {
                     );
                 }
                 BranchOp::KeepChunk(chunk) => {
-                    // UNWRAP: pending_keep_chunk has just been checked to be Some
                     let range = pending_keep_chunk.as_mut().unwrap();
                     let ops_range = pending_ops_range.as_mut().unwrap();
                     if range.end == chunk.start {
@@ -608,7 +616,6 @@ impl BranchUpdater {
                     }
                 }
                 BranchOp::Update(pos, _) => {
-                    // UNWRAP: pending_keep_chunk has just been checked to be Some
                     let range = pending_keep_chunk.as_mut().unwrap();
                     let ops_range = pending_ops_range.as_mut().unwrap();
                     if range.end == *pos {
@@ -657,32 +664,17 @@ impl BranchUpdater {

     fn prepare_merge_ops(&mut self, split_point: usize) {
         self.ops.drain(..split_point);

-        let Some(ref base) = self.base else { return };
-
-        // replace `KeepChunk` and `Update` ops with pure key-value ops,
+        // Replace `KeepChunk` and `Update` ops with pure key-value ops,
         // preparing for the base to be changed.
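+        // `replace_with_insert` returns the number of ops that now occupy the replaced
+        // position, so advance `i` past them.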
-        let mut new_insert = 0;
-        for i in 0..self.ops.len() {
-            match self.ops[new_insert + i] {
-                BranchOp::Insert(_, _) => (),
-                BranchOp::Update(pos, new_pn) => {
-                    self.ops[new_insert + i] = BranchOp::Insert(base.key(pos), new_pn);
-                }
-                BranchOp::KeepChunk(chunk) => {
-                    self.ops.remove(new_insert + i);
-
-                    for pos in (chunk.start..chunk.end).into_iter().rev() {
-                        let (key, pn) = base.key_value(pos);
-                        self.ops.insert(new_insert + i, BranchOp::Insert(key, pn));
-                    }
-                    new_insert += chunk.end - chunk.start - 1;
-                }
-            }
+        let mut i = 0;
+        while i < self.ops.len() {
+            let replaced_ops = replace_with_insert(&mut self.ops, i, self.base.as_ref());
+            i += replaced_ops;
         }
     }

     fn op_first_key(&self, branch_op: &BranchOp) -> Key {
-        // UNWRAP: `KeepChunk` leaf ops only exist when base is `Some`.
+        // UNWRAPs: `KeepChunk` and `Update` ops only exist when base is Some.
         match branch_op {
             BranchOp::Insert(k, _) => *k,
             BranchOp::Update(pos, _) => self.base.as_ref().unwrap().key(*pos),
@@ -691,6 +683,31 @@ impl BranchUpdater {
     }
 }

+fn replace_with_insert(
+    ops: &mut Vec<BranchOp>,
+    op_index: usize,
+    base: Option<&BaseBranch>,
+) -> usize {
+    match ops[op_index] {
+        BranchOp::Insert(_, _) => 1,
+        BranchOp::Update(pos, new_pn) => {
+            // UNWRAP: `Update` op only exists when base is Some.
+            ops[op_index] = BranchOp::Insert(base.unwrap().key(pos), new_pn);
+            1
+        }
+        BranchOp::KeepChunk(chunk) => {
+            ops.remove(op_index);
+
+            for pos in (chunk.start..chunk.end).into_iter().rev() {
+                // UNWRAP: `KeepChunk` op only exists when base is Some.
+                let (key, pn) = base.unwrap().key_value(pos);
+                ops.insert(op_index, BranchOp::Insert(key, pn));
+            }
+            chunk.end - chunk.start
+        }
+    }
+}
+
 // Given a vector of `BranchOp`, try to split the `index` operation,
 // which is expected to be KeepChunk, into two halves,
 // targeting a `target` size and not exceeding a `limit`.
@@ -699,7 +716,7 @@
 // with a final size smaller than the target. Constraining the split to always
 // be bigger than the target causes the update algorithm to frequently
 // fall into underfull to overfull scenarios.
-fn split_keep_chunk(
+fn try_split_keep_chunk(
     base: &BaseBranch,
     gauge: &BranchGauge,
     ops: &mut Vec<BranchOp>,
     index: usize,
     target: usize,
     limit: usize,
 ) -> usize {
@@ -752,7 +769,6 @@ fn split_keep_chunk(
         ops.insert(index, BranchOp::KeepChunk(left_chunk));
         ops[index + 1] = BranchOp::KeepChunk(right_chunk);
     }
-
     left_chunk_n_items
 }
@@ -817,15 +833,14 @@ impl BranchGauge {
         self.n += 1;
     }

-    fn ingest_branch_op(&mut self, base: &Option<BaseBranch>, op: &BranchOp) {
-        // UNWRAPs: if op is Update or KeepChunk, then it must be a base
+    fn ingest_branch_op(&mut self, base: Option<&BaseBranch>, op: &BranchOp) {
+        // UNWRAPs: `KeepChunk` and `Update` ops only exist when base is Some.
         match op {
             BranchOp::Update(pos, _) => {
                 let key = get_key(&base.as_ref().unwrap().node, *pos);
                 self.ingest_key(key, separator_len(&key));
             }
             BranchOp::KeepChunk(ref chunk) => {
-                // UNWRAP: `KeepChunk` requires the base branch to exist.
                 self.ingest_chunk(base.as_ref().unwrap(), chunk);
             }
             BranchOp::Insert(key, _) => {