Skip to content

Commit

Permalink
fix(pruned mode): prune inputs, keep track of kernel/utxo sum
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Nov 1, 2021
1 parent c162f07 commit 411907e
Show file tree
Hide file tree
Showing 25 changed files with 606 additions and 686 deletions.
693 changes: 346 additions & 347 deletions Cargo.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use crate::{
},
sync::rpc,
},
blocks::BlockHeader,
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree, PrunedOutput},
blocks::{BlockHeader, UpdateBlockAccumulatedData},
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend, ChainStorageError, MmrTree},
proto::base_node::{
sync_utxo as proto_sync_utxo,
sync_utxos_response::UtxoOrDeleted,
Expand All @@ -44,16 +44,10 @@ use crate::{
use croaring::Bitmap;
use futures::StreamExt;
use log::*;
use std::{
convert::{TryFrom, TryInto},
sync::Arc,
};
use tari_common_types::types::{HashDigest, RangeProofService};
use std::convert::{TryFrom, TryInto};
use tari_common_types::types::{Commitment, HashDigest, RangeProofService};
use tari_comms::PeerConnection;
use tari_crypto::{
commitment::HomomorphicCommitment,
tari_utilities::{hex::Hex, Hashable},
};
use tari_crypto::tari_utilities::{hex::Hex, Hashable};
use tari_mmr::{MerkleMountainRange, MutableMmr};

const LOG_TARGET: &str = "c::bn::state_machine_service::states::horizon_state_sync";
Expand All @@ -65,6 +59,8 @@ pub struct HorizonStateSynchronization<'a, B: BlockchainBackend> {
prover: &'a RangeProofService,
num_kernels: u64,
num_outputs: u64,
kernel_sum: Commitment,
utxo_sum: Commitment,
}

impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
Expand All @@ -81,6 +77,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
prover,
num_kernels: 0,
num_outputs: 0,
kernel_sum: Default::default(),
utxo_sum: Default::default(),
}
}

Expand Down Expand Up @@ -119,19 +117,38 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
self.initialize().await?;
debug!(target: LOG_TARGET, "Synchronizing kernels");
self.synchronize_kernels(client, to_header).await?;
debug!(target: LOG_TARGET, "Synchronizing outputs");
self.synchronize_outputs(client, to_header).await?;
Ok(())
}

/// Seeds the running kernel/UTXO commitment sums from the accumulated data of
/// the current best block, so horizon sync resumes from the local tip state
/// instead of recomputing the sums from scratch.
async fn initialize(&mut self) -> Result<(), HorizonSyncError> {
    let metadata = self.db().get_chain_metadata().await?;
    // NOTE(review): assumes accumulated data exists for the best block —
    // fetch errors propagate via `?`.
    let best_block_hash = metadata.best_block().clone();
    let acc_data = self.db().fetch_block_accumulated_data(best_block_hash).await?;

    self.kernel_sum = acc_data.kernel_sum().clone();
    self.utxo_sum = acc_data.utxo_sum().clone();

    debug!(
        target: LOG_TARGET,
        "Loaded utxo_sum = {}, kernel_sum = {}",
        self.utxo_sum.to_hex(),
        self.kernel_sum.to_hex()
    );
    Ok(())
}

async fn synchronize_kernels(
&mut self,
client: &mut rpc::BaseNodeSyncRpcClient,
to_header: &BlockHeader,
) -> Result<(), HorizonSyncError> {
let local_num_kernels = self.db().fetch_mmr_size(MmrTree::Kernel).await?;
let metadata = self.db().get_chain_metadata().await?;

let remote_num_kernels = to_header.kernel_mmr_size;
self.num_kernels = remote_num_kernels;
Expand Down Expand Up @@ -192,6 +209,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
.map_err(HorizonSyncError::InvalidKernelSignature)?;

kernels.push(kernel.clone());
self.kernel_sum = &self.kernel_sum + &kernel.excess;
error!(target: LOG_TARGET, "DEBUG: kernel_sum = {}", self.kernel_sum.to_hex());
txn.insert_kernel_via_horizon_sync(kernel, current_header.hash().clone(), mmr_position as u32);
if mmr_position == current_header.header().kernel_mmr_size - 1 {
debug!(
Expand Down Expand Up @@ -221,11 +240,16 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
});
}

