Skip to content

Commit

Permalink
prune mode sync from prune node
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Feb 1, 2024
1 parent 7d886e6 commit c37620c
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,12 @@ impl DecideNextSync {
.filter(|sync_peer| {
let remote_metadata = sync_peer.claimed_chain_metadata();
debug!(target: LOG_TARGET, "Peer metadata: {}", remote_metadata);
let remote_is_archival_node = remote_metadata.pruned_height() == 0;
let general_sync_conditions =
// Must be able to provide the correct amount of full blocks past the pruned height (i.e. the
// pruning horizon), otherwise our horizon spec will not be met
remote_metadata.best_block_height().saturating_sub(remote_metadata.pruned_height()) >=
local_metadata.pruning_horizon() &&
// Must have a better blockchain tip than us
remote_metadata.best_block_height() > local_metadata.best_block_height() &&
// Must be able to provide full blocks from the height we need detailed information
remote_metadata.pruned_height() <= local_metadata.best_block_height();
let sync_from_prune_node = !remote_is_archival_node &&
// Must have done initial sync (to detect genesis TXO spends)
local_metadata.best_block_height() > 0;
general_sync_conditions && (remote_is_archival_node || sync_from_prune_node)
// Must be able to provide the correct amount of full blocks past the pruned height (i.e. the
// pruning horizon), otherwise our horizon spec will not be met
remote_metadata.best_block_height().saturating_sub(remote_metadata.pruned_height()) >=
local_metadata.pruning_horizon() &&
// Must have a better blockchain tip than us
remote_metadata.best_block_height() > local_metadata.best_block_height()
})
.collect::<Vec<_>>();

Expand Down
42 changes: 42 additions & 0 deletions base_layer/core/src/base_node/sync/rpc/sync_utxos_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,39 @@ where B: BlockchainBackend + 'static
current_header.hash().to_hex(),
end_header.hash().to_hex(),
);

// If this is a pruned node and outputs have been requested for an initial sync, we need to discover and send
// the outputs from the genesis block that have been pruned as well
let mut pruned_genesis_block_outputs = Vec::new();
let metadata = self
.db
.get_chain_metadata()
.await
.rpc_status_internal_error(LOG_TARGET)?;
if current_header.height == 1 && metadata.is_pruned_node() {
let genesis_block = self.db.fetch_genesis_block();
for output in genesis_block.block().body.outputs() {
let output_hash = output.hash();
if self
.db
.fetch_output(output_hash)
.await
.rpc_status_internal_error(LOG_TARGET)?
.is_none()
{
trace!(
target: LOG_TARGET,
"Spent genesis TXO (commitment '{}') to peer",
output.commitment.to_hex()
);
pruned_genesis_block_outputs.push(Ok(SyncUtxosResponse {
txo: Some(Txo::Commitment(output.commitment.as_bytes().to_vec())),
mined_header: current_header.hash().to_vec(),
}));
}
}
}

let start_header = current_header.clone();
loop {
let timer = Instant::now();
Expand Down Expand Up @@ -248,6 +281,15 @@ where B: BlockchainBackend + 'static
let mut txos = Vec::with_capacity(outputs.len() + inputs.len());
txos.append(&mut outputs);
txos.append(&mut inputs);
if start_header == current_header {
debug!(
target: LOG_TARGET,
"Adding {} genesis block pruned inputs in response for block #{} '{}'", pruned_genesis_block_outputs.len(),
current_header.height,
current_header_hash
);
txos.append(&mut pruned_genesis_block_outputs);
}
let txos = txos.into_iter();

// Ensure task stops if the peer prematurely stops their RPC session
Expand Down
4 changes: 4 additions & 0 deletions base_layer/core/src/chain_storage/async_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
pub fn inner(&self) -> &BlockchainDatabase<B> {
&self.db
}

pub fn fetch_genesis_block(&self) -> ChainBlock {
self.db.fetch_genesis_block()
}
}

