diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 0641cad1261..31f5ef5d31f 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -16,6 +16,7 @@ use crate::error::{Error, ErrorKind}; use crate::store::{ChainStore, ChainStoreAccess, ChainStoreUpdate, StateSyncInfo}; use crate::types::{ Block, BlockHeader, BlockStatus, Provenance, RuntimeAdapter, ShardFullChunkOrOnePart, Tip, + ValidatorSignatureVerificationResult, }; /// Maximum number of orphans chain can store. @@ -269,6 +270,16 @@ impl Chain { Ok(header_head) } + pub fn save_block(&mut self, block: &Block) -> Result<(), Error> { + let mut chain_store_update = ChainStoreUpdate::new(&mut self.store); + + // TODO XXX MOO: do basic validation of the block + chain_store_update.save_block(block.clone()); + + chain_store_update.commit()?; + Ok(()) + } + /// Process a block header received during "header first" propagation. pub fn process_block_header(&mut self, header: &BlockHeader) -> Result<(), Error> { // We create new chain update, but it's not going to be committed so it's read only. @@ -761,7 +772,6 @@ impl Chain { ChunkHash, ChunkExtra, Vec, - Block, (CryptoHash, Vec), Vec<(CryptoHash, Vec)>, ), @@ -791,13 +801,10 @@ impl Chain { let incoming_receipts = ChainStoreUpdate::new(&mut self.store) .get_incoming_receipts_for_shard(shard_id, hash, &prev_chunk_header)?; - let block = self.get_block(&hash)?.clone(); - Ok(( prev_chunk_hash.clone(), prev_chunk_extra, payload, - block, outgoing_receipts, incoming_receipts.clone(), )) @@ -811,7 +818,6 @@ impl Chain { prev_chunk_hash: ChunkHash, prev_extra: ChunkExtra, payload: Vec, - block: Block, outgoing_receipts: (CryptoHash, Vec), incoming_receipts: Vec<(CryptoHash, Vec)>, ) -> Result<(), Error> { @@ -838,8 +844,6 @@ impl Chain { incoming_receipt.1, ); } - // TODO MOO XXX: validate block (can't call process_block here since we don't have the previous block) - chain_store_update.save_block(block); chain_store_update.commit()?; Ok(()) @@ -1472,7 +1476,8 @@ impl<'a> ChainUpdate<'a> { &validator, header.hash().as_ref(), &header.signature, - ) { + ) == ValidatorSignatureVerificationResult::Valid + { Ok(()) } else { Err(ErrorKind::InvalidSignature.into()) @@ -1603,7 +1608,7 @@ impl<'a> ChainUpdate<'a> { match self.chain_store_update.block_exists(&header.hash()) { Ok(true) => { let head = self.chain_store_update.head()?; - if header.height > 50 && header.height < head.height - 50 { + if head.height > 50 && header.height < head.height - 50 { // We flag this as an "abusive peer" but only in the case // where we have the full block in our store. // So this is not a particularly exhaustive check. 
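The chain.rs hunk above switches header signature checking from a plain boolean to the new three-valued ValidatorSignatureVerificationResult, so callers can tell a genuinely bad signature apart from an epoch the node does not know yet. Below is a minimal standalone sketch of that pattern, not the nearcore API: the enum name mirrors the diff, but the lookup map and the verify helper are hypothetical stand-ins for the runtime adapter and the epoch manager.

use std::collections::HashMap;

#[derive(Debug, Eq, PartialEq)]
enum ValidatorSignatureVerificationResult {
    Valid,
    Invalid,
    UnknownEpoch,
}

// Hypothetical stand-in for the runtime adapter: maps (epoch, account) to the
// expected "signature" bytes. The real adapter resolves the validator's public
// key through the epoch manager and verifies a real cryptographic signature.
fn verify_validator_signature(
    known_validators: &HashMap<(u64, String), Vec<u8>>,
    epoch_id: u64,
    account_id: &str,
    signature: &[u8],
) -> ValidatorSignatureVerificationResult {
    match known_validators.get(&(epoch_id, account_id.to_string())) {
        Some(expected) if expected.as_slice() == signature => {
            ValidatorSignatureVerificationResult::Valid
        }
        Some(_) => ValidatorSignatureVerificationResult::Invalid,
        // The epoch (or validator) is not known locally: not necessarily malicious.
        None => ValidatorSignatureVerificationResult::UnknownEpoch,
    }
}

fn main() {
    let mut validators = HashMap::new();
    validators.insert((7u64, "alice.near".to_string()), b"sig-alice".to_vec());

    // An invalid signature from a known validator is ban-worthy ...
    assert_eq!(
        verify_validator_signature(&validators, 7, "alice.near", b"bad"),
        ValidatorSignatureVerificationResult::Invalid
    );
    // ... while an unknown epoch is ignored rather than punished, which is why the
    // header check in chain.rs now compares against `Valid` explicitly instead of
    // treating anything non-true as an offence.
    assert_eq!(
        verify_validator_signature(&validators, 8, "alice.near", b"sig-alice"),
        ValidatorSignatureVerificationResult::UnknownEpoch
    );
}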
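The same hunk also drops the full block from the state response (see the state_request/set_shard_state signatures above and the protobuf change further down). Instead, the client requests the sync block by hash and, when it arrives while the node is in state sync, persists it through the new Chain::save_block. The sketch below condenses that client-side gating; Block, Chain and BlockHash here are hypothetical stand-ins, not the real nearcore types.

#[derive(Clone, PartialEq, Eq, Debug)]
struct BlockHash(u64);

// Hypothetical stand-ins for near's Block and Chain types.
struct Block { hash: BlockHash }
struct Chain { saved: Vec<BlockHash> }

enum SyncStatus {
    StateSync(BlockHash),
    NoSync,
}

impl Chain {
    // Mirrors Chain::save_block in the diff: store the block as-is so state sync can
    // finish; full validation still happens later (the TODO in chain.rs).
    fn save_block(&mut self, block: &Block) -> Result<(), ()> {
        self.saved.push(block.hash.clone());
        Ok(())
    }
}

// Mirrors the new branch in the NetworkClientMessages::Block handler: a block whose
// hash matches the current sync hash is saved directly instead of entering normal
// block processing.
fn receive_block(chain: &mut Chain, status: &SyncStatus, block: Block) {
    if let SyncStatus::StateSync(sync_hash) = status {
        if block.hash == *sync_hash {
            if chain.save_block(&block).is_err() {
                eprintln!("Failed to save a block during state sync");
            }
            return;
        }
    }
    // ... otherwise fall through to regular block processing.
}

fn main() {
    let mut chain = Chain { saved: vec![] };
    let syncing = SyncStatus::StateSync(BlockHash(42));
    receive_block(&mut chain, &syncing, Block { hash: BlockHash(42) });
    assert_eq!(chain.saved, vec![BlockHash(42)]);

    let not_syncing = SyncStatus::NoSync;
    receive_block(&mut chain, &not_syncing, Block { hash: BlockHash(7) });
    assert_eq!(chain.saved.len(), 1); // non-sync blocks fall through to normal processing
}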
diff --git a/chain/chain/src/store.rs b/chain/chain/src/store.rs index 4c6c7619669..494c839c270 100644 --- a/chain/chain/src/store.rs +++ b/chain/chain/src/store.rs @@ -452,7 +452,7 @@ pub struct ChainStoreUpdate<'a, T> { tail: Option, header_head: Option, sync_head: Option, - trie_changes: Option, + trie_changes: Vec, add_blocks_to_catchup: Vec<(CryptoHash, CryptoHash)>, remove_blocks_to_catchup: Vec, add_state_dl_infos: Vec, @@ -476,7 +476,7 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> { tail: None, header_head: None, sync_head: None, - trie_changes: None, + trie_changes: vec![], add_blocks_to_catchup: vec![], remove_blocks_to_catchup: vec![], add_state_dl_infos: vec![], @@ -785,7 +785,7 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> { } pub fn save_trie_changes(&mut self, trie_changes: WrappedTrieChanges) { - self.trie_changes = Some(trie_changes); + self.trie_changes.push(trie_changes); } pub fn add_block_to_catchup(&mut self, prev_hash: CryptoHash, block_hash: CryptoHash) { @@ -870,7 +870,7 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> { for (hash, tx_result) in self.transaction_results.drain() { store_update.set_ser(COL_TRANSACTION_RESULT, hash.as_ref(), &tx_result)?; } - if let Some(trie_changes) = self.trie_changes { + for trie_changes in self.trie_changes { trie_changes .insertions_into(&mut store_update) .map_err(|err| ErrorKind::Other(err.to_string()))?; diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs index 8ad6dc00831..d191821e64e 100644 --- a/chain/chain/src/test_utils.rs +++ b/chain/chain/src/test_utils.rs @@ -25,7 +25,10 @@ use near_store::{Store, StoreUpdate, Trie, TrieChanges, WrappedTrieChanges, COL_ use crate::error::{Error, ErrorKind}; use crate::store::ChainStoreAccess; -use crate::types::{ApplyTransactionResult, BlockHeader, RuntimeAdapter, Weight}; +use crate::types::{ + ApplyTransactionResult, BlockHeader, RuntimeAdapter, ValidatorSignatureVerificationResult, + Weight, +}; use crate::{Chain, ChainGenesis, ValidTransaction}; use std::cmp::Ordering; @@ -218,16 +221,20 @@ impl RuntimeAdapter for KeyValueRuntime { account_id: &AccountId, data: &[u8], signature: &Signature, - ) -> bool { + ) -> ValidatorSignatureVerificationResult { if let Some(validator) = self .validators .iter() .flatten() .find(|&validator_stake| &validator_stake.account_id == account_id) { - verify(data, signature, &validator.public_key) + if verify(data, signature, &validator.public_key) { + ValidatorSignatureVerificationResult::Valid + } else { + ValidatorSignatureVerificationResult::Invalid + } } else { - false + ValidatorSignatureVerificationResult::UnknownEpoch } } diff --git a/chain/chain/src/types.rs b/chain/chain/src/types.rs index 7ca03f0196a..65dcec964bd 100644 --- a/chain/chain/src/types.rs +++ b/chain/chain/src/types.rs @@ -63,6 +63,13 @@ pub enum ShardFullChunkOrOnePart<'a> { NoChunk, } +#[derive(Eq, PartialEq)] +pub enum ValidatorSignatureVerificationResult { + Valid, + Invalid, + UnknownEpoch, +} + pub struct ApplyTransactionResult { pub trie_changes: WrappedTrieChanges, pub new_root: MerkleHash, @@ -102,7 +109,7 @@ pub trait RuntimeAdapter: Send + Sync { account_id: &AccountId, data: &[u8], signature: &Signature, - ) -> bool; + ) -> ValidatorSignatureVerificationResult; /// Verify chunk header signature. 
fn verify_chunk_header_signature(&self, header: &ShardChunkHeader) -> Result; diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index f7630adc9e4..c8e1726c453 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -46,8 +46,15 @@ struct EconConfig { gas_price_adjustment_rate: u8, max_inflation_rate: u8, } +use near_chain::types::ValidatorSignatureVerificationResult; use std::cmp::max; +enum AccountAnnounceVerificationResult { + Valid, + UnknownEpoch, + Invalid(ReasonForBan), +} + pub struct ClientActor { config: ClientConfig, sync_status: SyncStatus, @@ -157,55 +164,59 @@ impl ClientActor { fn check_signature_account_announce( &self, announce_account: &AnnounceAccount, - ) -> Result<(), ReasonForBan> { + ) -> AccountAnnounceVerificationResult { // Check header is correct. let header_hash = announce_account.header_hash(); let header = announce_account.header(); // hash must match announcement hash ... if header_hash != header.hash { - return Err(ReasonForBan::InvalidHash); + return AccountAnnounceVerificationResult::Invalid(ReasonForBan::InvalidHash); } // ... and signature should be valid. - if !self.runtime_adapter.verify_validator_signature( + match self.runtime_adapter.verify_validator_signature( &announce_account.epoch_id, &announce_account.account_id, header_hash.as_ref(), &header.signature, ) { - return Err(ReasonForBan::InvalidSignature); + ValidatorSignatureVerificationResult::Valid => {} + ValidatorSignatureVerificationResult::Invalid => { + return AccountAnnounceVerificationResult::Invalid(ReasonForBan::InvalidSignature); + } + ValidatorSignatureVerificationResult::UnknownEpoch => { + return AccountAnnounceVerificationResult::UnknownEpoch; + } } // Check intermediates hops are correct. // Skip first element (header) - announce_account - .route - .iter() - .skip(1) - .fold(Ok(header_hash), |previous_hash, hop| { - // Folding function will return None if at least one hop checking fail, - // otherwise it will return hash from last hop. - if let Ok(previous_hash) = previous_hash { - let AnnounceAccountRoute { peer_id, hash: current_hash, signature } = hop; - - let real_current_hash = - &hash([previous_hash.as_ref(), peer_id.as_ref()].concat().as_slice()); - - if real_current_hash != current_hash { - return Err(ReasonForBan::InvalidHash); - } + match announce_account.route.iter().skip(1).fold(Ok(header_hash), |previous_hash, hop| { + // Folding function will return None if at least one hop checking fail, + // otherwise it will return hash from last hop. 
+ if let Ok(previous_hash) = previous_hash { + let AnnounceAccountRoute { peer_id, hash: current_hash, signature } = hop; - if verify(current_hash.as_ref(), signature, &peer_id.public_key()) { - Ok(previous_hash) - } else { - Err(ReasonForBan::InvalidSignature) - } + let real_current_hash = + &hash([previous_hash.as_ref(), peer_id.as_ref()].concat().as_slice()); + + if real_current_hash != current_hash { + return Err(ReasonForBan::InvalidHash); + } + + if verify(current_hash.as_ref(), signature, &peer_id.public_key()) { + Ok(current_hash.clone()) } else { - previous_hash + return Err(ReasonForBan::InvalidSignature); } - }) - .map(|_hash| ()) + } else { + previous_hash + } + }) { + Ok(_) => AccountAnnounceVerificationResult::Valid, + Err(reason_for_ban) => AccountAnnounceVerificationResult::Invalid(reason_for_ban), + } } } @@ -256,6 +267,14 @@ impl Handler for ClientActor { self.receive_header(header, peer_id) } NetworkClientMessages::Block(block, peer_id, was_requested) => { + if let SyncStatus::StateSync(sync_hash, _) = &mut self.sync_status { + if block.hash() == *sync_hash { + if let Err(_) = self.chain.save_block(&block) { + error!(target: "client", "Failed to save a block during state sync"); + } + return NetworkClientResponses::NoResponse; + } + } self.receive_block(ctx, block, peer_id, was_requested) } NetworkClientMessages::BlockRequest(hash) => { @@ -309,7 +328,6 @@ impl Handler for ClientActor { prev_chunk_hash, prev_chunk_extra, payload, - block, outgoing_receipts, incoming_receipts, )) = self.chain.state_request(shard_id, hash) @@ -320,7 +338,6 @@ impl Handler for ClientActor { prev_chunk_hash, prev_chunk_extra, payload, - block, outgoing_receipts, incoming_receipts, }); @@ -336,7 +353,6 @@ impl Handler for ClientActor { prev_chunk_hash, prev_chunk_extra, payload, - block, outgoing_receipts, incoming_receipts, }) => { @@ -379,7 +395,6 @@ impl Handler for ClientActor { prev_chunk_hash, prev_chunk_extra, payload, - block, outgoing_receipts, incoming_receipts, ) { @@ -454,16 +469,21 @@ impl Handler for ClientActor { } NetworkClientMessages::AnnounceAccount(announce_account) => { match self.check_signature_account_announce(&announce_account) { - Ok(_) => { + AccountAnnounceVerificationResult::Valid => { actix::spawn( self.network_actor - .send(NetworkRequests::AnnounceAccount(announce_account)) + .send(NetworkRequests::AnnounceAccount(announce_account, false)) .map_err(|e| error!(target: "client", "{}", e)) .map(|_| ()), ); NetworkClientResponses::NoResponse } - Err(ban_reason) => NetworkClientResponses::Ban { ban_reason }, + AccountAnnounceVerificationResult::Invalid(ban_reason) => { + NetworkClientResponses::Ban { ban_reason } + } + AccountAnnounceVerificationResult::UnknownEpoch => { + NetworkClientResponses::NoResponse + } } } } @@ -733,13 +753,16 @@ impl ClientActor { actix::spawn( self.network_actor - .send(NetworkRequests::AnnounceAccount(AnnounceAccount::new( - block_producer.account_id.clone(), - next_epoch_id, - self.node_id, - hash, - signature, - ))) + .send(NetworkRequests::AnnounceAccount( + AnnounceAccount::new( + block_producer.account_id.clone(), + next_epoch_id, + self.node_id, + hash, + signature, + ), + true, + )) .map_err(|e| error!(target: "client", "{:?}", e)) .map(|_| ()), ); @@ -1077,7 +1100,10 @@ impl ClientActor { println!("{:?} MADE block", self.block_producer.as_ref().unwrap().account_id); near_chain::test_utils::display_chain(&mut self.chain); - assert!(ret.is_ok(), format!("{:?}", ret)); + assert!( + ret.is_ok(), + format!("{:?}, me: {:?}", ret, 
self.block_producer.clone().map(|x| x.account_id)) + ); ret.map_err(|err| err.into()) } @@ -1370,7 +1396,9 @@ impl ClientActor { state_sync_info.shards.iter().map(|tuple| tuple.0).collect(), )? { StateSyncResult::Unchanged => {} - StateSyncResult::Changed => {} + StateSyncResult::Changed(fetch_block) => { + assert!(!fetch_block); + } StateSyncResult::Completed => { let accepted_blocks = Arc::new(RwLock::new(vec![])); let blocks_missing_chunks = Arc::new(RwLock::new(vec![])); @@ -1491,17 +1519,31 @@ impl ClientActor { }; let me = &self.block_producer.as_ref().map(|x| x.account_id.clone()); + let shards_to_sync = (0..self.runtime_adapter.num_shards()) + .filter(|x| match me { + Some(me) => { + self.shards_mgr.cares_about_shard_this_or_next_epoch(&me, sync_hash, *x) + } + None => false, + }) + .collect(); match unwrap_or_run_later!(self.state_sync.run( sync_hash, &mut new_shard_sync, &mut self.chain, &self.runtime_adapter, - // TODO: add tracking shards here. - vec![0], + shards_to_sync )) { StateSyncResult::Unchanged => (), - StateSyncResult::Changed => { - self.sync_status = SyncStatus::StateSync(sync_hash, new_shard_sync) + StateSyncResult::Changed(fetch_block) => { + self.sync_status = SyncStatus::StateSync(sync_hash, new_shard_sync); + if let Some(peer_info) = + most_weight_peer(&self.network_info.most_weight_peers) + { + if fetch_block { + self.request_block_by_hash(sync_hash, peer_info.peer_info.id); + } + } } StateSyncResult::Completed => { info!(target: "sync", "State sync: all shards are done"); diff --git a/chain/client/src/sync.rs b/chain/client/src/sync.rs index c6a768b8d3a..236c62df986 100644 --- a/chain/client/src/sync.rs +++ b/chain/client/src/sync.rs @@ -400,7 +400,8 @@ pub enum StateSyncResult { /// No shard has changed its status Unchanged, /// At least one shard has changed its status - Changed, + /// Boolean parameter specifies whether the client needs to start fetching the block + Changed(bool), /// The state for all shards was downloaded Completed, } @@ -410,11 +411,16 @@ pub struct StateSync { network_recipient: Recipient, prev_state_sync: HashMap>, + last_time_block_requested: Option>, } impl StateSync { pub fn new(network_recipient: Recipient) -> Self { - StateSync { network_recipient, prev_state_sync: Default::default() } + StateSync { + network_recipient, + prev_state_sync: Default::default(), + last_time_block_requested: None, + } } pub fn run( @@ -429,11 +435,32 @@ impl StateSync { debug!("MOO run state sync tracking shards: {:?}", tracking_shards); + let now = Utc::now(); + let (request_block, have_block) = if !chain.block_exists(&sync_hash)? 
{ + match self.last_time_block_requested { + None => (true, false), + Some(last_time) => { + error!(target: "sync", "State sync: block request for {} timed out in {} seconds", sync_hash, STATE_SYNC_TIMEOUT); + (now - last_time >= Duration::seconds(STATE_SYNC_TIMEOUT), false) + } + } + } else { + (false, true) + }; + + if request_block { + self.last_time_block_requested = Some(now); + } + if tracking_shards.is_empty() { // This case is possible if a validator cares about the same shards in the new epoch as // in the previous (or about a subset of them), return success right away - return Ok(StateSyncResult::Completed); + return if !have_block { + Ok(StateSyncResult::Changed(request_block)) + } else { + Ok(StateSyncResult::Completed) + }; } // Check for errors @@ -449,13 +476,16 @@ impl StateSync { } } + if !have_block { + all_done = false + }; + if all_done { self.prev_state_sync.clear(); debug!("MOO omg it's completed"); return Ok(StateSyncResult::Completed); } - let now = Utc::now(); let mut update_sync_status = false; for shard_id in tracking_shards { let (go, download_timeout) = match self.prev_state_sync.get(&shard_id) { @@ -470,7 +500,7 @@ impl StateSync { }; if download_timeout { - error!(target: "sync", "State sync: state download for shard {} timed out in {} minutes", shard_id, STATE_SYNC_TIMEOUT); + error!(target: "sync", "State sync: state download for shard {} timed out in {} seconds", shard_id, STATE_SYNC_TIMEOUT); } if go || download_timeout { @@ -501,7 +531,11 @@ impl StateSync { update_sync_status = true; } } - Ok(if update_sync_status { StateSyncResult::Changed } else { StateSyncResult::Unchanged }) + Ok(if update_sync_status || request_block { + StateSyncResult::Changed(request_block) + } else { + StateSyncResult::Unchanged + }) } fn request_state( diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs index 1a6d7ad5092..9ca22bd0fb1 100644 --- a/chain/client/src/test_utils.rs +++ b/chain/client/src/test_utils.rs @@ -260,7 +260,7 @@ pub fn setup_mock_all_validators( } } } - NetworkRequests::AnnounceAccount(announce_account) => { + NetworkRequests::AnnounceAccount(announce_account, _force) => { let mut aa = announced_accounts1.write().unwrap(); let key = (announce_account.account_id.clone(), announce_account.epoch_id.clone()); if aa.get(&key).is_none() { diff --git a/chain/network/src/peer_manager.rs b/chain/network/src/peer_manager.rs index a2b0273a7cc..ad6c581815b 100644 --- a/chain/network/src/peer_manager.rs +++ b/chain/network/src/peer_manager.rs @@ -189,7 +189,7 @@ impl PeerManagerActor { } fn ban_peer(&mut self, peer_id: &PeerId, ban_reason: ReasonForBan) { - info!(target: "network", "Banning peer {:?}", peer_id); + info!(target: "network", "Banning peer {:?} for {:?}", peer_id, ban_reason); self.active_peers.remove(&peer_id); unwrap_or_error!(self.peer_store.peer_ban(peer_id, ban_reason), "Failed to save peer data"); } @@ -384,11 +384,17 @@ impl PeerManagerActor { .spawn(ctx); } - fn announce_account(&mut self, ctx: &mut Context, mut announce_account: AnnounceAccount) { + fn announce_account( + &mut self, + ctx: &mut Context, + mut announce_account: AnnounceAccount, + force: bool, + ) { // If this is a new account send an announcement to random set of peers. 
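// Note on the new `force` flag used below: `force` makes the node re-broadcast an
// announcement even when routing_table.update() reports it as already known.
// client.rs passes `true` only for the node's own announcement and `false` when
// relaying one received from the network, and the `assert!(!force)` in the
// "not sent by this peer" branch documents that a forced announcement must
// originate locally, so it is never extend()-ed with this node's key.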
- if self.routing_table.update(&announce_account).is_new() { - if announce_account.header().peer_id != self.peer_id { + if self.routing_table.update(&announce_account).is_new() || force { + if announce_account.peer_id_sender() != self.peer_id { // If this announcement was not sent by this peer, add peer information + assert!(!force); announce_account.extend(self.peer_id, &self.config.secret_key); } @@ -533,8 +539,8 @@ impl Handler for PeerManagerActor { self.ban_peer(&peer_id, ban_reason); NetworkResponses::NoResponse } - NetworkRequests::AnnounceAccount(announce_account) => { - self.announce_account(ctx, announce_account); + NetworkRequests::AnnounceAccount(announce_account, force) => { + self.announce_account(ctx, announce_account, force); NetworkResponses::NoResponse } NetworkRequests::ChunkPartRequest { account_id, part_request } => { diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 56c40f4f75b..d68e5db7286 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -531,7 +531,6 @@ impl TryFrom for PeerMessage { gas_limit: state_response.prev_gas_limit.try_into()?, }, payload: state_response.payload, - block: proto_to_type(state_response.block)?, outgoing_receipts: ( outgoing_receipts_proto.hash.try_into()?, outgoing_receipts_proto @@ -681,7 +680,6 @@ impl From for network_proto::PeerMessage { prev_chunk_hash, prev_chunk_extra, payload, - block, outgoing_receipts, incoming_receipts, }) => { @@ -699,7 +697,6 @@ impl From for network_proto::PeerMessage { prev_gas_used: prev_chunk_extra.gas_used.into(), prev_gas_limit: prev_chunk_extra.gas_limit.into(), payload, - block: SingularPtrField::some(block.into()), outgoing_receipts: SingularPtrField::some( network_proto::StateResponseReceipts { hash: outgoing_receipts.0.into(), @@ -962,7 +959,7 @@ pub enum NetworkRequests { /// Ban given peer. 
BanPeer { peer_id: PeerId, ban_reason: ReasonForBan }, /// Announce account - AnnounceAccount(AnnounceAccount), + AnnounceAccount(AnnounceAccount, bool), /// Request chunk part ChunkPartRequest { account_id: AccountId, part_request: ChunkPartRequestMsg }, @@ -1023,7 +1020,6 @@ pub struct StateResponseInfo { pub prev_chunk_hash: ChunkHash, pub prev_chunk_extra: ChunkExtra, pub payload: Vec, - pub block: Block, pub outgoing_receipts: (CryptoHash, Vec), pub incoming_receipts: Vec<(CryptoHash, Vec)>, } diff --git a/core/protos/protos/network.proto b/core/protos/protos/network.proto index 9c0dc8f5950..5b3d9965234 100644 --- a/core/protos/protos/network.proto +++ b/core/protos/protos/network.proto @@ -62,7 +62,6 @@ message StateResponse { uint64 prev_gas_limit = 6; repeated ValidatorStake validator_proposals = 7; bytes payload = 8; - Block block = 9; StateResponseReceipts outgoing_receipts = 10; repeated StateResponseReceipts incoming_receipts = 11; } diff --git a/near/src/config.rs b/near/src/config.rs index 548b4923cc6..f7d0e409bd1 100644 --- a/near/src/config.rs +++ b/near/src/config.rs @@ -13,6 +13,7 @@ use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; use serde_derive::{Deserialize, Serialize}; +use crate::runtime::account_id_to_shard_id; use near_client::BlockProducer; use near_client::ClientConfig; use near_jsonrpc::RpcConfig; @@ -24,7 +25,7 @@ use near_primitives::crypto::signer::{EDSigner, InMemorySigner, KeyFile}; use near_primitives::hash::hash; use near_primitives::serialize::{to_base64, u128_dec_format}; use near_primitives::types::{ - AccountId, Balance, BlockIndex, GasUsage, ReadablePublicKey, ValidatorId, + AccountId, Balance, BlockIndex, GasUsage, ReadablePublicKey, ShardId, ValidatorId, }; use near_telemetry::TelemetryConfig; use node_runtime::config::RuntimeConfig; @@ -375,9 +376,14 @@ fn get_initial_supply(records: &[Vec]) -> Balance { } impl GenesisConfig { - pub fn legacy_test(seeds: Vec<&str>, num_validators: usize) -> Self { + pub fn legacy_test( + seeds: Vec<&str>, + num_validators: usize, + validators_per_shard: Vec, + ) -> Self { let mut validators = vec![]; - let mut records = vec![vec![]]; + let mut records = validators_per_shard.iter().map(|_| vec![]).collect::>(); + let num_shards = validators_per_shard.len() as ShardId; let default_test_contract = include_bytes!("../../runtime/wasm/runtest/res/wasm_with_mem.wasm").as_ref(); let encoded_test_contract = to_base64(default_test_contract); @@ -391,7 +397,8 @@ impl GenesisConfig { amount: TESTING_INIT_STAKE, }); } - records[0].push(StateRecord::Account { + let shard_id = account_id_to_shard_id(&account.to_string(), num_shards) as usize; + records[shard_id].push(StateRecord::Account { account_id: account.to_string(), account: Account { nonce: 0, @@ -404,7 +411,7 @@ impl GenesisConfig { storage_paid_at: 0, }, }); - records[0].push(StateRecord::Contract { + records[shard_id].push(StateRecord::Contract { account_id: account.to_string(), code: encoded_test_contract.clone(), }); @@ -423,8 +430,8 @@ impl GenesisConfig { genesis_time: Utc::now(), chain_id: random_chain_id(), num_block_producers: num_validators, - block_producers_per_shard: vec![num_validators], - avg_fisherman_per_shard: vec![0], + block_producers_per_shard: validators_per_shard.clone(), + avg_fisherman_per_shard: validators_per_shard.iter().map(|_| 0).collect(), dynamic_resharding: false, epoch_length: FAST_EPOCH_LENGTH, gas_limit: INITIAL_GAS_LIMIT, @@ -445,7 +452,12 @@ impl GenesisConfig { pub fn test(seeds: Vec<&str>) -> Self { let 
num_validators = seeds.len(); - Self::legacy_test(seeds, num_validators) + Self::legacy_test(seeds, num_validators, vec![num_validators]) + } + + pub fn test_sharded(seeds: Vec<&str>, validators_per_shard: Vec) -> Self { + let num_validators = seeds.len(); + Self::legacy_test(seeds, num_validators, validators_per_shard) } pub fn testing_spec(num_accounts: usize, num_validators: usize) -> Self { diff --git a/near/src/runtime.rs b/near/src/runtime.rs index 3346c69cb3e..b3f1695ab2f 100644 --- a/near/src/runtime.rs +++ b/near/src/runtime.rs @@ -8,7 +8,7 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use kvdb::DBValue; use log::{debug, info}; -use near_chain::types::ApplyTransactionResult; +use near_chain::types::{ApplyTransactionResult, ValidatorSignatureVerificationResult}; use near_chain::{BlockHeader, Error, ErrorKind, RuntimeAdapter, ValidTransaction, Weight}; use near_epoch_manager::{BlockInfo, EpochConfig, EpochManager, RewardCalculator}; use near_primitives::account::{AccessKey, Account}; @@ -115,11 +115,11 @@ impl NightshadeRuntime { account.stake += *reward; } - assert!(account.stake >= max_of_stakes, "FATAL: staking invariance does not hold"); println!( "account {} stake {} max_of_stakes: {}", account_id, account.stake, max_of_stakes ); + assert!(account.stake >= max_of_stakes, "FATAL: staking invariance does not hold"); let return_stake = account.stake - max_of_stakes; account.stake -= return_stake; account.amount += return_stake; @@ -142,6 +142,11 @@ impl NightshadeRuntime { } } +pub fn account_id_to_shard_id(account_id: &AccountId, num_shards: ShardId) -> ShardId { + let mut cursor = Cursor::new((hash(&account_id.clone().into_bytes()).0).0); + cursor.read_u64::().expect("Must not happened") % (num_shards) +} + impl RuntimeAdapter for NightshadeRuntime { fn genesis_state(&self) -> (StoreUpdate, Vec) { let mut store_update = self.store.store_update(); @@ -194,13 +199,18 @@ impl RuntimeAdapter for NightshadeRuntime { account_id: &AccountId, data: &[u8], signature: &Signature, - ) -> bool { + ) -> ValidatorSignatureVerificationResult { let mut epoch_manager = self.epoch_manager.write().expect(POISONED_LOCK_ERR); if let Ok(Some(validator)) = epoch_manager.get_validator_by_account_id(epoch_id, account_id) { - return verify(data, signature, &validator.public_key); + if verify(data, signature, &validator.public_key) { + ValidatorSignatureVerificationResult::Valid + } else { + ValidatorSignatureVerificationResult::Invalid + } + } else { + ValidatorSignatureVerificationResult::UnknownEpoch } - false } fn verify_chunk_header_signature(&self, header: &ShardChunkHeader) -> Result { @@ -277,8 +287,7 @@ impl RuntimeAdapter for NightshadeRuntime { } fn account_id_to_shard_id(&self, account_id: &AccountId) -> ShardId { - let mut cursor = Cursor::new((hash(&account_id.clone().into_bytes()).0).0); - cursor.read_u64::().expect("Must not happened") % (self.num_shards()) + account_id_to_shard_id(account_id, self.num_shards()) } fn get_part_owner(&self, parent_hash: &CryptoHash, part_id: u64) -> Result { diff --git a/near/tests/sync_state_nodes.rs b/near/tests/sync_state_nodes.rs index 01fa58125a7..825b6e32162 100644 --- a/near/tests/sync_state_nodes.rs +++ b/near/tests/sync_state_nodes.rs @@ -8,6 +8,7 @@ use near::{load_test_config, start_with_config, GenesisConfig}; use near_client::GetBlock; use near_network::test_utils::{convert_boot_nodes, open_port, WaitOrTimeout}; use near_primitives::test_utils::{heavy_test, init_test_logger}; +use std::time::Duration; /// One client is in front, 
another must sync to it using state (fast) sync. #[test] @@ -86,3 +87,118 @@ fn sync_state_nodes() { system.run().unwrap(); }); } + +/// One client is in front, another must sync to it using state (fast) sync. +#[test] +fn sync_state_nodes_multishard() { + heavy_test(|| { + init_test_logger(); + + let genesis_config = + GenesisConfig::test_sharded(vec!["test1", "test2", "test3", "test4"], vec![2, 2]); + + let system = System::new("NEAR"); + + let (port1, port2, port3, port4) = (open_port(), open_port(), open_port(), open_port()); + + let mut near1 = load_test_config("test1", port1, &genesis_config); + near1.network_config.boot_nodes = + convert_boot_nodes(vec![("test3", port3), ("test4", port4)]); + near1.client_config.min_num_peers = 2; + near1.client_config.min_block_production_delay = Duration::from_millis(200); + near1.client_config.max_block_production_delay = Duration::from_millis(400); + + let mut near3 = load_test_config("test3", port3, &genesis_config); + near3.network_config.boot_nodes = + convert_boot_nodes(vec![("test1", port1), ("test4", port4)]); + near3.client_config.min_num_peers = 2; + near3.client_config.min_block_production_delay = + near1.client_config.min_block_production_delay; + near3.client_config.max_block_production_delay = + near1.client_config.max_block_production_delay; + + let mut near4 = load_test_config("test4", port4, &genesis_config); + near4.network_config.boot_nodes = + convert_boot_nodes(vec![("test1", port1), ("test3", port3)]); + near4.client_config.min_num_peers = 2; + near4.client_config.min_block_production_delay = + near1.client_config.min_block_production_delay; + near4.client_config.max_block_production_delay = + near1.client_config.max_block_production_delay; + + let dir1 = TempDir::new("sync_nodes_1").unwrap(); + let (_, view_client1) = start_with_config(dir1.path(), near1); + + let dir3 = TempDir::new("sync_nodes_3").unwrap(); + let (_, _) = start_with_config(dir3.path(), near3); + + let dir4 = TempDir::new("sync_nodes_4").unwrap(); + let (_, _) = start_with_config(dir4.path(), near4); + + let view_client2_holder = Arc::new(RwLock::new(None)); + + WaitOrTimeout::new( + Box::new(move |_ctx| { + if view_client2_holder.read().unwrap().is_none() { + let view_client2_holder2 = view_client2_holder.clone(); + let genesis_config2 = genesis_config.clone(); + + actix::spawn(view_client1.send(GetBlock::Best).then(move |res| { + match &res { + Ok(Ok(b)) if b.header.height >= 101 => { + let mut view_client2_holder2 = + view_client2_holder2.write().unwrap(); + + if view_client2_holder2.is_none() { + let mut near2 = + load_test_config("test2", port2, &genesis_config2); + near2.client_config.skip_sync_wait = false; + near2.client_config.min_num_peers = 3; + near2.client_config.min_block_production_delay = + Duration::from_millis(200); + near2.client_config.max_block_production_delay = + Duration::from_millis(400); + near2.network_config.boot_nodes = convert_boot_nodes(vec![ + ("test1", port1), + ("test3", port3), + ("test4", port4), + ]); + + let dir2 = TempDir::new("sync_nodes_2").unwrap(); + let (_, view_client2) = start_with_config(dir2.path(), near2); + *view_client2_holder2 = Some(view_client2); + } + } + Ok(Ok(b)) if b.header.height < 101 => { + println!("FIRST STAGE {}", b.header.height) + } + Err(_) => return futures::future::err(()), + _ => {} + }; + futures::future::ok(()) + })); + } + + if let Some(view_client2) = &*view_client2_holder.write().unwrap() { + actix::spawn(view_client2.send(GetBlock::Best).then(|res| { + match &res { + Ok(Ok(b)) if 
b.header.height >= 101 => System::current().stop(), + Ok(Ok(b)) if b.header.height < 101 => { + println!("SECOND STAGE {}", b.header.height) + } + Err(_) => return futures::future::err(()), + _ => {} + }; + futures::future::ok(()) + })); + } else { + } + }), + 100, + 60000, + ) + .start(); + + system.run().unwrap(); + }); +} diff --git a/tests/test_cases_runtime.rs b/tests/test_cases_runtime.rs index 464545e8492..4f51c655667 100644 --- a/tests/test_cases_runtime.rs +++ b/tests/test_cases_runtime.rs @@ -1,28 +1,34 @@ #[cfg(test)] mod test { use near::GenesisConfig; + use near_primitives::serialize::to_base64; + use near_primitives::utils::key_for_data; + use node_runtime::StateRecord; use testlib::node::RuntimeNode; use testlib::runtime_utils::{alice_account, bob_account}; use testlib::standard_test_cases::*; - use node_runtime::StateRecord; - use near_primitives::utils::key_for_data; - use near_primitives::serialize::to_base64; fn create_runtime_node() -> RuntimeNode { RuntimeNode::new(&alice_account()) } fn create_runtime_with_expensive_storage() -> RuntimeNode { - let mut genesis_config = - GenesisConfig::legacy_test(vec![&alice_account(), &bob_account(), "carol.near"], 1); + let mut genesis_config = GenesisConfig::legacy_test( + vec![&alice_account(), &bob_account(), "carol.near"], + 1, + vec![1], + ); // Set expensive state rent and add alice more money. genesis_config.runtime_config.storage_cost_byte_per_block = 100_000_000_000_000; genesis_config.runtime_config.poke_threshold = 10; match &mut genesis_config.records[0][0] { - StateRecord::Account { account, .. } => { account.amount = 10_000_000_000_000_000_000 }, + StateRecord::Account { account, .. } => account.amount = 10_000_000_000_000_000_000, _ => {} } - genesis_config.records[0].push(StateRecord::Data { key: to_base64(&key_for_data(&bob_account(), b"test")), value: to_base64(b"123") }); + genesis_config.records[0].push(StateRecord::Data { + key: to_base64(&key_for_data(&bob_account(), b"test")), + value: to_base64(b"123"), + }); RuntimeNode::new_from_genesis(&alice_account(), genesis_config) } diff --git a/tests/test_errors.rs b/tests/test_errors.rs index 4f91b4ce4b1..788fc8149f0 100644 --- a/tests/test_errors.rs +++ b/tests/test_errors.rs @@ -8,7 +8,7 @@ use testlib::node::{Node, ThreadNode}; fn start_node() -> ThreadNode { init_integration_logger(); - let genesis_config = GenesisConfig::legacy_test(vec!["alice.near", "bob.near"], 1); + let genesis_config = GenesisConfig::legacy_test(vec!["alice.near", "bob.near"], 1, vec![1]); let mut near_config = load_test_config("alice.near", open_port(), &genesis_config); near_config.client_config.skip_sync_wait = true;
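The config.rs and runtime.rs changes distribute genesis records across shards with the same mapping the runtime already uses at run time: hash the account id, read the first eight bytes of the digest as a little-endian u64, and take it modulo the number of shards. A minimal sketch of that mapping and of the per-shard record buckets in legacy_test follows; it uses std's DefaultHasher as a stand-in for the sha256-based near_primitives::hash, so the shard assignments it prints will not match the real ones.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

type ShardId = u64;

// Stand-in for the sha256-based hash in runtime.rs, which reads the first 8 bytes of
// the digest as a little-endian u64; DefaultHasher already yields a u64 directly.
fn account_id_to_shard_id(account_id: &str, num_shards: ShardId) -> ShardId {
    let mut hasher = DefaultHasher::new();
    account_id.hash(&mut hasher);
    hasher.finish() % num_shards
}

fn main() {
    let num_shards: ShardId = 2;
    // legacy_test now keeps one Vec of state records per shard and pushes each
    // account's records into the bucket chosen by this mapping, so a node that only
    // tracks one shard can build its genesis state from that bucket alone.
    let mut records: Vec<Vec<String>> = (0..num_shards).map(|_| vec![]).collect();
    for account in ["test1", "test2", "test3", "test4"] {
        let shard = account_id_to_shard_id(account, num_shards) as usize;
        records[shard].push(format!("Account record for {}", account));
    }
    for (shard, bucket) in records.iter().enumerate() {
        println!("shard {}: {} records", shard, bucket.len());
    }
}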
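Finally, because the state response no longer carries the sync block, StateSync::run itself now decides when the client has to fetch it, and StateSyncResult::Changed carries a flag saying so. The sketch below condenses that gating under stated assumptions: BlockFetchGate is a hypothetical wrapper, the timeout value is illustrative rather than the real STATE_SYNC_TIMEOUT, and the per-shard download tracking is collapsed into a single shards_all_done flag.

use std::time::{Duration, Instant};

const STATE_SYNC_TIMEOUT_SECS: u64 = 10; // illustrative value, not the real constant

enum StateSyncResult {
    Unchanged,
    /// The flag tells the caller whether it still needs to request the sync block.
    Changed(bool),
    Completed,
}

struct BlockFetchGate {
    last_time_block_requested: Option<Instant>,
}

impl BlockFetchGate {
    /// Decide whether the sync block must be (re-)requested from a peer.
    /// `have_block` mirrors `chain.block_exists(&sync_hash)` in the diff.
    fn decide(&mut self, have_block: bool, shards_all_done: bool) -> StateSyncResult {
        let request_block = if have_block {
            false
        } else {
            match self.last_time_block_requested {
                None => true,
                Some(t) => t.elapsed() >= Duration::from_secs(STATE_SYNC_TIMEOUT_SECS),
            }
        };
        if request_block {
            self.last_time_block_requested = Some(Instant::now());
        }
        if have_block && shards_all_done {
            StateSyncResult::Completed
        } else if request_block {
            // The client reacts to Changed(true) by asking the most-weight peer
            // for the block by hash.
            StateSyncResult::Changed(true)
        } else {
            StateSyncResult::Unchanged
        }
    }
}

fn main() {
    let mut gate = BlockFetchGate { last_time_block_requested: None };
    assert!(matches!(gate.decide(false, false), StateSyncResult::Changed(true)));
    // A second call right away does not re-request before the timeout elapses.
    assert!(matches!(gate.decide(false, false), StateSyncResult::Unchanged));
    assert!(matches!(gate.decide(true, true), StateSyncResult::Completed));
}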