From 459481de080e2be2a860dcaae0c8e9a69d49f353 Mon Sep 17 00:00:00 2001 From: Shanin Roman Date: Mon, 22 Jul 2024 16:42:25 +0300 Subject: [PATCH 1/4] test(kura): add test kura not miss replace top block call Signed-off-by: Shanin Roman --- Cargo.lock | 1 + core/Cargo.toml | 1 + core/src/kura.rs | 228 ++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 229 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 4193c4bd693..c3b7d9557a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3023,6 +3023,7 @@ dependencies = [ "iroha_primitives", "iroha_telemetry", "iroha_version", + "iroha_wasm_builder", "iroha_wasm_codec", "nonzero_ext", "once_cell", diff --git a/core/Cargo.toml b/core/Cargo.toml index 5a83eef9911..a39ff5f824e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -69,6 +69,7 @@ indexmap = "2.2.6" [dev-dependencies] iroha_executor_data_model = { workspace = true } +iroha_wasm_builder = {workspace = true } test_samples = { workspace = true } criterion = { workspace = true } diff --git a/core/src/kura.rs b/core/src/kura.rs index 3b75b5e970b..e5eb64ebc8a 100644 --- a/core/src/kura.rs +++ b/core/src/kura.rs @@ -778,12 +778,33 @@ impl AddErrContextExt for Result { #[cfg(test)] mod tests { + use std::{str::FromStr, thread, time::Duration}; use iroha_crypto::KeyPair; + use iroha_data_model::{ + account::Account, + domain::{Domain, DomainId}, + executor::Executor, + isi::Log, + peer::PeerId, + transaction::{TransactionBuilder, WasmSmartContract}, + ChainId, Level, + }; + use iroha_genesis::GenesisBuilder; + use iroha_primitives::unique_vec::UniqueVec; + use nonzero_ext::nonzero; use tempfile::TempDir; + use test_samples::gen_account_in; use super::*; - use crate::block::ValidBlock; + use crate::{ + block::{BlockBuilder, ValidBlock}, + query::store::LiveQueryStore, + smartcontracts::Registrable, + state::State, + sumeragi::network_topology::Topology, + StateReadOnly, World, + }; fn indices(value: [(u64, u64); N]) -> [BlockIndex; N] { let mut ret = [BlockIndex { @@ -962,4 +983,209 @@ mod tests { }) .unwrap(); } + + #[test] + fn kura_not_miss_replace_block() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_time() + .build() + .unwrap(); + + { + let _rt_guard = rt.enter(); + let _logger = iroha_logger::test_logger(); + } + + let temp_dir = TempDir::new().unwrap(); + + let [block_genesis, block, block_soft_fork, block_next] = + create_blocks(&rt).try_into().unwrap(); + + // Create kura and write some blocks + { + let (kura, block_count) = Kura::new(&Config { + init_mode: InitMode::Strict, + store_dir: iroha_config::base::WithOrigin::inline( + temp_dir.path().to_str().unwrap().into(), + ), + debug_output_new_blocks: false, + }) + .unwrap(); + // Starting with empty block store + assert_eq!(block_count.0, 0); + + let _handle = Kura::start(kura.clone()); + + kura.store_block(block_genesis.clone()); + kura.store_block(block); + // Add wait so that previous block is written to the block store + thread::sleep(Duration::from_secs(1)); + kura.replace_top_block(block_soft_fork.clone()); + kura.store_block(block_next.clone()); + } + + // Reinitialize kura and check that correct blocks are loaded + { + let (kura, block_count) = Kura::new(&Config { + init_mode: InitMode::Strict, + store_dir: iroha_config::base::WithOrigin::inline( + temp_dir.path().to_str().unwrap().into(), + ), + debug_output_new_blocks: false, + }) + .unwrap(); + + assert_eq!(block_count.0, 3); + + assert_eq!( + kura.get_block_by_height(nonzero!(1_usize)), + Some(Arc::new(block_genesis.into())) + ); + assert_eq!( + kura.get_block_by_height(nonzero!(2_usize)), + Some(Arc::new(block_soft_fork.into())) + ); + assert_eq!( + kura.get_block_by_height(nonzero!(3_usize)), + Some(Arc::new(block_next.into())) + ); + } + } + + #[allow(clippy::too_many_lines)] + fn create_blocks(rt: &tokio::runtime::Runtime) -> Vec { + let mut blocks = Vec::new(); + + let (leader_public_key, leader_private_key) = KeyPair::random().into_parts(); + let peer_id = PeerId::new("127.0.0.1:8080".parse().unwrap(), leader_public_key); + let topology = Topology::new(vec![peer_id]); + + let chain_id = ChainId::from("00000000-0000-0000-0000-000000000000"); + + let (genesis_id, genesis_key_pair) = gen_account_in("genesis"); + let genesis_domain_id = DomainId::from_str("genesis").expect("Valid"); + let genesis_domain = Domain::new(genesis_domain_id).build(&genesis_id); + let genesis_account = Account::new(genesis_id.clone()).build(&genesis_id); + let (account_id, account_keypair) = gen_account_in("wonderland"); + let domain_id = DomainId::from_str("wonderland").expect("Valid"); + let domain = Domain::new(domain_id).build(&genesis_id); + let account = Account::new(account_id.clone()).build(&genesis_id); + + let live_query_store = LiveQueryStore::test(); + let live_query_store = { + let _rt_guard = rt.enter(); + live_query_store.start() + }; + let state = State::new( + World::with( + [domain, genesis_domain], + [account, genesis_account], + [], + UniqueVec::new(), + ), + Kura::blank_kura_for_testing(), + live_query_store, + ); + + let executor = { + let wasm_blob = iroha_wasm_builder::Builder::new("../default_executor") + .build() + .unwrap() + .optimize() + .unwrap() + .into_bytes() + .unwrap(); + + Executor::new(WasmSmartContract::from_compiled(wasm_blob)) + }; + let genesis = GenesisBuilder::default().build_and_sign( + executor, + chain_id.clone(), + &genesis_key_pair, + topology.as_ref().to_owned(), + ); + + { + let mut state_block = state.block(); + let block_genesis = ValidBlock::validate( + genesis.0, + &topology, + &chain_id, + &genesis_id, + &mut state_block, + ) + .unpack(|_| {}) + .unwrap() + .commit(&topology) + .unpack(|_| {}) + .unwrap(); + let _events = state_block.apply_without_execution(&block_genesis); + state_block.commit(); + blocks.push(block_genesis); + } + + let tx1 = TransactionBuilder::new(chain_id.clone(), account_id.clone()) + .with_instructions([Log::new(Level::INFO, "msg1".to_string())]) + .sign(account_keypair.private_key()); + + let tx2 = TransactionBuilder::new(chain_id.clone(), account_id) + .with_instructions([Log::new(Level::INFO, "msg2".to_string())]) + .sign(account_keypair.private_key()); + let tx1 = crate::AcceptedTransaction::accept( + tx1, + &chain_id, + state.view().transaction_executor().limits, + ) + .unwrap(); + let tx2 = crate::AcceptedTransaction::accept( + tx2, + &chain_id, + state.view().transaction_executor().limits, + ) + .unwrap(); + + { + let mut state_block = state.block(); + let block = BlockBuilder::new(vec![tx1.clone()], topology.clone(), Vec::new()) + .chain(0, &mut state_block) + .sign(&leader_private_key) + .unpack(|_| {}) + .commit(&topology) + .unpack(|_| {}) + .unwrap(); + let _events = state_block.apply_without_execution(&block); + state_block.commit(); + blocks.push(block); + } + + { + let mut state_block = state.block_and_revert(); + let block_soft_fork = BlockBuilder::new(vec![tx1], topology.clone(), Vec::new()) + .chain(1, &mut state_block) + .sign(&leader_private_key) + .unpack(|_| {}) + .commit(&topology) + .unpack(|_| {}) + .unwrap(); + let _events = state_block.apply_without_execution(&block_soft_fork); + state_block.commit(); + blocks.push(block_soft_fork); + } + + { + let mut state_block: crate::state::StateBlock = state.block(); + let block_next = BlockBuilder::new(vec![tx2], topology.clone(), Vec::new()) + .chain(0, &mut state_block) + .sign(&leader_private_key) + .unpack(|_| {}) + .commit(&topology) + .unpack(|_| {}) + .unwrap(); + let _events = state_block.apply_without_execution(&block_next); + state_block.commit(); + blocks.push(block_next); + } + + blocks + } } From fc3fd7db2cb5321eee401c471acc4d38e4108968 Mon Sep 17 00:00:00 2001 From: Shanin Roman Date: Mon, 22 Jul 2024 17:03:01 +0300 Subject: [PATCH 2/4] fix(kura): properly initialize kura Signed-off-by: Shanin Roman --- core/src/kura.rs | 53 +++++++++++++++++++++++++----------------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/core/src/kura.rs b/core/src/kura.rs index e5eb64ebc8a..d2b327efd06 100644 --- a/core/src/kura.rs +++ b/core/src/kura.rs @@ -29,8 +29,6 @@ const SIZE_OF_BLOCK_HASH: u64 = Hash::LENGTH as u64; /// The interface of Kura subsystem #[derive(Debug)] pub struct Kura { - /// The mode of initialisation of [`Kura`]. - mode: InitMode, /// The block storage block_store: Mutex, /// The array of block hashes and a slot for an arc of the block. This is normally recovered from the index file. @@ -38,6 +36,8 @@ pub struct Kura { block_data: Mutex, Option>)>>, /// Path to file for plain text blocks. block_plain_text_path: Option, + /// Amount of blocks loaded during initialization + init_block_count: usize, } impl Kura { @@ -57,26 +57,28 @@ impl Kura { .debug_output_new_blocks .then(|| store_dir.join("blocks.json")); + let block_data = Kura::init(&mut block_store, config.init_mode)?; + let block_count = block_data.len(); + info!(mode=?config.init_mode, block_count, "Kura init complete"); + let kura = Arc::new(Self { - mode: config.init_mode, block_store: Mutex::new(block_store), - block_data: Mutex::new(Vec::new()), + block_data: Mutex::new(block_data), block_plain_text_path, + init_block_count: block_count, }); - let block_count = kura.init()?; - - Ok((kura, block_count)) + Ok((kura, BlockCount(block_count))) } /// Create a kura instance that doesn't write to disk. Instead it serves as a handler /// for in-memory blocks only. pub fn blank_kura_for_testing() -> Arc { Arc::new(Self { - mode: InitMode::Strict, block_store: Mutex::new(BlockStore::new(PathBuf::new())), block_data: Mutex::new(Vec::new()), block_plain_text_path: None, + init_block_count: 0, }) } @@ -105,30 +107,28 @@ impl Kura { /// - file storage is unavailable /// - data in file storage is invalid or corrupted #[iroha_logger::log(skip_all, name = "kura_init")] - fn init(self: &Arc) -> Result { - let mut block_store = self.block_store.lock(); - + fn init( + block_store: &mut BlockStore, + mode: InitMode, + ) -> Result, Option>)>> { let block_index_count: usize = block_store .read_index_count()? .try_into() .expect("INTERNAL BUG: block index count exceeds usize::MAX"); - let block_hashes = match self.mode { + let block_hashes = match mode { InitMode::Fast => { - Kura::init_fast_mode(&block_store, block_index_count).or_else(|error| { + Kura::init_fast_mode(block_store, block_index_count).or_else(|error| { warn!(%error, "Hashes file is broken. Falling back to strict init mode."); - Kura::init_strict_mode(&mut block_store, block_index_count) + Kura::init_strict_mode(block_store, block_index_count) }) } - InitMode::Strict => Kura::init_strict_mode(&mut block_store, block_index_count), + InitMode::Strict => Kura::init_strict_mode(block_store, block_index_count), }?; - let block_count = block_hashes.len(); - info!(mode=?self.mode, block_count, "Kura init complete"); - // The none value is set in order to indicate that the blocks exist on disk but are not yet loaded. - *self.block_data.lock() = block_hashes.into_iter().map(|hash| (hash, None)).collect(); - Ok(BlockCount(block_count)) + let block_data = block_hashes.into_iter().map(|hash| (hash, None)).collect(); + Ok(block_data) } fn init_fast_mode( @@ -193,9 +193,12 @@ impl Kura { kura: &Kura, mut shutdown_receiver: tokio::sync::oneshot::Receiver<()>, ) { - let (mut written_block_count, mut latest_block_hash) = { - let block_data_guard = kura.block_data.lock(); - (block_data_guard.len(), block_data_guard.last().map(|d| d.0)) + let mut written_block_count = kura.init_block_count; + let mut latest_written_block_hash = { + let block_data = kura.block_data.lock(); + written_block_count + .checked_sub(1) + .map(|idx| block_data[idx].0) }; let mut should_exit = false; @@ -210,11 +213,11 @@ impl Kura { let new_latest_block_hash = block_data_guard.last().map(|d| d.0); if block_data_guard.len() == written_block_count - && new_latest_block_hash != latest_block_hash + && new_latest_block_hash != latest_written_block_hash { written_block_count -= 1; // There has been a soft-fork and we need to rewrite the top block. } - latest_block_hash = new_latest_block_hash; + latest_written_block_hash = new_latest_block_hash; if written_block_count >= block_data_guard.len() { if should_exit { From c5915d4009ca00a5edea2d9886b3c8760bafa4de Mon Sep 17 00:00:00 2001 From: Shanin Roman Date: Mon, 22 Jul 2024 17:34:16 +0300 Subject: [PATCH 3/4] fix(kura): correctly handle replace_top_block Signed-off-by: Shanin Roman --- core/src/kura.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/kura.rs b/core/src/kura.rs index d2b327efd06..140951d325a 100644 --- a/core/src/kura.rs +++ b/core/src/kura.rs @@ -209,24 +209,24 @@ impl Kura { should_exit = true; } - let block_data_guard = kura.block_data.lock(); + let block_data = kura.block_data.lock(); - let new_latest_block_hash = block_data_guard.last().map(|d| d.0); - if block_data_guard.len() == written_block_count - && new_latest_block_hash != latest_written_block_hash - { + let new_latest_written_block_hash = written_block_count + .checked_sub(1) + .map(|idx| block_data[idx].0); + if new_latest_written_block_hash != latest_written_block_hash { written_block_count -= 1; // There has been a soft-fork and we need to rewrite the top block. } - latest_written_block_hash = new_latest_block_hash; + latest_written_block_hash = new_latest_written_block_hash; - if written_block_count >= block_data_guard.len() { + if written_block_count >= block_data.len() { if should_exit { info!("Kura has written remaining blocks to disk and is shutting down."); return; } - written_block_count = block_data_guard.len(); - drop(block_data_guard); + written_block_count = block_data.len(); + drop(block_data); std::thread::sleep(std::time::Duration::from_millis(1)); continue; } @@ -234,8 +234,8 @@ impl Kura { // If we get here there are blocks to be written. let start_height = written_block_count; let mut blocks_to_be_written = Vec::new(); - while written_block_count < block_data_guard.len() { - let block_ref = block_data_guard[written_block_count].1.as_ref().expect( + while written_block_count < block_data.len() { + let block_ref = block_data[written_block_count].1.as_ref().expect( "INTERNAL BUG: The block to be written is None. Check store_block function.", ); blocks_to_be_written.push(Arc::clone(block_ref)); @@ -243,7 +243,7 @@ impl Kura { } // We don't want to hold up other threads so we drop the lock on the block data. - drop(block_data_guard); + drop(block_data); if let Some(path) = kura.block_plain_text_path.as_ref() { let mut plain_text_file = std::fs::OpenOptions::new() From 72eae0c98ab69e32013994cd1e51b2bee260a4f5 Mon Sep 17 00:00:00 2001 From: Shanin Roman Date: Mon, 22 Jul 2024 17:38:21 +0300 Subject: [PATCH 4/4] fix(kura): fix warnings inside kura Signed-off-by: Shanin Roman --- Cargo.lock | 1 - core/Cargo.toml | 1 - core/src/kura.rs | 51 +++++++++++++++++++++--------------------------- 3 files changed, 22 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c3b7d9557a9..4193c4bd693 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3023,7 +3023,6 @@ dependencies = [ "iroha_primitives", "iroha_telemetry", "iroha_version", - "iroha_wasm_builder", "iroha_wasm_codec", "nonzero_ext", "once_cell", diff --git a/core/Cargo.toml b/core/Cargo.toml index a39ff5f824e..5a83eef9911 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -69,7 +69,6 @@ indexmap = "2.2.6" [dev-dependencies] iroha_executor_data_model = { workspace = true } -iroha_wasm_builder = {workspace = true } test_samples = { workspace = true } criterion = { workspace = true } diff --git a/core/src/kura.rs b/core/src/kura.rs index 140951d325a..bc6278f6ee1 100644 --- a/core/src/kura.rs +++ b/core/src/kura.rs @@ -32,14 +32,15 @@ pub struct Kura { /// The block storage block_store: Mutex, /// The array of block hashes and a slot for an arc of the block. This is normally recovered from the index file. - #[allow(clippy::type_complexity)] - block_data: Mutex, Option>)>>, + block_data: Mutex, /// Path to file for plain text blocks. block_plain_text_path: Option, /// Amount of blocks loaded during initialization init_block_count: usize, } +type BlockData = Vec<(HashOf, Option>)>; + impl Kura { /// Initialize Kura and start a thread that receives /// and stores new blocks. @@ -107,10 +108,7 @@ impl Kura { /// - file storage is unavailable /// - data in file storage is invalid or corrupted #[iroha_logger::log(skip_all, name = "kura_init")] - fn init( - block_store: &mut BlockStore, - mode: InitMode, - ) -> Result, Option>)>> { + fn init(block_store: &mut BlockStore, mode: InitMode) -> Result { let block_index_count: usize = block_store .read_index_count()? .try_into() @@ -551,6 +549,7 @@ impl BlockStore { let path = self.path_to_blockchain.join(INDEX_FILE_NAME); let mut index_file = std::fs::OpenOptions::new() .write(true) + .truncate(false) .create(true) .open(path.clone()) .add_err_context(&path)?; @@ -638,6 +637,7 @@ impl BlockStore { let path = self.path_to_blockchain.join(HASHES_FILE_NAME); let mut hashes_file = std::fs::OpenOptions::new() .write(true) + .truncate(false) .create(true) .open(path.clone()) .add_err_context(&path)?; @@ -691,18 +691,21 @@ impl BlockStore { let path = self.path_to_blockchain.join(INDEX_FILE_NAME); std::fs::OpenOptions::new() .write(true) + .truncate(false) .create(true) .open(path.clone()) .add_err_context(&path)?; let path = self.path_to_blockchain.join(DATA_FILE_NAME); std::fs::OpenOptions::new() .write(true) + .truncate(false) .create(true) .open(path.clone()) .add_err_context(&path)?; let path = self.path_to_blockchain.join(HASHES_FILE_NAME); std::fs::OpenOptions::new() .write(true) + .truncate(false) .create(true) .open(path.clone()) .add_err_context(&path)?; @@ -794,7 +797,6 @@ mod tests { ChainId, Level, }; use iroha_genesis::GenesisBuilder; - use iroha_primitives::unique_vec::UniqueVec; use nonzero_ext::nonzero; use tempfile::TempDir; use test_samples::gen_account_in; @@ -1080,26 +1082,14 @@ mod tests { live_query_store.start() }; let state = State::new( - World::with( - [domain, genesis_domain], - [account, genesis_account], - [], - UniqueVec::new(), - ), + World::with([domain, genesis_domain], [account, genesis_account], []), Kura::blank_kura_for_testing(), live_query_store, ); let executor = { - let wasm_blob = iroha_wasm_builder::Builder::new("../default_executor") - .build() - .unwrap() - .optimize() - .unwrap() - .into_bytes() - .unwrap(); - - Executor::new(WasmSmartContract::from_compiled(wasm_blob)) + let executor_blob = std::fs::read("../defaults/executor.wasm").unwrap(); + Executor::new(WasmSmartContract::from_compiled(executor_blob)) }; let genesis = GenesisBuilder::default().build_and_sign( executor, @@ -1122,7 +1112,8 @@ mod tests { .commit(&topology) .unpack(|_| {}) .unwrap(); - let _events = state_block.apply_without_execution(&block_genesis); + let _events = + state_block.apply_without_execution(&block_genesis, topology.as_ref().to_owned()); state_block.commit(); blocks.push(block_genesis); } @@ -1149,42 +1140,44 @@ mod tests { { let mut state_block = state.block(); - let block = BlockBuilder::new(vec![tx1.clone()], topology.clone(), Vec::new()) + let block = BlockBuilder::new(vec![tx1.clone()]) .chain(0, &mut state_block) .sign(&leader_private_key) .unpack(|_| {}) .commit(&topology) .unpack(|_| {}) .unwrap(); - let _events = state_block.apply_without_execution(&block); + let _events = state_block.apply_without_execution(&block, topology.as_ref().to_owned()); state_block.commit(); blocks.push(block); } { let mut state_block = state.block_and_revert(); - let block_soft_fork = BlockBuilder::new(vec![tx1], topology.clone(), Vec::new()) + let block_soft_fork = BlockBuilder::new(vec![tx1]) .chain(1, &mut state_block) .sign(&leader_private_key) .unpack(|_| {}) .commit(&topology) .unpack(|_| {}) .unwrap(); - let _events = state_block.apply_without_execution(&block_soft_fork); + let _events = + state_block.apply_without_execution(&block_soft_fork, topology.as_ref().to_owned()); state_block.commit(); blocks.push(block_soft_fork); } { let mut state_block: crate::state::StateBlock = state.block(); - let block_next = BlockBuilder::new(vec![tx2], topology.clone(), Vec::new()) + let block_next = BlockBuilder::new(vec![tx2]) .chain(0, &mut state_block) .sign(&leader_private_key) .unpack(|_| {}) .commit(&topology) .unpack(|_| {}) .unwrap(); - let _events = state_block.apply_without_execution(&block_next); + let _events = + state_block.apply_without_execution(&block_next, topology.as_ref().to_owned()); state_block.commit(); blocks.push(block_next); }