Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(merkle-tree): Fix tree truncation #3178

Merged
merged 3 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/lib/merkle_tree/src/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,10 @@ impl ZkSyncTree {
pub fn roll_back_logs(&mut self, last_l1_batch_to_keep: L1BatchNumber) -> anyhow::Result<()> {
self.tree.db.reset();
let retained_version_count = u64::from(last_l1_batch_to_keep.0 + 1);
self.tree.truncate_recent_versions(retained_version_count)
// Since `Patched<_>` doesn't implement `PruneDatabase`, we borrow the underlying DB, which is safe
// because the in-memory patch was reset above.
MerkleTree::new_unchecked(self.tree.db.inner_mut())
.truncate_recent_versions(retained_version_count)
}

/// Saves the accumulated changes in the tree to RocksDB.
Expand Down
38 changes: 20 additions & 18 deletions core/lib/merkle_tree/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,24 +200,6 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
root.unwrap_or(Root::Empty)
}

/// Removes the most recent versions from the database.
///
/// The current implementation does not actually remove node data for the removed versions
/// since it's likely to be reused in the future (especially upper-level internal nodes).
///
/// # Errors
///
/// Proxies database I/O errors.
pub fn truncate_recent_versions(&mut self, retained_version_count: u64) -> anyhow::Result<()> {
let mut manifest = self.db.manifest().unwrap_or_default();
if manifest.version_count > retained_version_count {
manifest.version_count = retained_version_count;
let patch = PatchSet::from_manifest(manifest);
self.db.apply_patch(patch)?;
}
Ok(())
}

/// Extends this tree by creating its new version.
///
/// # Return value
Expand Down Expand Up @@ -259,6 +241,26 @@ impl<DB: Database, H: HashTree> MerkleTree<DB, H> {
}

