diff --git a/core/src/kura.rs b/core/src/kura.rs index 3b75b5e970..bc6278f6ee 100644 --- a/core/src/kura.rs +++ b/core/src/kura.rs @@ -29,17 +29,18 @@ const SIZE_OF_BLOCK_HASH: u64 = Hash::LENGTH as u64; /// The interface of Kura subsystem #[derive(Debug)] pub struct Kura { - /// The mode of initialisation of [`Kura`]. - mode: InitMode, /// The block storage block_store: Mutex, /// The array of block hashes and a slot for an arc of the block. This is normally recovered from the index file. - #[allow(clippy::type_complexity)] - block_data: Mutex, Option>)>>, + block_data: Mutex, /// Path to file for plain text blocks. block_plain_text_path: Option, + /// Amount of blocks loaded during initialization + init_block_count: usize, } +type BlockData = Vec<(HashOf, Option>)>; + impl Kura { /// Initialize Kura and start a thread that receives /// and stores new blocks. @@ -57,26 +58,28 @@ impl Kura { .debug_output_new_blocks .then(|| store_dir.join("blocks.json")); + let block_data = Kura::init(&mut block_store, config.init_mode)?; + let block_count = block_data.len(); + info!(mode=?config.init_mode, block_count, "Kura init complete"); + let kura = Arc::new(Self { - mode: config.init_mode, block_store: Mutex::new(block_store), - block_data: Mutex::new(Vec::new()), + block_data: Mutex::new(block_data), block_plain_text_path, + init_block_count: block_count, }); - let block_count = kura.init()?; - - Ok((kura, block_count)) + Ok((kura, BlockCount(block_count))) } /// Create a kura instance that doesn't write to disk. Instead it serves as a handler /// for in-memory blocks only. pub fn blank_kura_for_testing() -> Arc { Arc::new(Self { - mode: InitMode::Strict, block_store: Mutex::new(BlockStore::new(PathBuf::new())), block_data: Mutex::new(Vec::new()), block_plain_text_path: None, + init_block_count: 0, }) } @@ -105,30 +108,25 @@ impl Kura { /// - file storage is unavailable /// - data in file storage is invalid or corrupted #[iroha_logger::log(skip_all, name = "kura_init")] - fn init(self: &Arc) -> Result { - let mut block_store = self.block_store.lock(); - + fn init(block_store: &mut BlockStore, mode: InitMode) -> Result { let block_index_count: usize = block_store .read_index_count()? .try_into() .expect("INTERNAL BUG: block index count exceeds usize::MAX"); - let block_hashes = match self.mode { + let block_hashes = match mode { InitMode::Fast => { - Kura::init_fast_mode(&block_store, block_index_count).or_else(|error| { + Kura::init_fast_mode(block_store, block_index_count).or_else(|error| { warn!(%error, "Hashes file is broken. Falling back to strict init mode."); - Kura::init_strict_mode(&mut block_store, block_index_count) + Kura::init_strict_mode(block_store, block_index_count) }) } - InitMode::Strict => Kura::init_strict_mode(&mut block_store, block_index_count), + InitMode::Strict => Kura::init_strict_mode(block_store, block_index_count), }?; - let block_count = block_hashes.len(); - info!(mode=?self.mode, block_count, "Kura init complete"); - // The none value is set in order to indicate that the blocks exist on disk but are not yet loaded. - *self.block_data.lock() = block_hashes.into_iter().map(|hash| (hash, None)).collect(); - Ok(BlockCount(block_count)) + let block_data = block_hashes.into_iter().map(|hash| (hash, None)).collect(); + Ok(block_data) } fn init_fast_mode( @@ -193,9 +191,12 @@ impl Kura { kura: &Kura, mut shutdown_receiver: tokio::sync::oneshot::Receiver<()>, ) { - let (mut written_block_count, mut latest_block_hash) = { - let block_data_guard = kura.block_data.lock(); - (block_data_guard.len(), block_data_guard.last().map(|d| d.0)) + let mut written_block_count = kura.init_block_count; + let mut latest_written_block_hash = { + let block_data = kura.block_data.lock(); + written_block_count + .checked_sub(1) + .map(|idx| block_data[idx].0) }; let mut should_exit = false; @@ -206,24 +207,24 @@ impl Kura { should_exit = true; } - let block_data_guard = kura.block_data.lock(); + let block_data = kura.block_data.lock(); - let new_latest_block_hash = block_data_guard.last().map(|d| d.0); - if block_data_guard.len() == written_block_count - && new_latest_block_hash != latest_block_hash - { + let new_latest_written_block_hash = written_block_count + .checked_sub(1) + .map(|idx| block_data[idx].0); + if new_latest_written_block_hash != latest_written_block_hash { written_block_count -= 1; // There has been a soft-fork and we need to rewrite the top block. } - latest_block_hash = new_latest_block_hash; + latest_written_block_hash = new_latest_written_block_hash; - if written_block_count >= block_data_guard.len() { + if written_block_count >= block_data.len() { if should_exit { info!("Kura has written remaining blocks to disk and is shutting down."); return; } - written_block_count = block_data_guard.len(); - drop(block_data_guard); + written_block_count = block_data.len(); + drop(block_data); std::thread::sleep(std::time::Duration::from_millis(1)); continue; } @@ -231,8 +232,8 @@ impl Kura { // If we get here there are blocks to be written. let start_height = written_block_count; let mut blocks_to_be_written = Vec::new(); - while written_block_count < block_data_guard.len() { - let block_ref = block_data_guard[written_block_count].1.as_ref().expect( + while written_block_count < block_data.len() { + let block_ref = block_data[written_block_count].1.as_ref().expect( "INTERNAL BUG: The block to be written is None. Check store_block function.", ); blocks_to_be_written.push(Arc::clone(block_ref)); @@ -240,7 +241,7 @@ impl Kura { } // We don't want to hold up other threads so we drop the lock on the block data. - drop(block_data_guard); + drop(block_data); if let Some(path) = kura.block_plain_text_path.as_ref() { let mut plain_text_file = std::fs::OpenOptions::new() @@ -548,6 +549,7 @@ impl BlockStore { let path = self.path_to_blockchain.join(INDEX_FILE_NAME); let mut index_file = std::fs::OpenOptions::new() .write(true) + .truncate(false) .create(true) .open(path.clone()) .add_err_context(&path)?; @@ -635,6 +637,7 @@ impl BlockStore { let path = self.path_to_blockchain.join(HASHES_FILE_NAME); let mut hashes_file = std::fs::OpenOptions::new() .write(true) + .truncate(false) .create(true) .open(path.clone()) .add_err_context(&path)?; @@ -688,18 +691,21 @@ impl BlockStore { let path = self.path_to_blockchain.join(INDEX_FILE_NAME); std::fs::OpenOptions::new() .write(true) + .truncate(false) .create(true) .open(path.clone()) .add_err_context(&path)?; let path = self.path_to_blockchain.join(DATA_FILE_NAME); std::fs::OpenOptions::new() .write(true) + .truncate(false) .create(true) .open(path.clone()) .add_err_context(&path)?; let path = self.path_to_blockchain.join(HASHES_FILE_NAME); std::fs::OpenOptions::new() .write(true) + .truncate(false) .create(true) .open(path.clone()) .add_err_context(&path)?; @@ -778,12 +784,32 @@ impl AddErrContextExt for Result { #[cfg(test)] mod tests { + use std::{str::FromStr, thread, time::Duration}; use iroha_crypto::KeyPair; + use iroha_data_model::{ + account::Account, + domain::{Domain, DomainId}, + executor::Executor, + isi::Log, + peer::PeerId, + transaction::{TransactionBuilder, WasmSmartContract}, + ChainId, Level, + }; + use iroha_genesis::GenesisBuilder; + use nonzero_ext::nonzero; use tempfile::TempDir; + use test_samples::gen_account_in; use super::*; - use crate::block::ValidBlock; + use crate::{ + block::{BlockBuilder, ValidBlock}, + query::store::LiveQueryStore, + smartcontracts::Registrable, + state::State, + sumeragi::network_topology::Topology, + StateReadOnly, World, + }; fn indices(value: [(u64, u64); N]) -> [BlockIndex; N] { let mut ret = [BlockIndex { @@ -962,4 +988,200 @@ mod tests { }) .unwrap(); } + + #[test] + fn kura_not_miss_replace_block() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .build() + .unwrap(); + + { + let _rt_guard = rt.enter(); + let _logger = iroha_logger::test_logger(); + } + + let temp_dir = TempDir::new().unwrap(); + + let [block_genesis, block, block_soft_fork, block_next] = + create_blocks(&rt).try_into().unwrap(); + + // Create kura and write some blocks + { + let (kura, block_count) = Kura::new(&Config { + init_mode: InitMode::Strict, + store_dir: iroha_config::base::WithOrigin::inline( + temp_dir.path().to_str().unwrap().into(), + ), + debug_output_new_blocks: false, + }) + .unwrap(); + // Starting with empty block store + assert_eq!(block_count.0, 0); + + let _handle = Kura::start(kura.clone()); + + kura.store_block(block_genesis.clone()); + kura.store_block(block); + // Add wait so that previous block is written to the block store + thread::sleep(Duration::from_secs(1)); + kura.replace_top_block(block_soft_fork.clone()); + kura.store_block(block_next.clone()); + } + + // Reinitialize kura and check that correct blocks are loaded + { + let (kura, block_count) = Kura::new(&Config { + init_mode: InitMode::Strict, + store_dir: iroha_config::base::WithOrigin::inline( + temp_dir.path().to_str().unwrap().into(), + ), + debug_output_new_blocks: false, + }) + .unwrap(); + + assert_eq!(block_count.0, 3); + + assert_eq!( + kura.get_block_by_height(nonzero!(1_usize)), + Some(Arc::new(block_genesis.into())) + ); + assert_eq!( + kura.get_block_by_height(nonzero!(2_usize)), + Some(Arc::new(block_soft_fork.into())) + ); + assert_eq!( + kura.get_block_by_height(nonzero!(3_usize)), + Some(Arc::new(block_next.into())) + ); + } + } + + #[allow(clippy::too_many_lines)] + fn create_blocks(rt: &tokio::runtime::Runtime) -> Vec { + let mut blocks = Vec::new(); + + let (leader_public_key, leader_private_key) = KeyPair::random().into_parts(); + let peer_id = PeerId::new("127.0.0.1:8080".parse().unwrap(), leader_public_key); + let topology = Topology::new(vec![peer_id]); + + let chain_id = ChainId::from("00000000-0000-0000-0000-000000000000"); + + let (genesis_id, genesis_key_pair) = gen_account_in("genesis"); + let genesis_domain_id = DomainId::from_str("genesis").expect("Valid"); + let genesis_domain = Domain::new(genesis_domain_id).build(&genesis_id); + let genesis_account = Account::new(genesis_id.clone()).build(&genesis_id); + let (account_id, account_keypair) = gen_account_in("wonderland"); + let domain_id = DomainId::from_str("wonderland").expect("Valid"); + let domain = Domain::new(domain_id).build(&genesis_id); + let account = Account::new(account_id.clone()).build(&genesis_id); + + let live_query_store = LiveQueryStore::test(); + let live_query_store = { + let _rt_guard = rt.enter(); + live_query_store.start() + }; + let state = State::new( + World::with([domain, genesis_domain], [account, genesis_account], []), + Kura::blank_kura_for_testing(), + live_query_store, + ); + + let executor = { + let executor_blob = std::fs::read("../defaults/executor.wasm").unwrap(); + Executor::new(WasmSmartContract::from_compiled(executor_blob)) + }; + let genesis = GenesisBuilder::default().build_and_sign( + executor, + chain_id.clone(), + &genesis_key_pair, + topology.as_ref().to_owned(), + ); + + { + let mut state_block = state.block(); + let block_genesis = ValidBlock::validate( + genesis.0, + &topology, + &chain_id, + &genesis_id, + &mut state_block, + ) + .unpack(|_| {}) + .unwrap() + .commit(&topology) + .unpack(|_| {}) + .unwrap(); + let _events = + state_block.apply_without_execution(&block_genesis, topology.as_ref().to_owned()); + state_block.commit(); + blocks.push(block_genesis); + } + + let tx1 = TransactionBuilder::new(chain_id.clone(), account_id.clone()) + .with_instructions([Log::new(Level::INFO, "msg1".to_string())]) + .sign(account_keypair.private_key()); + + let tx2 = TransactionBuilder::new(chain_id.clone(), account_id) + .with_instructions([Log::new(Level::INFO, "msg2".to_string())]) + .sign(account_keypair.private_key()); + let tx1 = crate::AcceptedTransaction::accept( + tx1, + &chain_id, + state.view().transaction_executor().limits, + ) + .unwrap(); + let tx2 = crate::AcceptedTransaction::accept( + tx2, + &chain_id, + state.view().transaction_executor().limits, + ) + .unwrap(); + + { + let mut state_block = state.block(); + let block = BlockBuilder::new(vec![tx1.clone()]) + .chain(0, &mut state_block) + .sign(&leader_private_key) + .unpack(|_| {}) + .commit(&topology) + .unpack(|_| {}) + .unwrap(); + let _events = state_block.apply_without_execution(&block, topology.as_ref().to_owned()); + state_block.commit(); + blocks.push(block); + } + + { + let mut state_block = state.block_and_revert(); + let block_soft_fork = BlockBuilder::new(vec![tx1]) + .chain(1, &mut state_block) + .sign(&leader_private_key) + .unpack(|_| {}) + .commit(&topology) + .unpack(|_| {}) + .unwrap(); + let _events = + state_block.apply_without_execution(&block_soft_fork, topology.as_ref().to_owned()); + state_block.commit(); + blocks.push(block_soft_fork); + } + + { + let mut state_block: crate::state::StateBlock = state.block(); + let block_next = BlockBuilder::new(vec![tx2]) + .chain(0, &mut state_block) + .sign(&leader_private_key) + .unpack(|_| {}) + .commit(&topology) + .unpack(|_| {}) + .unwrap(); + let _events = + state_block.apply_without_execution(&block_next, topology.as_ref().to_owned()); + state_block.commit(); + blocks.push(block_next); + } + + blocks + } }