From 6a59a631e4e4df7895b60068a5cf70e182aaf1a6 Mon Sep 17 00:00:00 2001 From: Alex Skidanov Date: Fri, 16 Aug 2019 14:01:19 -0700 Subject: [PATCH] Fixing bug #1183: state root should not be indexed by the chunk hash Also fixing a bug we discovered earlier: if the last block of an epoch doesn't have a chunk for a particular shard, we do not do the updates necessary for the epoch switch. Fixing it by forcefully executing `apply_transactions` with no txs and receipts for the last block. Conveniently, the fix for #1183 makes this possible (before it, we would not have been able to store two different post-state roots for the same chunk). --- chain/chain/src/chain.rs | 130 ++++++++++++++++----------- chain/chain/src/store.rs | 50 ++++++++--- chain/chain/src/test_utils.rs | 2 +- chain/client/src/client.rs | 12 +-- chain/client/src/view_client.rs | 7 +- chain/client/tests/bug_repros.rs | 116 ++++++++++++++++++++++++ chain/client/tests/cross_shard_tx.rs | 3 +- chain/network/src/types.rs | 6 +- core/protos/protos/network.proto | 15 ++-- 9 files changed, 244 insertions(+), 97 deletions(-) create mode 100644 chain/client/tests/bug_repros.rs diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 8667dc1dfd8..cb5942c8c99 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -231,7 +231,8 @@ impl Chain { for (chunk_header, state_root) in genesis.chunks.iter().zip(state_roots.iter()) { store_update.save_chunk_extra( - &chunk_header.chunk_hash(), + &genesis.hash(), + chunk_header.shard_id, ChunkExtra::new(state_root, vec![], 0, chain_genesis.gas_limit), ); } @@ -768,7 +769,6 @@ impl Chain { hash: CryptoHash, ) -> Result< ( - ChunkHash, ChunkExtra, Vec, (CryptoHash, Vec), @@ -777,6 +777,7 @@ impl Chain { Error, > { let prev_hash = self.get_block_header(&hash)?.prev_hash; + let prev_block = self.get_block(&prev_hash)?; if shard_id as usize >= prev_block.chunks.len() { @@ -784,8 +785,7 @@ impl Chain { } let prev_chunk_header = prev_block.chunks[shard_id as usize].clone(); - let prev_chunk_hash = prev_chunk_header.chunk_hash(); - let prev_chunk_extra = self.store.get_chunk_extra(&prev_chunk_hash)?.clone(); + let prev_chunk_extra = self.store.get_chunk_extra(&prev_hash, shard_id)?.clone(); let payload = self .runtime_adapter @@ -800,27 +800,20 @@ impl Chain { let incoming_receipts = ChainStoreUpdate::new(&mut self.store) .get_incoming_receipts_for_shard(shard_id, hash, &prev_chunk_header)?; - Ok(( - prev_chunk_hash.clone(), - prev_chunk_extra, - payload, - outgoing_receipts, - incoming_receipts.clone(), - )) + Ok((prev_chunk_extra, payload, outgoing_receipts, incoming_receipts.clone())) } pub fn set_shard_state( &mut self, _me: &Option, shard_id: ShardId, - _hash: CryptoHash, - prev_chunk_hash: ChunkHash, + sync_hash: CryptoHash, prev_extra: ChunkExtra, payload: Vec, outgoing_receipts: (CryptoHash, Vec), incoming_receipts: Vec<(CryptoHash, Vec)>, ) -> Result<(), Error> { - // TODO (#1126): verify that prev_chunk_hash, prev_state_root, payload and receipts match + // TODO (#1126): verify that prev_state_root, payload and receipts match // the corresponding merkle roots // Save state in the runtime, will also check it's validity. @@ -829,8 +822,9 @@ impl Chain { .map_err(|err| ErrorKind::InvalidStatePayload(err.to_string()))?; // Update pointers to state root and receipts.
+ let prev_block_hash = self.get_block_header(&sync_hash)?.prev_hash; let mut chain_store_update = self.store.store_update(); - chain_store_update.save_chunk_extra(&prev_chunk_hash, prev_extra); + chain_store_update.save_chunk_extra(&prev_block_hash, shard_id, prev_extra); chain_store_update.save_outgoing_receipt( &outgoing_receipts.0, shard_id, @@ -1026,16 +1020,18 @@ impl Chain { /// Get chunk extra that was computed after applying chunk with given hash. #[inline] - pub fn get_chunk_extra(&mut self, hash: &ChunkHash) -> Result<&ChunkExtra, Error> { - self.store.get_chunk_extra(hash) + pub fn get_chunk_extra( + &mut self, + block_hash: &CryptoHash, + shard_id: ShardId, + ) -> Result<&ChunkExtra, Error> { + self.store.get_chunk_extra(block_hash, shard_id) } /// Helper to return latest chunk extra for given shard. #[inline] pub fn get_latest_chunk_extra(&mut self, shard_id: ShardId) -> Result<&ChunkExtra, Error> { - let chunk_hash = - self.get_block(&self.head()?.last_block_hash)?.chunks[shard_id as usize].chunk_hash(); - self.store.get_chunk_extra(&chunk_hash) + self.store.get_chunk_extra(&self.head()?.last_block_hash, shard_id) } /// Get transaction result for given hash of transaction. @@ -1192,39 +1188,38 @@ impl<'a> ChainUpdate<'a> { (block.chunks.iter().zip(prev_block.chunks.iter())).enumerate() { let shard_id = shard_id as ShardId; - if chunk_header.height_included == block.header.height { - let chunk_hash = chunk_header.chunk_hash(); - let care_about_shard = match mode { - ApplyChunksMode::ThisEpoch => me.as_ref().map_or_else( - || false, - |me| { - self.runtime_adapter.cares_about_shard( - me, - &block.header.prev_hash, - shard_id, - ) - }, - ), - ApplyChunksMode::NextEpoch => me.as_ref().map_or_else( - || false, - |me| { - self.runtime_adapter.will_care_about_shard( - me, - &block.header.prev_hash, - shard_id, - ) && !self.runtime_adapter.cares_about_shard( - me, - &block.header.prev_hash, - shard_id, - ) - }, - ), - }; - if care_about_shard { + let care_about_shard = match mode { + ApplyChunksMode::ThisEpoch => me.as_ref().map_or_else( + || false, + |me| { + self.runtime_adapter.cares_about_shard( + me, + &block.header.prev_hash, + shard_id, + ) + }, + ), + ApplyChunksMode::NextEpoch => me.as_ref().map_or_else( + || false, + |me| { + self.runtime_adapter.will_care_about_shard( + me, + &block.header.prev_hash, + shard_id, + ) && !self.runtime_adapter.cares_about_shard( + me, + &block.header.prev_hash, + shard_id, + ) + }, + ), + }; + if care_about_shard { + if chunk_header.height_included == block.header.height { // Validate state root. let prev_chunk_extra = self .chain_store_update - .get_chunk_extra(&prev_chunk_header.chunk_hash())? + .get_chunk_extra(&block.header.prev_hash, shard_id)? .clone(); if prev_chunk_extra.state_root != chunk_header.prev_state_root { // TODO: MOO @@ -1246,12 +1241,15 @@ impl<'a> ChainUpdate<'a> { let gas_limit = chunk.header.gas_limit; // Apply block to runtime. 
- println!( - "[APPLY CHUNK] {:?} PREV BLOCK HASH: {:?}, BLOCK HASH: {:?} ROOT: {:?}", + debug!(target: "chain", + "[APPLY CHUNK] {:?} PREV BLOCK HASH: {:?}, BLOCK HASH: {:?} ROOT: {:?} SHARD ID: {:?}; WILL BE USING {} TXS and {} RECEIPTS", chunk_header.height_included, chunk_header.prev_block_hash, block.hash(), - chunk.header.prev_state_root + chunk.header.prev_state_root, + chunk.header.shard_id, + chunk.transactions.len(), + receipts.len(), ); let mut apply_result = self .runtime_adapter @@ -1269,7 +1267,8 @@ impl<'a> ChainUpdate<'a> { self.chain_store_update.save_trie_changes(apply_result.trie_changes); // Save state root after applying transactions. self.chain_store_update.save_chunk_extra( - &chunk_hash, + &block.hash(), + shard_id, ChunkExtra::new( &apply_result.new_root, apply_result.validator_proposals, @@ -1300,6 +1299,29 @@ impl<'a> ChainUpdate<'a> { ); } } + } else { + let mut new_extra = self + .chain_store_update + .get_chunk_extra(&prev_block.hash(), shard_id)? + .clone(); + + let apply_result = self + .runtime_adapter + .apply_transactions( + shard_id, + &new_extra.state_root, + block.header.height, + &prev_block.hash(), + &block.hash(), + &vec![], + &vec![], + ) + .map_err(|e| ErrorKind::Other(e.to_string()))?; + + self.chain_store_update.save_trie_changes(apply_result.trie_changes); + new_extra.state_root = apply_result.new_root; + + self.chain_store_update.save_chunk_extra(&block.hash(), shard_id, new_extra); } } } diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 494c839c270..665286a6820 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -75,7 +75,11 @@ pub trait ChainStoreAccess { /// Get previous header. fn get_previous_header(&mut self, header: &BlockHeader) -> Result<&BlockHeader, Error>; /// Get chunk extra info for given chunk hash. - fn get_chunk_extra(&mut self, h: &ChunkHash) -> Result<&ChunkExtra, Error>; + fn get_chunk_extra( + &mut self, + block_hash: &CryptoHash, + shard_id: ShardId, + ) -> Result<&ChunkExtra, Error>; /// Get block header. fn get_block_header(&mut self, h: &CryptoHash) -> Result<&BlockHeader, Error>; /// Returns hash of the block on the main chain for given height. @@ -352,10 +356,19 @@ impl ChainStoreAccess for ChainStore { } /// Get state root hash after applying header with given hash. - fn get_chunk_extra(&mut self, h: &ChunkHash) -> Result<&ChunkExtra, Error> { + fn get_chunk_extra( + &mut self, + block_hash: &CryptoHash, + shard_id: ShardId, + ) -> Result<&ChunkExtra, Error> { option_to_not_found( - read_with_cache(&*self.store, COL_CHUNK_EXTRA, &mut self.chunk_extras, h.as_ref()), - &format!("CHUNK EXTRA: {}", h.0), + read_with_cache( + &*self.store, + COL_CHUNK_EXTRA, + &mut self.chunk_extras, + hash_struct(&(block_hash, shard_id)).as_ref(), + ), + &format!("CHUNK EXTRA: {}:{}", block_hash, shard_id), ) } @@ -443,7 +456,7 @@ pub struct ChainStoreUpdate<'a, T> { blocks: HashMap, deleted_blocks: HashSet, headers: HashMap, - chunk_extras: HashMap, + chunk_extras: HashMap<(CryptoHash, ShardId), ChunkExtra>, block_index: HashMap>, outgoing_receipts: HashMap<(CryptoHash, ShardId), Vec>, incoming_receipts: HashMap<(CryptoHash, ShardId), Vec>, @@ -581,11 +594,15 @@ impl<'a, T: ChainStoreAccess> ChainStoreAccess for ChainStoreUpdate<'a, T> { } /// Get state root hash after applying header with given hash. 
- fn get_chunk_extra(&mut self, hash: &ChunkHash) -> Result<&ChunkExtra, Error> { - if let Some(chunk_extra) = self.chunk_extras.get(hash) { + fn get_chunk_extra( + &mut self, + block_hash: &CryptoHash, + shard_id: ShardId, + ) -> Result<&ChunkExtra, Error> { + if let Some(chunk_extra) = self.chunk_extras.get(&(block_hash.clone(), shard_id)) { Ok(chunk_extra) } else { - self.chain_store.get_chunk_extra(hash) + self.chain_store.get_chunk_extra(block_hash, shard_id) } } @@ -724,8 +741,13 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> { } /// Save post applying block state root. - pub fn save_chunk_extra(&mut self, hash: &ChunkHash, chunk_extra: ChunkExtra) { - self.chunk_extras.insert(hash.clone(), chunk_extra); + pub fn save_chunk_extra( + &mut self, + block_hash: &CryptoHash, + shard_id: ShardId, + chunk_extra: ChunkExtra, + ) { + self.chunk_extras.insert((block_hash.clone(), shard_id), chunk_extra); } pub fn delete_block(&mut self, hash: &CryptoHash) { @@ -839,9 +861,13 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> { .set_ser(COL_BLOCK_HEADER, hash.as_ref(), &header) .map_err::(|e| e.into())?; } - for (hash, chunk_extra) in self.chunk_extras.drain() { + for (block_hash_and_shard_id, chunk_extra) in self.chunk_extras.drain() { store_update - .set_ser(COL_CHUNK_EXTRA, hash.as_ref(), &chunk_extra) + .set_ser( + COL_CHUNK_EXTRA, + hash_struct(&block_hash_and_shard_id).as_ref(), + &chunk_extra, + ) .map_err::(|e| e.into())?; } for (height, hash) in self.block_index.drain() { diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs index d191821e64e..31e121c5a25 100644 --- a/chain/chain/src/test_utils.rs +++ b/chain/chain/src/test_utils.rs @@ -418,6 +418,7 @@ impl RuntimeAdapter for KeyValueRuntime { 0, )); } else { + assert!(false); // receipts should never be applied twice balance_transfers.push(( receipt.originator.clone(), receipt.receiver.clone(), @@ -476,7 +477,6 @@ impl RuntimeAdapter for KeyValueRuntime { .insert(to.clone(), accounts_mapping.get(&to).unwrap_or(&0) + amount); vec![] } else { - assert_ne!(amount, 0); assert_ne!(nonce, 0); let receipt = ReceiptTransaction::new( from.clone(), diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 4d4fb2167b8..823826a4e46 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -326,18 +326,12 @@ impl Handler for ClientActor { hash, self.block_producer.clone().map(|x| x.account_id) ); - if let Ok(( - prev_chunk_hash, - prev_chunk_extra, - payload, - outgoing_receipts, - incoming_receipts, - )) = self.chain.state_request(shard_id, hash) + if let Ok((prev_chunk_extra, payload, outgoing_receipts, incoming_receipts)) = + self.chain.state_request(shard_id, hash) { return NetworkClientResponses::StateResponse(StateResponseInfo { shard_id, hash, - prev_chunk_hash, prev_chunk_extra, payload, outgoing_receipts, @@ -352,7 +346,6 @@ impl Handler for ClientActor { NetworkClientMessages::StateResponse(StateResponseInfo { shard_id, hash, - prev_chunk_hash, prev_chunk_extra, payload, outgoing_receipts, @@ -394,7 +387,6 @@ impl Handler for ClientActor { &self.block_producer.as_ref().map(|bp| bp.account_id.clone()), shard_id, hash, - prev_chunk_hash, prev_chunk_extra, payload, outgoing_receipts, diff --git a/chain/client/src/view_client.rs b/chain/client/src/view_client.rs index 0a557f5bb59..28ff57b33c6 100644 --- a/chain/client/src/view_client.rs +++ b/chain/client/src/view_client.rs @@ -92,14 +92,9 @@ impl Handler for ViewClientActor { let path_parts: Vec<&str> = 
msg.path.split('/').collect(); let account_id = AccountId::from(path_parts[1]); let shard_id = self.runtime_adapter.account_id_to_shard_id(&account_id); - let head_block = self - .chain - .get_block(&head.last_block_hash) - .map_err(|_e| "Failed to fetch head block while executing request")?; - let chunk_hash = head_block.chunks[shard_id as usize].chunk_hash().clone(); let state_root = self .chain - .get_chunk_extra(&chunk_hash) + .get_chunk_extra(&head.last_block_hash, shard_id) .map_err(|_e| "Failed to fetch the chunk while executing request")? .state_root; diff --git a/chain/client/tests/bug_repros.rs b/chain/client/tests/bug_repros.rs new file mode 100644 index 00000000000..141268eb65b --- /dev/null +++ b/chain/client/tests/bug_repros.rs @@ -0,0 +1,116 @@ +// This test tracks tests that reproduce previously fixed bugs to make sure the regressions we +// fix do not resurface + +use actix::{Addr, System}; +use near_chain::test_utils::account_id_to_shard_id; +use near_client::test_utils::setup_mock_all_validators; +use near_client::{ClientActor, ViewClientActor}; +use near_network::types::NetworkRequests::ChunkOnePartMessage; +use near_network::{NetworkClientMessages, NetworkRequests, NetworkResponses, PeerInfo}; +use near_primitives::block::Block; +use near_primitives::test_utils::init_test_logger; +use near_primitives::transaction::SignedTransaction; +use rand::{thread_rng, Rng}; +use std::sync::{Arc, RwLock}; + +#[test] +fn repro_1183() { + let validator_groups = 2; + init_test_logger(); + System::run(move || { + let connectors: Arc, Addr)>>> = + Arc::new(RwLock::new(vec![])); + + let validators = vec![vec!["test1", "test2", "test3", "test4"]]; + let key_pairs = vec![ + PeerInfo::random(), + PeerInfo::random(), + PeerInfo::random(), + PeerInfo::random(), // 4 + ]; + + let connectors1 = connectors.clone(); + let validators2 = validators.clone(); + let last_block: Arc>> = Arc::new(RwLock::new(None)); + let delayed_one_parts: Arc>> = Arc::new(RwLock::new(vec![])); + *connectors.write().unwrap() = setup_mock_all_validators( + validators.clone(), + key_pairs.clone(), + validator_groups, + true, + 200, + Arc::new(RwLock::new(move |_account_id: String, msg: &NetworkRequests| { + if let NetworkRequests::Block { block } = msg { + let mut last_block = last_block.write().unwrap(); + let mut delayed_one_parts = delayed_one_parts.write().unwrap(); + + if let Some(last_block) = last_block.clone() { + for (client, _) in connectors1.write().unwrap().iter() { + client.do_send(NetworkClientMessages::Block( + last_block.clone(), + PeerInfo::random().id, + false, + )) + } + } + for delayed_message in delayed_one_parts.iter() { + if let ChunkOnePartMessage { account_id, header_and_part, .. 
} = + delayed_message + { + for (i, name) in validators2.iter().flatten().enumerate() { + if &name.to_string() == account_id { + connectors1.write().unwrap()[i].0.do_send( + NetworkClientMessages::ChunkOnePart( + header_and_part.clone(), + ), + ); + } + } + } else { + assert!(false); + } + } + + let mut nonce_delta = 0; + for from in vec!["test1", "test2", "test3", "test4"] { + for to in vec!["test1", "test2", "test3", "test4"] { + connectors1.write().unwrap() + [account_id_to_shard_id(&from.to_string(), 4) as usize] + .0 + .do_send(NetworkClientMessages::Transaction( + SignedTransaction::create_payment_tx( + from.to_string(), + to.to_string(), + 1, + block.header.height * 16 + nonce_delta, + ), + )); + nonce_delta += 1 + } + } + + *last_block = Some(block.clone()); + *delayed_one_parts = vec![]; + + if block.header.height >= 25 { + System::current().stop(); + } + (NetworkResponses::NoResponse, false) + } else if let NetworkRequests::ChunkOnePartMessage { .. } = msg { + if thread_rng().gen_bool(0.5) { + (NetworkResponses::NoResponse, true) + } else { + let msg2 = msg.clone(); + delayed_one_parts.write().unwrap().push(msg2); + (NetworkResponses::NoResponse, false) + } + } else { + (NetworkResponses::NoResponse, true) + } + })), + ); + + near_network::test_utils::wait_or_panic(30000); + }) + .unwrap(); +} diff --git a/chain/client/tests/cross_shard_tx.rs b/chain/client/tests/cross_shard_tx.rs index d9fdf773d47..21270c8d437 100644 --- a/chain/client/tests/cross_shard_tx.rs +++ b/chain/client/tests/cross_shard_tx.rs @@ -148,7 +148,8 @@ mod tests { let query_response = match res { Ok(query_response) => query_response, - Err(_) => { + Err(e) => { + println!("Query failed with {:?}", e); *presumable_epoch.write().unwrap() += 1; let connectors_ = connectors.write().unwrap(); let connectors1 = connectors.clone(); diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index d68e5db7286..47008b067e9 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -519,7 +519,6 @@ impl TryFrom for PeerMessage { Ok(PeerMessage::StateResponse(StateResponseInfo { shard_id: state_response.shard_id, hash: state_response.hash.try_into()?, - prev_chunk_hash: ChunkHash(state_response.prev_chunk_hash.try_into()?), prev_chunk_extra: ChunkExtra { state_root: state_response.prev_state_root.try_into()?, validator_proposals: state_response @@ -677,7 +676,6 @@ impl From for network_proto::PeerMessage { PeerMessage::StateResponse(StateResponseInfo { shard_id, hash, - prev_chunk_hash, prev_chunk_extra, payload, outgoing_receipts, @@ -686,7 +684,6 @@ impl From for network_proto::PeerMessage { let state_response = network_proto::StateResponse { shard_id, hash: hash.into(), - prev_chunk_hash: prev_chunk_hash.0.into(), prev_state_root: prev_chunk_extra.state_root.into(), validator_proposals: RepeatedField::from_iter( prev_chunk_extra @@ -941,7 +938,7 @@ pub struct Ban { pub ban_reason: ReasonForBan, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum NetworkRequests { /// Fetch information from the network. 
FetchInfo, @@ -1017,7 +1014,6 @@ impl Message for NetworkRequests { pub struct StateResponseInfo { pub shard_id: ShardId, pub hash: CryptoHash, - pub prev_chunk_hash: ChunkHash, pub prev_chunk_extra: ChunkExtra, pub payload: Vec, pub outgoing_receipts: (CryptoHash, Vec), diff --git a/core/protos/protos/network.proto b/core/protos/protos/network.proto index 5b3d9965234..fd5c9848e01 100644 --- a/core/protos/protos/network.proto +++ b/core/protos/protos/network.proto @@ -56,14 +56,13 @@ message StateResponseReceipts { message StateResponse { uint64 shard_id = 1; bytes hash = 2; - bytes prev_chunk_hash = 3; - bytes prev_state_root = 4; - uint64 prev_gas_used = 5; - uint64 prev_gas_limit = 6; - repeated ValidatorStake validator_proposals = 7; - bytes payload = 8; - StateResponseReceipts outgoing_receipts = 10; - repeated StateResponseReceipts incoming_receipts = 11; + bytes prev_state_root = 3; + uint64 prev_gas_used = 4; + uint64 prev_gas_limit = 5; + repeated ValidatorStake validator_proposals = 6; + bytes payload = 7; + StateResponseReceipts outgoing_receipts = 8; + repeated StateResponseReceipts incoming_receipts = 9; } message ChunkPartRequest{
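The core of the #1183 fix is the new indexing scheme: a ChunkExtra (post-state root plus validator proposals and gas data) is now stored per (block_hash, shard_id) rather than per chunk hash, with the on-disk key in COL_CHUNK_EXTRA being hash_struct(&(block_hash, shard_id)). Below is a minimal, standalone sketch of that idea using simplified stand-in types (CryptoHash, ShardId and ChunkExtra here are placeholders, not the actual nearcore structs); it only illustrates why the same chunk can now carry different post-state roots in different blocks.

use std::collections::HashMap;

// Simplified stand-ins; the real types live in near_primitives and the chain crate.
type CryptoHash = [u8; 32];
type ShardId = u64;

#[derive(Clone, Debug, PartialEq)]
struct ChunkExtra {
    state_root: CryptoHash,
}

#[derive(Default)]
struct ChunkExtraStore {
    // One entry per (block, shard); the real store keys COL_CHUNK_EXTRA by
    // hash_struct(&(block_hash, shard_id)).
    chunk_extras: HashMap<(CryptoHash, ShardId), ChunkExtra>,
}

impl ChunkExtraStore {
    fn save_chunk_extra(&mut self, block_hash: &CryptoHash, shard_id: ShardId, extra: ChunkExtra) {
        self.chunk_extras.insert((*block_hash, shard_id), extra);
    }

    fn get_chunk_extra(&self, block_hash: &CryptoHash, shard_id: ShardId) -> Option<&ChunkExtra> {
        self.chunk_extras.get(&(*block_hash, shard_id))
    }
}

fn main() {
    let mut store = ChunkExtraStore::default();
    let (block_a, block_b) = ([1u8; 32], [2u8; 32]);

    // Two blocks that include the same (old) chunk for shard 0 can now record
    // different post-state roots, which the old chunk-hash keying could not express.
    store.save_chunk_extra(&block_a, 0, ChunkExtra { state_root: [10u8; 32] });
    store.save_chunk_extra(&block_b, 0, ChunkExtra { state_root: [11u8; 32] });

    assert_ne!(store.get_chunk_extra(&block_a, 0), store.get_chunk_extra(&block_b, 0));
}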
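The second fix handles a block that carries no new chunk for some shard (chunk_header.height_included != block.header.height): instead of skipping the shard, the runtime is still invoked via apply_transactions with empty transactions and receipts, and the resulting ChunkExtra is saved under the new block hash, so the processing needed for the epoch switch is not lost. A hedged, standalone sketch of that control flow follows; apply_transactions here is a toy stand-in for RuntimeAdapter::apply_transactions, and the types are the same simplified placeholders as in the previous sketch.

use std::collections::HashMap;

type CryptoHash = [u8; 32];
type ShardId = u64;

#[derive(Clone, Debug)]
struct ChunkExtra {
    state_root: CryptoHash,
}

// Toy stand-in for RuntimeAdapter::apply_transactions: a real runtime would walk the
// trie; here we only show that the call happens even with no txs and no receipts.
fn apply_transactions(state_root: &CryptoHash, txs: &[u64], receipts: &[u64]) -> CryptoHash {
    let mut new_root = *state_root;
    new_root[0] = new_root[0].wrapping_add(1 + txs.len() as u8 + receipts.len() as u8);
    new_root
}

fn main() {
    let mut chunk_extras: HashMap<(CryptoHash, ShardId), ChunkExtra> = HashMap::new();
    let shard_id: ShardId = 0;
    let (prev_block, block) = ([1u8; 32], [2u8; 32]);

    chunk_extras.insert((prev_block, shard_id), ChunkExtra { state_root: [0u8; 32] });

    // `block` included no new chunk for this shard: carry the previous extra forward
    // through an empty apply, then save it under the new block hash.
    let mut new_extra = chunk_extras[&(prev_block, shard_id)].clone();
    new_extra.state_root = apply_transactions(&new_extra.state_root, &[], &[]);
    chunk_extras.insert((block, shard_id), new_extra);

    assert!(chunk_extras.contains_key(&(block, shard_id)));
}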