impl<DB: PruneDatabase> MerkleTree<DB> {
/// Removes the most recent versions from the database.
///
/// The current implementation does not actually remove node data for the removed versions
/// since it's likely to be reused in the future (especially upper-level internal nodes).
///
/// # Errors
///
/// Proxies database I/O errors.
pub fn truncate_recent_versions(&mut self, retained_version_count: u64) -> anyhow::Result<()> {
let mut manifest = self.db.manifest().unwrap_or_default();
let current_version_count = manifest.version_count;
if current_version_count > retained_version_count {
// It is necessary to remove "future" stale keys since otherwise they may be used in future pruning and lead
// to non-obsolete tree nodes getting removed.
manifest.version_count = retained_version_count;
self.db.truncate(manifest, ..current_version_count)?;
}
Ok(())
}

/// Returns the first retained version of the tree.
pub fn first_retained_version(&self) -> Option<u64> {
match self.db.min_stale_key_version() {
Expand Down
67 changes: 66 additions & 1 deletion core/lib/merkle_tree/src/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ mod tests {
use super::*;
use crate::{
types::{Node, NodeKey},
Database, Key, MerkleTree, PatchSet, TreeEntry, ValueHash,
Database, Key, MerkleTree, PatchSet, RocksDBWrapper, TreeEntry, ValueHash,
};

fn create_db() -> PatchSet {
Expand Down Expand Up @@ -506,4 +506,69 @@ mod tests {
println!("Keys are pruned after each update");
test_keys_are_removed_by_pruning_when_overwritten_in_multiple_batches(true);
}

fn test_pruning_with_truncation(db: impl PruneDatabase) {
let mut tree = MerkleTree::new(db).unwrap();
let kvs: Vec<_> = (0_u64..100)
.map(|i| TreeEntry::new(Key::from(i), i + 1, ValueHash::zero()))
.collect();
tree.extend(kvs).unwrap();

let overridden_kvs = vec![TreeEntry::new(
Key::from(0),
1,
ValueHash::repeat_byte(0xaa),
)];
tree.extend(overridden_kvs).unwrap();

let stale_keys = tree.db.stale_keys(1);
assert!(
stale_keys.iter().any(|key| !key.is_empty()),
"{stale_keys:?}"
);

// Revert `overridden_kvs`.
tree.truncate_recent_versions(1).unwrap();
assert_eq!(tree.latest_version(), Some(0));
let future_stale_keys = tree.db.stale_keys(1);
assert!(future_stale_keys.is_empty());

// Add a new version without the key. To make the matter more egregious, the inserted key
// differs from all existing keys, starting from the first nibble.
let new_key = Key::from_big_endian(&[0xaa; 32]);
let new_kvs = vec![TreeEntry::new(new_key, 101, ValueHash::repeat_byte(0xaa))];
tree.extend(new_kvs).unwrap();
assert_eq!(tree.latest_version(), Some(1));

let stale_keys = tree.db.stale_keys(1);
assert_eq!(stale_keys.len(), 1);
assert!(
stale_keys[0].is_empty() && stale_keys[0].version == 0,
"{stale_keys:?}"
);

let (mut pruner, _) = MerkleTreePruner::new(tree.db);
let prunable_version = pruner.last_prunable_version().unwrap();
assert_eq!(prunable_version, 1);
let stats = pruner
.prune_up_to(prunable_version)
.unwrap()
.expect("tree was not pruned");
assert_eq!(stats.target_retained_version, 1);
assert_eq!(stats.pruned_key_count, 1); // only the root node should have been pruned

let tree = MerkleTree::new(pruner.db).unwrap();
tree.verify_consistency(1, false).unwrap();
}

#[test]
fn pruning_with_truncation() {
test_pruning_with_truncation(PatchSet::default());
}

#[test]
fn pruning_with_truncation_on_rocksdb() {
let temp_dir = tempfile::TempDir::new().unwrap();
test_pruning_with_truncation(RocksDBWrapper::new(temp_dir.path()).unwrap());
}
}
30 changes: 30 additions & 0 deletions core/lib/merkle_tree/src/storage/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,17 @@ pub trait PruneDatabase: Database {
///
/// Propagates database I/O errors.
fn prune(&mut self, patch: PrunePatchSet) -> anyhow::Result<()>;

/// Atomically truncates the specified range of versions and stale keys.
///
/// # Errors
///
/// Propagates database I/O errors.
fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()>;
}

impl<T: PruneDatabase + ?Sized> PruneDatabase for &mut T {
Expand All @@ -414,6 +425,14 @@ impl<T: PruneDatabase + ?Sized> PruneDatabase for &mut T {
fn prune(&mut self, patch: PrunePatchSet) -> anyhow::Result<()> {
(**self).prune(patch)
}

fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()> {
(**self).truncate(manifest, truncated_versions)
}
}

impl PruneDatabase for PatchSet {
Expand Down Expand Up @@ -447,6 +466,17 @@ impl PruneDatabase for PatchSet {
.retain(|version, _| !patch.deleted_stale_key_versions.contains(version));
Ok(())
}

fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()> {
self.manifest = manifest;
self.stale_keys_by_version
.retain(|version, _| !truncated_versions.contains(version));
Ok(())
}
}

#[cfg(test)]
Expand Down
24 changes: 23 additions & 1 deletion core/lib/merkle_tree/src/storage/parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{
any::Any,
collections::{HashMap, VecDeque},
error::Error as StdError,
mem,
mem, ops,
sync::{mpsc, Arc},
thread,
time::Duration,
Expand Down Expand Up @@ -375,6 +375,17 @@ impl<DB: PruneDatabase> PruneDatabase for ParallelDatabase<DB> {
.context("failed synchronizing database before pruning")?;
self.inner.prune(patch)
}

fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()> {
// Require the underlying database to be fully synced.
self.wait_sync()
.context("failed synchronizing database before truncation")?;
self.inner.truncate(manifest, truncated_versions)
}
}

/// Database with either sequential or parallel persistence.
Expand Down Expand Up @@ -479,6 +490,17 @@ impl<DB: PruneDatabase> PruneDatabase for MaybeParallel<DB> {
Self::Parallel(db) => db.prune(patch),
}
}

fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()> {
match self {
Self::Sequential(db) => db.truncate(manifest, truncated_versions),
Self::Parallel(db) => db.truncate(manifest, truncated_versions),
}
}
}

#[cfg(test)]
Expand Down
28 changes: 27 additions & 1 deletion core/lib/merkle_tree/src/storage/rocksdb.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! RocksDB implementation of [`Database`].

use std::{any::Any, cell::RefCell, path::Path, sync::Arc};
use std::{any::Any, cell::RefCell, ops, path::Path, sync::Arc};

use anyhow::Context as _;
use rayon::prelude::*;
Expand Down Expand Up @@ -351,6 +351,32 @@ impl PruneDatabase for RocksDBWrapper {
.write(write_batch)
.context("Failed writing a batch to RocksDB")
}

fn truncate(
&mut self,
manifest: Manifest,
truncated_versions: ops::RangeTo<u64>,
) -> anyhow::Result<()> {
anyhow::ensure!(
manifest.version_count <= truncated_versions.end,
"Invalid truncate call: manifest={manifest:?}, truncated_versions={truncated_versions:?}"
);
let mut write_batch = self.db.new_write_batch();

let tree_cf = MerkleTreeColumnFamily::Tree;
let mut node_bytes = Vec::with_capacity(128);
manifest.serialize(&mut node_bytes);
write_batch.put_cf(tree_cf, Self::MANIFEST_KEY, &node_bytes);

let stale_keys_cf = MerkleTreeColumnFamily::StaleKeys;
let first_version = &manifest.version_count.to_be_bytes() as &[_];
let last_version = &truncated_versions.end.to_be_bytes();
write_batch.delete_range_cf(stale_keys_cf, first_version..last_version);

self.db
.write(write_batch)
.context("Failed writing a batch to RocksDB")
}
}

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions core/lib/merkle_tree/tests/integration/merkle_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use rand::{rngs::StdRng, seq::SliceRandom, Rng, SeedableRng};
use test_casing::test_casing;
use zksync_crypto_primitives::hasher::blake2::Blake2Hasher;
use zksync_merkle_tree::{
Database, HashTree, MerkleTree, PatchSet, Patched, TreeEntry, TreeInstruction, TreeLogEntry,
TreeRangeDigest,
Database, HashTree, MerkleTree, PatchSet, Patched, PruneDatabase, TreeEntry, TreeInstruction,
TreeLogEntry, TreeRangeDigest,
};
use zksync_types::{AccountTreeId, Address, StorageKey, H256, U256};

Expand Down Expand Up @@ -270,7 +270,7 @@ fn accumulating_commits(chunk_size: usize) {
test_accumulated_commits(PatchSet::default(), chunk_size);
}

fn test_root_hash_computing_with_reverts(db: &mut impl Database) {
fn test_root_hash_computing_with_reverts(db: &mut impl PruneDatabase) {
let (kvs, expected_hash) = &*ENTRIES_AND_HASH;
let (initial_update, final_update) = kvs.split_at(75);
let key_updates: Vec<_> = kvs
Expand Down
Loading