txn.update_pruned_hash_set(
MmrTree::Kernel,
let kernel_hash_set = kernel_mmr.get_pruned_hash_set()?;
txn.update_block_accumulated_data_via_horizon_sync(
current_header.hash().clone(),
kernel_mmr.get_pruned_hash_set()?,
UpdateBlockAccumulatedData {
kernel_sum: Some(self.kernel_sum.clone()),
kernel_hash_set: Some(kernel_hash_set),
..Default::default()
},
);
txn.set_pruned_height(metadata.pruned_height(), self.kernel_sum.clone(), self.utxo_sum.clone());

txn.commit().await?;
if mmr_position < end - 1 {
Expand Down Expand Up @@ -258,6 +282,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
) -> Result<(), HorizonSyncError> {
let local_num_outputs = self.db().fetch_mmr_size(MmrTree::Utxo).await?;

let metadata = self.db().get_chain_metadata().await?;

let remote_num_outputs = to_header.output_mmr_size;
self.num_outputs = remote_num_outputs;

Expand Down Expand Up @@ -322,10 +348,11 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
let block_data = db
.fetch_block_accumulated_data(current_header.header().prev_hash.clone())
.await?;
let (_, output_pruned_set, rp_pruned_set, mut full_bitmap) = block_data.dissolve();
let (_, output_pruned_set, witness_pruned_set, _) = block_data.dissolve();
let mut full_bitmap = self.db().fetch_deleted_bitmap_at_tip().await?.into_bitmap();

let mut output_mmr = MerkleMountainRange::<HashDigest, _>::new(output_pruned_set);
let mut witness_mmr = MerkleMountainRange::<HashDigest, _>::new(rp_pruned_set);
let mut witness_mmr = MerkleMountainRange::<HashDigest, _>::new(witness_pruned_set);

while let Some(response) = output_stream.next().await {
let res: SyncUtxosResponse = response?;
Expand Down Expand Up @@ -356,6 +383,7 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
output_hashes.push(output.hash());
witness_hashes.push(output.witness_hash());
unpruned_outputs.push(output.clone());
self.utxo_sum = &self.utxo_sum + &output.commitment;
txn.insert_output_via_horizon_sync(
output,
current_header.hash().clone(),
Expand Down Expand Up @@ -415,8 +443,8 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
witness_mmr.push(hash)?;
}

// Check that the difference bitmap is excessively large. Bitmap::deserialize panics if greater than
// isize::MAX, however isize::MAX is still an inordinate amount of data. An
// Check that the difference bitmap isn't excessively large. Bitmap::deserialize panics if greater
// than isize::MAX, however isize::MAX is still an inordinate amount of data. An
// arbitrary 4 MiB limit is used.
const MAX_DIFF_BITMAP_BYTE_LEN: usize = 4 * 1024 * 1024;
if diff_bitmap.len() > MAX_DIFF_BITMAP_BYTE_LEN {
Expand Down Expand Up @@ -471,14 +499,19 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
}

txn.update_deleted_bitmap(diff_bitmap.clone());
txn.update_pruned_hash_set(MmrTree::Utxo, current_header.hash().clone(), pruned_output_set);
txn.update_pruned_hash_set(
MmrTree::Witness,

let witness_hash_set = witness_mmr.get_pruned_hash_set()?;
txn.update_block_accumulated_data_via_horizon_sync(
current_header.hash().clone(),
witness_mmr.get_pruned_hash_set()?,
UpdateBlockAccumulatedData {
utxo_sum: Some(self.utxo_sum.clone()),
utxo_hash_set: Some(pruned_output_set),
witness_hash_set: Some(witness_hash_set),
deleted_diff: Some(diff_bitmap.into()),
..Default::default()
},
);
txn.update_block_accumulated_data_with_deleted_diff(current_header.hash().clone(), diff_bitmap);

txn.set_pruned_height(metadata.pruned_height(), self.kernel_sum.clone(), self.utxo_sum.clone());
txn.commit().await?;

current_header = db.fetch_chain_header(current_header.height() + 1).await?;
Expand Down Expand Up @@ -518,99 +551,25 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
async fn finalize_horizon_sync(&mut self) -> Result<(), HorizonSyncError> {
debug!(target: LOG_TARGET, "Validating horizon state");

let info = HorizonSyncInfo::new(
self.shared.set_state_info(StateInfo::HorizonSync(HorizonSyncInfo::new(
vec![self.sync_peer.peer_node_id().clone()],
HorizonSyncStatus::Finalizing,
);
self.shared.set_state_info(StateInfo::HorizonSync(info));
)));

let header = self.db().fetch_chain_header(self.horizon_sync_height).await?;
let mut pruned_utxo_sum = HomomorphicCommitment::default();
let mut pruned_kernel_sum = HomomorphicCommitment::default();

let mut prev_mmr = 0;
let mut prev_kernel_mmr = 0;
let bitmap = Arc::new(
self.db()
.fetch_complete_deleted_bitmap_at(header.hash().clone())
.await?
.into_bitmap(),
);
let expected_prev_best_block = self.shared.db.get_chain_metadata().await?.best_block().clone();
for h in 0..=header.height() {
let curr_header = self.db().fetch_chain_header(h).await?;

trace!(
target: LOG_TARGET,
"Fetching utxos from db: height:{}, header.output_mmr:{}, prev_mmr:{}, end:{}",
curr_header.height(),
curr_header.header().output_mmr_size,
prev_mmr,
curr_header.header().output_mmr_size - 1
);
let (utxos, _) = self
.db()
.fetch_utxos_by_mmr_position(prev_mmr, curr_header.header().output_mmr_size - 1, bitmap.clone())
.await?;
trace!(
target: LOG_TARGET,
"Fetching kernels from db: height:{}, header.kernel_mmr:{}, prev_mmr:{}, end:{}",
curr_header.height(),
curr_header.header().kernel_mmr_size,
prev_kernel_mmr,
curr_header.header().kernel_mmr_size - 1
);
let kernels = self
.db()
.fetch_kernels_by_mmr_position(prev_kernel_mmr, curr_header.header().kernel_mmr_size - 1)
.await?;

let mut utxo_sum = HomomorphicCommitment::default();
debug!(target: LOG_TARGET, "Number of kernels returned: {}", kernels.len());
debug!(target: LOG_TARGET, "Number of utxos returned: {}", utxos.len());
let mut prune_counter = 0;
for u in utxos {
match u {
PrunedOutput::NotPruned { output } => {
utxo_sum = &output.commitment + &utxo_sum;
},
_ => {
prune_counter += 1;
},
}
}
if prune_counter > 0 {
debug!(target: LOG_TARGET, "Pruned {} outputs", prune_counter);
}
prev_mmr = curr_header.header().output_mmr_size;

pruned_utxo_sum = &utxo_sum + &pruned_utxo_sum;

for k in kernels {
pruned_kernel_sum = &k.excess + &pruned_kernel_sum;
}
prev_kernel_mmr = curr_header.header().kernel_mmr_size;

trace!(
target: LOG_TARGET,
"Height: {} Kernel sum:{:?} Pruned UTXO sum: {:?}",
h,
pruned_kernel_sum,
pruned_utxo_sum
);
}

self.shared
.sync_validators
.final_horizon_state
.validate(
&*self.db().clone().into_inner().db_read_access()?,
&*self.db().inner().db_read_access()?,
header.height(),
&pruned_utxo_sum,
&pruned_kernel_sum,
&self.utxo_sum,
&self.kernel_sum,
)
.map_err(HorizonSyncError::FinalStateValidationFailed)?;

let metadata = self.db().get_chain_metadata().await?;
info!(
target: LOG_TARGET,
"Horizon state validation succeeded! Committing horizon state."
Expand All @@ -621,9 +580,9 @@ impl<'a, B: BlockchainBackend + 'static> HorizonStateSynchronization<'a, B> {
header.height(),
header.hash().clone(),
header.accumulated_data().total_accumulated_difficulty,
expected_prev_best_block,
metadata.best_block().clone(),
)
.set_pruned_height(header.height(), pruned_kernel_sum, pruned_utxo_sum)
.set_pruned_height(header.height(), self.kernel_sum.clone(), self.utxo_sum.clone())
.commit()
.await?;

Expand Down
41 changes: 32 additions & 9 deletions base_layer/core/src/blocks/accumulated_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,43 +49,55 @@ use tari_mmr::{pruned_hashset::PrunedHashSet, ArrayLike};

const LOG_TARGET: &str = "c::bn::acc_data";

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct BlockAccumulatedData {
pub(crate) kernels: PrunedHashSet,
pub(crate) outputs: PrunedHashSet,
pub(crate) witness: PrunedHashSet,
pub(crate) deleted: DeletedBitmap,
pub(crate) range_proofs: PrunedHashSet,
pub(crate) kernel_sum: Commitment,
pub(crate) utxo_sum: Commitment,
}

impl BlockAccumulatedData {
pub fn new(
kernels: PrunedHashSet,
outputs: PrunedHashSet,
range_proofs: PrunedHashSet,
witness: PrunedHashSet,
deleted: Bitmap,
total_kernel_sum: Commitment,
kernel_sum: Commitment,
utxo_sum: Commitment,
) -> Self {
Self {
kernels,
outputs,
range_proofs,
witness,
deleted: DeletedBitmap { deleted },
kernel_sum: total_kernel_sum,
kernel_sum,
utxo_sum,
}
}

/// Returns a reference to the bitmap of deleted (spent) output MMR positions.
pub fn deleted(&self) -> &Bitmap {
&self.deleted.deleted
}

/// Replaces the deleted-outputs bitmap wholesale, returning `&mut Self` to
/// allow builder-style chaining.
pub fn set_deleted(&mut self, deleted: DeletedBitmap) -> &mut Self {
self.deleted = deleted;
self
}

pub fn dissolve(self) -> (PrunedHashSet, PrunedHashSet, PrunedHashSet, Bitmap) {
(self.kernels, self.outputs, self.range_proofs, self.deleted.deleted)
(self.kernels, self.outputs, self.witness, self.deleted.deleted)
}

/// Returns the running sum of kernel excess commitments up to this block.
pub fn kernel_sum(&self) -> &Commitment {
&self.kernel_sum
}

/// Returns the running sum of UTXO commitments up to this block.
pub fn utxo_sum(&self) -> &Commitment {
&self.utxo_sum
}
}

impl Default for BlockAccumulatedData {
Expand All @@ -96,8 +108,9 @@ impl Default for BlockAccumulatedData {
deleted: DeletedBitmap {
deleted: Bitmap::create(),
},
range_proofs: Default::default(),
witness: Default::default(),
kernel_sum: Default::default(),
utxo_sum: Default::default(),
}
}
}
Expand All @@ -110,11 +123,21 @@ impl Display for BlockAccumulatedData {
self.outputs.len().unwrap_or(0),
self.deleted.deleted.cardinality(),
self.kernels.len().unwrap_or(0),
self.range_proofs.len().unwrap_or(0)
self.witness.len().unwrap_or(0)
)
}
}

/// A partial update to [`BlockAccumulatedData`]: each `Some` field overwrites
/// the corresponding stored value, while `None` fields are left untouched.
/// `Default` yields a no-op update, enabling `..Default::default()` at call
/// sites that set only a subset of fields.
#[derive(Debug, Clone, Default)]
pub struct UpdateBlockAccumulatedData {
// Replacement pruned hash set for the kernel MMR, if any.
pub kernel_hash_set: Option<PrunedHashSet>,
// Replacement pruned hash set for the output (UTXO) MMR, if any.
pub utxo_hash_set: Option<PrunedHashSet>,
// Replacement pruned hash set for the witness MMR, if any.
pub witness_hash_set: Option<PrunedHashSet>,
// Bitmap diff of outputs newly deleted in this block, if any.
pub deleted_diff: Option<DeletedBitmap>,
// New running UTXO commitment sum, if it changed.
pub utxo_sum: Option<Commitment>,
// New running kernel excess commitment sum, if it changed.
pub kernel_sum: Option<Commitment>,
}

/// Wrapper struct to serialize and deserialize Bitmap
#[derive(Debug, Clone)]
pub struct DeletedBitmap {
Expand Down
Loading

0 comments on commit 411907e

Please sign in to comment.