impl<B: BlockchainBackend + 'static> AsyncBlockchainDb<B> {
Expand Down
19 changes: 12 additions & 7 deletions base_layer/core/src/chain_storage/blockchain_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,11 @@ where B: BlockchainBackend
Ok(blockchain_db)
}

/// Get the genesis block form the consensus manager
pub fn fetch_genesis_block(&self) -> ChainBlock {
self.consensus_manager.get_genesis_block()
}

/// Returns a reference to the consensus cosntants at the current height
pub fn consensus_constants(&self) -> Result<&ConsensusConstants, ChainStorageError> {
let height = self.get_height()?;
Expand Down Expand Up @@ -2363,18 +2368,18 @@ fn prune_database_if_needed<T: BlockchainBackend>(
return Ok(());
}

let db_height = metadata.best_block_height();
let abs_pruning_horizon = db_height.saturating_sub(pruning_horizon);

let prune_to_height_target = metadata.best_block_height().saturating_sub(pruning_horizon);
debug!(
target: LOG_TARGET,
"Current pruned height is: {}, pruning horizon is: {}, while the pruning interval is: {}",
"Blockchain height: {}, pruning horizon: {}, pruned height: {}, prune to height target: {}, pruning interval: {}",
metadata.best_block_height(),
metadata.pruning_horizon(),
metadata.pruned_height(),
abs_pruning_horizon,
prune_to_height_target,
pruning_interval,
);
if metadata.pruned_height() < abs_pruning_horizon.saturating_sub(pruning_interval) {
prune_to_height(db, abs_pruning_horizon)?;
if metadata.pruned_height() < prune_to_height_target.saturating_sub(pruning_interval) {
prune_to_height(db, prune_to_height_target)?;
}

Ok(())
Expand Down
187 changes: 182 additions & 5 deletions base_layer/core/tests/tests/horizon_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use crate::helpers::{

#[allow(clippy::too_many_lines)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_horizon_sync_from_archival_node_happy_path() {
async fn test_initial_horizon_sync_from_archival_node_happy_path() {
//` cargo test --release --test core_integration_tests
//` tests::horizon_sync::test_horizon_sync_from_archival_node_happy_path > .\target\output.txt 2>&1
//` tests::horizon_sync::test_initial_horizon_sync_from_archival_node_happy_path > .\target\output.txt 2>&1
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`

// Create the network with Alice (pruning node) and Bob (archival node)
Expand Down Expand Up @@ -285,9 +285,9 @@ async fn test_horizon_sync_from_archival_node_happy_path() {

#[allow(clippy::too_many_lines)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_horizon_sync_from_prune_node_happy_path() {
async fn test_consecutive_horizon_sync_from_prune_node_happy_path() {
//` cargo test --release --test core_integration_tests
//` tests::horizon_sync::test_horizon_sync_from_prune_node_happy_path > .\target\output.txt 2>&1
//` tests::horizon_sync::test_initial_horizon_sync_from_prune_node_happy_path > .\target\output.txt 2>&1
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`

// Create the network with Alice (pruning node) and Bob (archival node) and Carol (pruning node)
Expand Down Expand Up @@ -657,8 +657,185 @@ async fn test_horizon_sync_from_prune_node_happy_path() {
alice_node.blockchain_db.get_height().unwrap(),
alice_header_height - pruning_horizon_alice
);
// Carol will not be banned
assert!(!sync::wait_for_is_peer_banned(&alice_node, carol_node.node_identity.node_id(), 1).await);
}

#[allow(clippy::too_many_lines)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_initial_horizon_sync_from_prune_node_happy_path() {
//` cargo test --release --test core_integration_tests
//` tests::horizon_sync::test_initial_horizon_sync_from_prune_node_happy_path > .\target\output.txt 2>&1
// env_logger::init(); // Set `$env:RUST_LOG = "trace"`

// Create the network with Alice (pruning node) and Bob (archival node) and Carol (pruning node)
let pruning_horizon_alice = 4;
let pruning_horizon_carol = 12;
let (mut state_machines, mut peer_nodes, initial_block, consensus_manager, key_manager, initial_coinbase) =
sync::create_network_with_multiple_nodes(vec![
// Alice is a pruned node
BlockchainDatabaseConfig {
orphan_storage_capacity: 5,
pruning_horizon: pruning_horizon_alice,
pruning_interval: 5,
track_reorgs: false,
cleanup_orphans_at_startup: false,
},
// Carol is a pruned node
BlockchainDatabaseConfig {
orphan_storage_capacity: 5,
pruning_horizon: pruning_horizon_carol,
pruning_interval: 5,
track_reorgs: false,
cleanup_orphans_at_startup: false,
},
// Bob is an archival node
BlockchainDatabaseConfig::default(),
])
.await;
let mut alice_state_machine = state_machines.remove(0);
let mut carol_state_machine = state_machines.remove(0);
let alice_node = peer_nodes.remove(0);
let carol_node = peer_nodes.remove(0);
let bob_node = peer_nodes.remove(0);

// Create a blockchain that spends the genesys coinbase early on and then later spends some more coinbase outputs
let follow_up_coinbases_to_spend = 5;
let (_blocks, _coinbases) = sync::create_block_chain_with_transactions(
&bob_node,
&initial_block,
&initial_coinbase,
&consensus_manager,
&key_manager,
min(pruning_horizon_alice, pruning_horizon_carol),
28, // > follow_up_transaction_in_block + pruning_horizon_carol + 1
2, // < pruning_horizon_alice, < pruning_horizon_carol
14, // > pruning_horizon_alice, > pruning_horizon_carol
follow_up_coinbases_to_spend, // > spend_genesis_coinbase_in_block - 1, < follow_up_transaction_in_block
)
.await;

// 1. Carol attempts initial horizon sync from Bob archival node (to pruning height 16)
println!("\n1. Carol attempts initial horizon sync from Bob archival node (to pruning height 16)\n");

let output_hash = initial_coinbase.hash(&key_manager).await.unwrap();
assert!(carol_node.blockchain_db.fetch_output(output_hash).unwrap().is_some());
let commitment = initial_coinbase.commitment(&key_manager).await.unwrap();
assert!(carol_node
.blockchain_db
.fetch_unspent_output_hash_by_commitment(commitment.clone())
.unwrap()
.is_some());

let mut header_sync_carol_from_bob = sync::initialize_sync_headers_with_ping_pong_data(&carol_node, &bob_node);
let event = sync::sync_headers_execute(&mut carol_state_machine, &mut header_sync_carol_from_bob).await;
let carol_header_height = carol_node.blockchain_db.fetch_last_header().unwrap().height;
println!("Event: {} to header {}", state_event(&event), carol_header_height);
assert_eq!(carol_header_height, 28);
let event = decide_horizon_sync(&mut carol_state_machine, header_sync_carol_from_bob).await;
let mut horizon_sync = match event {
StateEvent::ProceedToHorizonSync(sync_peers) => HorizonStateSync::from(sync_peers),
_ => panic!("1. Carol should proceed to horizon sync"),
};
let event = sync::horizon_sync_execute(&mut carol_state_machine, &mut horizon_sync).await;

println!(
"Event: {} to block {}",
state_event(&event),
carol_node.blockchain_db.get_height().unwrap()
);
assert_eq!(event, StateEvent::HorizonStateSynchronized);
assert_eq!(
carol_node.blockchain_db.get_height().unwrap(),
carol_header_height - pruning_horizon_carol
);

assert!(carol_node.blockchain_db.fetch_output(output_hash).unwrap().is_none());
assert!(carol_node
.blockchain_db
.fetch_unspent_output_hash_by_commitment(commitment.clone())
.unwrap()
.is_none());

// Bob will not be banned
assert!(!sync::wait_for_is_peer_banned(&alice_node, bob_node.node_identity.node_id(), 1).await);
assert!(!sync::wait_for_is_peer_banned(&carol_node, bob_node.node_identity.node_id(), 1).await);

// 2. Carol attempts block sync from Bob to the tip (to height 28)
println!("\n2. Carol attempts block sync from Bob to the tip (to height 28)\n");

let mut block_sync = sync::initialize_sync_blocks(&bob_node);
let event = sync::sync_blocks_execute(&mut carol_state_machine, &mut block_sync).await;
println!(
"Event: {} to block {}",
state_event(&event),
carol_node.blockchain_db.get_height().unwrap()
);
assert_eq!(event, StateEvent::BlocksSynchronized);
assert_eq!(
carol_node.blockchain_db.get_height().unwrap(),
carol_node.blockchain_db.fetch_last_header().unwrap().height
);
// Bob will not be banned
assert!(!sync::wait_for_is_peer_banned(&carol_node, bob_node.node_identity.node_id(), 1).await);

// 3. Alice attempts initial horizon sync from Carol prune node (to height 24)
println!("\n3. Alice attempts initial horizon sync from Carol prune node (to height 24)\n");

assert!(alice_node.blockchain_db.fetch_output(output_hash).unwrap().is_some());
assert!(alice_node
.blockchain_db
.fetch_unspent_output_hash_by_commitment(commitment.clone())
.unwrap()
.is_some());

let mut header_sync_alice_from_carol = sync::initialize_sync_headers_with_ping_pong_data(&alice_node, &carol_node);
let event = sync::sync_headers_execute(&mut alice_state_machine, &mut header_sync_alice_from_carol).await;
let alice_header_height = alice_node.blockchain_db.fetch_last_header().unwrap().height;
println!("Event: {} to header {}", state_event(&event), alice_header_height);
assert_eq!(alice_header_height, 28);
let event = decide_horizon_sync(&mut alice_state_machine, header_sync_alice_from_carol).await;
let mut horizon_sync = match event {
StateEvent::ProceedToHorizonSync(sync_peers) => HorizonStateSync::from(sync_peers),
_ => panic!("3. Alice should proceed to horizon sync"),
};
let event = sync::horizon_sync_execute(&mut alice_state_machine, &mut horizon_sync).await;

println!(
"Event: {} to block {}",
state_event(&event),
alice_node.blockchain_db.get_height().unwrap()
);
assert_eq!(event, StateEvent::HorizonStateSynchronized);
assert_eq!(
alice_node.blockchain_db.get_height().unwrap(),
alice_header_height - pruning_horizon_alice
);

assert!(alice_node.blockchain_db.fetch_output(output_hash).unwrap().is_none());
assert!(alice_node
.blockchain_db
.fetch_unspent_output_hash_by_commitment(commitment.clone())
.unwrap()
.is_none());

// Carol will not be banned
assert!(!sync::wait_for_is_peer_banned(&alice_node, carol_node.node_identity.node_id(), 1).await);

// 4. Alice attempts block sync from Carol prune node to the tip (to height 28)
println!("\n4. Alice attempts block sync from Carol prune node to the tip (to height 28)\n");

let mut block_sync = sync::initialize_sync_blocks(&carol_node);
let event = sync::sync_blocks_execute(&mut alice_state_machine, &mut block_sync).await;
println!(
"Event: {} to block {}",
state_event(&event),
alice_node.blockchain_db.get_height().unwrap()
);
assert_eq!(event, StateEvent::BlocksSynchronized);
assert_eq!(
alice_node.blockchain_db.get_height().unwrap(),
alice_node.blockchain_db.fetch_last_header().unwrap().height
);
// Carol will not be banned
assert!(!sync::wait_for_is_peer_banned(&alice_node, carol_node.node_identity.node_id(), 1).await);
}

0 comments on commit c37620c

Please sign in to comment.