diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index df901512da7..792d22c6dd8 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -777,11 +777,10 @@ impl<'a> ChainUpdate<'a> { self.chain_store_update.save_post_state_root(&chunk_hash, &state_root); // Save resulting receipts. for (receipt_shard_id, receipts) in new_receipts.drain() { - self.chain_store_update.save_receipt( - &block.hash(), - receipt_shard_id, - receipts, - ); + // The receipts in store are indexed by the SOURCE shard_id, not destination, + // since they are later retrieved by the chunk producer of the source + // shard to be distributed to the recipients. + self.chain_store_update.save_receipt(&block.hash(), shard_id, receipts); } // Save receipt and transaction results. for (i, tx_result) in tx_results.drain(..).enumerate() { diff --git a/chain/chain/src/test_utils.rs b/chain/chain/src/test_utils.rs index 13ddfe6748c..c5614a03e9e 100644 --- a/chain/chain/src/test_utils.rs +++ b/chain/chain/src/test_utils.rs @@ -1,15 +1,16 @@ use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use chrono::Utc; +use log::info; use near_primitives::crypto::signature::{PublicKey, Signature}; use near_primitives::crypto::signer::InMemorySigner; use near_primitives::hash::{hash, CryptoHash}; use near_primitives::rpc::{AccountViewCallResult, QueryResponse}; use near_primitives::test_utils::get_public_key_from_seed; use near_primitives::transaction::{ - ReceiptTransaction, SignedTransaction, TransactionResult, TransactionStatus, + AsyncCall, ReceiptTransaction, SignedTransaction, TransactionResult, TransactionStatus, }; use near_primitives::types::{AccountId, BlockIndex, MerkleHash, ShardId}; use near_store::test_utils::create_test_store; @@ -18,7 +19,10 @@ use near_store::{Store, StoreUpdate, Trie, TrieChanges, WrappedTrieChanges}; use crate::error::{Error, ErrorKind}; use crate::types::{BlockHeader, ReceiptResult, RuntimeAdapter, Weight}; use crate::{Chain, ValidTransaction}; +use near_primitives::merkle::merklize; use near_primitives::sharding::ShardChunkHeader; +use near_primitives::transaction::ReceiptBody::NewCall; +use near_primitives::transaction::TransactionBody::SendMoney; /// Simple key value runtime for tests. pub struct KeyValueRuntime { @@ -27,6 +31,13 @@ pub struct KeyValueRuntime { root: MerkleHash, validators: Vec<(AccountId, PublicKey)>, validators_per_shard: u64, + + // A mapping state_root => {account id => amounts}, for transactions and receipts + amounts: RwLock>>, +} + +pub fn account_id_to_shard_id(account_id: &AccountId, num_shards: ShardId) -> ShardId { + ((hash(&account_id.clone().into_bytes()).0).0[0] as u64) % num_shards } impl KeyValueRuntime { @@ -40,6 +51,13 @@ impl KeyValueRuntime { validators_per_shard: u64, ) -> Self { let trie = Arc::new(Trie::new(store.clone())); + let mut initial_amounts = HashMap::new(); + for (i, validator) in validators.iter().enumerate() { + initial_amounts.insert(validator.clone(), (1000 + 100 * i) as u128); + } + + let mut amounts = HashMap::new(); + amounts.insert(MerkleHash::default(), initial_amounts); KeyValueRuntime { store, trie, @@ -49,6 +67,7 @@ impl KeyValueRuntime { .map(|account_id| (account_id.clone(), get_public_key_from_seed(account_id))) .collect(), validators_per_shard, + amounts: RwLock::new(amounts), } } @@ -98,7 +117,7 @@ impl RuntimeAdapter for KeyValueRuntime { height: BlockIndex, shard_id: ShardId, ) -> Result> { - assert!((self.validators.len() as u64) % self.num_shards() == 0); + assert_eq!((self.validators.len() as u64) % self.num_shards(), 0); let validators_per_shard = self.validators_per_shard; let offset = (shard_id / validators_per_shard * validators_per_shard) as usize; // The +1 is so that if all validators validate all shards in a test, the chunk producer @@ -128,7 +147,7 @@ impl RuntimeAdapter for KeyValueRuntime { } fn account_id_to_shard_id(&self, account_id: &AccountId) -> ShardId { - ((hash(&account_id.clone().into_bytes()).0).0[0] as u64) % self.num_shards() + account_id_to_shard_id(account_id, self.num_shards()) } fn get_part_owner( @@ -145,7 +164,7 @@ impl RuntimeAdapter for KeyValueRuntime { _height: BlockIndex, shard_id: ShardId, ) -> bool { - assert!((self.validators.len() as u64) % self.num_shards() == 0); + assert_eq!((self.validators.len() as u64) % self.num_shards(), 0); let validators_per_shard = self.validators_per_shard; let offset = (shard_id / validators_per_shard * validators_per_shard) as usize; for validator in self.validators[offset..offset + (validators_per_shard as usize)].iter() { @@ -167,44 +186,151 @@ impl RuntimeAdapter for KeyValueRuntime { fn apply_transactions( &self, - _shard_id: ShardId, + shard_id: ShardId, state_root: &MerkleHash, _block_index: BlockIndex, _prev_block_hash: &CryptoHash, - _receipts: &Vec, + receipts: &Vec, transactions: &Vec, ) -> Result< (WrappedTrieChanges, MerkleHash, Vec, ReceiptResult), Box, > { let mut tx_results = vec![]; - for _ in transactions { - tx_results.push(TransactionResult { - status: TransactionStatus::Completed, - logs: vec![], - receipts: vec![], - result: None, - }); + + let mut accounts_mapping = + self.amounts.read().unwrap().get(state_root).cloned().unwrap_or_else(|| HashMap::new()); + + let mut balance_transfers = vec![]; + + for receipt in receipts.iter() { + if let NewCall(call) = &receipt.body { + info!( + "MOO applying receipt in shard {} from {} to {} amt {}", + shard_id, receipt.originator, receipt.receiver, call.amount + ); + assert_eq!(self.account_id_to_shard_id(&receipt.receiver), shard_id); + balance_transfers.push(( + receipt.originator.clone(), + receipt.receiver.clone(), + call.amount, + )); + } else { + unreachable!(); + } } + + for transaction in transactions { + if let SendMoney(send_money_tx) = &transaction.body { + info!( + "MOO applying transaction from {} to {} amt {}", + send_money_tx.originator, send_money_tx.receiver, send_money_tx.amount + ); + assert_eq!(self.account_id_to_shard_id(&send_money_tx.originator), shard_id); + balance_transfers.push(( + send_money_tx.originator.clone(), + send_money_tx.receiver.clone(), + send_money_tx.amount, + )); + } else { + unreachable!(); + } + } + + let mut new_receipts = HashMap::new(); + + for (from, to, amount) in balance_transfers { + let mut good_to_go = false; + + if self.account_id_to_shard_id(&from) != shard_id { + // This is a receipt, was already debited + good_to_go = true; + } else if let Some(balance) = accounts_mapping.get(&from) { + if *balance >= amount { + let new_balance = balance - amount; + accounts_mapping.insert(from.clone(), new_balance); + good_to_go = true; + } + } + + if good_to_go { + let new_receipt_hashes = if self.account_id_to_shard_id(&to) == shard_id { + accounts_mapping + .insert(to.clone(), accounts_mapping.get(&to).unwrap_or(&0) + amount); + vec![] + } else { + info!( + "MOO creating receipt from shard {} from {} to {} amt {}", + shard_id, from, to, amount + ); + let receipt = ReceiptTransaction::new( + from.clone(), + to, + CryptoHash::default(), + NewCall(AsyncCall::new(vec![], vec![], amount, from)), + ); + let receipt_hash = receipt.get_hash(); + new_receipts.entry(receipt.shard_id()).or_insert_with(|| vec![]).push(receipt); + vec![receipt_hash] + }; + + tx_results.push(TransactionResult { + status: TransactionStatus::Completed, + logs: vec![], + receipts: new_receipt_hashes, + result: None, + }); + } + } + + let mut new_balances = vec![]; + for (name, _) in self.validators.iter() { + let mut seen = false; + for (key, value) in accounts_mapping.iter() { + if key == name { + assert!(!seen); + seen = true; + new_balances.push(*value); + } + } + if !seen { + new_balances.push(0); + } + } + let (new_state_root, _) = merklize(&new_balances); + self.amounts.write().unwrap().insert(new_state_root, accounts_mapping); + + info!( + "MOO Applied transactions in shard {}, new state root {}, balances {:?}", + shard_id, new_state_root, new_balances + ); + Ok(( WrappedTrieChanges::new(self.trie.clone(), TrieChanges::empty(state_root.clone())), - *state_root, + new_state_root, tx_results, - HashMap::default(), + new_receipts, )) } fn query( &self, - _state_root: MerkleHash, + state_root: MerkleHash, _height: BlockIndex, path: Vec<&str>, _data: &[u8], ) -> Result> { + let account_id = path[1].to_string(); + let account_id2 = account_id.clone(); Ok(QueryResponse::ViewAccount(AccountViewCallResult { - account_id: path[1].to_string(), + account_id, nonce: 0, - amount: 1000, + amount: self + .amounts + .read() + .unwrap() + .get(&state_root) + .map_or_else(|| 0, |mapping| *mapping.get(&account_id2).unwrap_or(&0)), stake: 0, public_keys: vec![], code_hash: CryptoHash::default(), diff --git a/chain/client/Cargo.toml b/chain/client/Cargo.toml index e9cc1050eff..b392c731c96 100644 --- a/chain/client/Cargo.toml +++ b/chain/client/Cargo.toml @@ -21,3 +21,6 @@ near-chain = { path = "../chain" } near-network = { path = "../network" } near-pool = { path = "../pool" } near-chunks = { path = "../chunks" } + +[features] +expensive_tests = [] diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs index 5e3206c8b21..d5a91a7ac0d 100644 --- a/chain/client/src/client.rs +++ b/chain/client/src/client.rs @@ -520,7 +520,7 @@ impl ClientActor { let transactions = self.shards_mgr.prepare_transactions(shard_id, self.config.block_expected_weight)?; - debug!("Creating a chunk with {} transactions for shard {}", transactions.len(), shard_id); + info!("Creating a chunk with {} transactions for shard {}", transactions.len(), shard_id); let mut receipts = vec![]; let mut receipts_block_hash = prev_block_hash.clone(); diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs index d24bf6a08ca..265d468e88b 100644 --- a/chain/client/src/test_utils.rs +++ b/chain/client/src/test_utils.rs @@ -5,7 +5,9 @@ use actix::{Actor, Addr, AsyncContext, Context, Recipient}; use chrono::{DateTime, Utc}; use near_chain::test_utils::KeyValueRuntime; -use near_network::{NetworkRequests, NetworkResponses, PeerManagerActor}; +use near_network::{ + NetworkClientMessages, NetworkRequests, NetworkResponses, PeerInfo, PeerManagerActor, +}; use near_primitives::crypto::signer::InMemorySigner; use near_store::test_utils::create_test_store; @@ -79,23 +81,92 @@ pub fn setup_mock( /// Sets up ClientActor and ViewClientActor with mock PeerManager. pub fn setup_mock_all_validators( validators: Vec<&'static str>, + key_pairs: Vec, validators_per_shard: u64, skip_sync_wait: bool, - network_mock: Arc NetworkResponses>>, + network_mock: Arc (NetworkResponses, bool)>>, ) -> Vec<(Addr, Addr)> { let validators_clone = validators.clone(); + let key_pairs = key_pairs.clone(); let genesis_time = Utc::now(); let mut ret = vec![]; + + let connectors: Arc, Addr)>>> = + Arc::new(RwLock::new(vec![])); + + // Lock the connectors so that none of the threads spawned below access them until we overwrite + // them at the end of this function + let mut locked_connectors = connectors.write().unwrap(); + for account_id in validators { let view_client_addr = Arc::new(RwLock::new(None)); let view_client_addr1 = view_client_addr.clone(); let validators_clone1 = validators_clone.clone(); + let validators_clone2 = validators_clone.clone(); + let key_pairs = key_pairs.clone(); + let connectors1 = connectors.clone(); let network_mock1 = network_mock.clone(); let client_addr = ClientActor::create(move |ctx| { let _client_addr = ctx.address(); let pm = NetworkMock::mock(Box::new(move |msg, _ctx| { let msg = msg.downcast_ref::().unwrap(); - let resp = network_mock1.write().unwrap().deref_mut()(account_id.to_string(), msg); + let (resp, perform_default) = + network_mock1.write().unwrap().deref_mut()(account_id.to_string(), msg); + + if perform_default { + let mut my_key_pair = None; + for (i, name) in validators_clone2.iter().enumerate() { + if *name == account_id { + my_key_pair = Some(key_pairs[i].clone()); + } + } + let my_key_pair = my_key_pair.unwrap(); + + match msg { + NetworkRequests::Block { block } => { + for (client, _) in connectors1.write().unwrap().iter() { + client.do_send(NetworkClientMessages::Block( + block.clone(), + PeerInfo::random().id, + false, + )) + } + } + NetworkRequests::ChunkOnePart { account_id, header_and_part } => { + for (i, name) in validators_clone2.iter().enumerate() { + if name == account_id { + connectors1.write().unwrap()[i].0.do_send( + NetworkClientMessages::ChunkOnePart( + header_and_part.clone(), + ), + ); + } + } + } + NetworkRequests::ChunkPartRequest { account_id, part_request } => { + for (i, name) in validators_clone2.iter().enumerate() { + if name == account_id { + connectors1.write().unwrap()[i].0.do_send( + NetworkClientMessages::ChunkPartRequest( + part_request.clone(), + my_key_pair.id, + ), + ); + } + } + } + NetworkRequests::ChunkPart { peer_id, part } => { + for (i, peer_info) in key_pairs.iter().enumerate() { + if peer_info.id == *peer_id { + connectors1.write().unwrap()[i] + .0 + .do_send(NetworkClientMessages::ChunkPart(part.clone())); + } + } + } + _ => {} + }; + } Box::new(Some(resp)) })) .start(); @@ -112,6 +183,7 @@ pub fn setup_mock_all_validators( }); ret.push((client_addr, view_client_addr.clone().read().unwrap().clone().unwrap())); } + *locked_connectors = ret.clone(); ret } diff --git a/chain/client/tests/chunks_management.rs b/chain/client/tests/chunks_management.rs index adf0f53373f..14d3939565f 100644 --- a/chain/client/tests/chunks_management.rs +++ b/chain/client/tests/chunks_management.rs @@ -24,13 +24,14 @@ fn chunks_produced_and_distributed_one_val_per_shard() { chunks_produced_and_distributed_common(1); } -/// Runs block producing client and stops after network mock received two blocks. +/// Runs block producing client and stops after network mock received seven blocks +/// Confirms that the blocks form a chain (which implies the chunks are distributed). +/// Confirms that the number of messages transmitting the chunks matches the expected number. fn chunks_produced_and_distributed_common(validators_per_shard: u64) { init_test_logger(); System::run(move || { let connectors: Arc, Addr)>>> = Arc::new(RwLock::new(vec![])); - let connectors1 = connectors.clone(); let heights = Arc::new(RwLock::new(HashMap::new())); let heights1 = heights.clone(); @@ -54,17 +55,10 @@ fn chunks_produced_and_distributed_common(validators_per_shard: u64) { *connectors.write().unwrap() = setup_mock_all_validators( validators.clone(), + key_pairs.clone(), validators_per_shard, true, - Arc::new(RwLock::new(move |account_id: String, msg: &NetworkRequests| { - let mut my_key_pair = None; - for (i, name) in validators.iter().enumerate() { - if *name == account_id { - my_key_pair = Some(key_pairs[i].clone()); - } - } - let my_key_pair = my_key_pair.unwrap(); - + Arc::new(RwLock::new(move |_account_id: String, msg: &NetworkRequests| { match msg { NetworkRequests::Block { block } => { check_height(block.hash(), block.header.height); @@ -106,51 +100,19 @@ fn chunks_produced_and_distributed_common(validators_per_shard: u64) { System::current().stop(); } - - for (client, _) in connectors1.write().unwrap().iter() { - client.do_send(NetworkClientMessages::Block( - block.clone(), - PeerInfo::random().id, - false, - )) - } } - NetworkRequests::ChunkOnePart { account_id, header_and_part } => { + NetworkRequests::ChunkOnePart { account_id: _, header_and_part: _ } => { one_part_msgs += 1; - for (i, name) in validators.iter().enumerate() { - if name == account_id { - connectors1.write().unwrap()[i].0.do_send( - NetworkClientMessages::ChunkOnePart(header_and_part.clone()), - ); - } - } } - NetworkRequests::ChunkPartRequest { account_id, part_request } => { + NetworkRequests::ChunkPartRequest { account_id: _, part_request: _ } => { part_request_msgs += 1; - for (i, name) in validators.iter().enumerate() { - if name == account_id { - connectors1.write().unwrap()[i].0.do_send( - NetworkClientMessages::ChunkPartRequest( - part_request.clone(), - my_key_pair.id, - ), - ); - } - } } - NetworkRequests::ChunkPart { peer_id, part } => { + NetworkRequests::ChunkPart { peer_id: _, part: _ } => { part_msgs += 1; - for (i, peer_info) in key_pairs.iter().enumerate() { - if peer_info.id == *peer_id { - connectors1.write().unwrap()[i] - .0 - .do_send(NetworkClientMessages::ChunkPart(part.clone())); - } - } } _ => {} }; - NetworkResponses::NoResponse + (NetworkResponses::NoResponse, true) })), ); diff --git a/chain/client/tests/cross_shard_tx.rs b/chain/client/tests/cross_shard_tx.rs new file mode 100644 index 00000000000..16fdc486e6a --- /dev/null +++ b/chain/client/tests/cross_shard_tx.rs @@ -0,0 +1,285 @@ +use actix::{Addr, System}; +use futures::future; +use futures::future::Future; +use near_client::test_utils::setup_mock_all_validators; +use near_client::{ClientActor, Query, ViewClientActor}; +use near_network::{NetworkRequests, NetworkResponses, PeerInfo}; +use near_primitives::rpc::QueryResponse::ViewAccount; +use near_primitives::test_utils::init_test_logger; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, RwLock}; + +/// Tests that the KeyValueRuntime properly sets balances in genesis and makes them queriable +#[test] +fn test_keyvalue_runtime_balances() { + let validators_per_shard = 2; + let successful_queries = Arc::new(AtomicUsize::new(0)); + init_test_logger(); + System::run(move || { + let connectors: Arc, Addr)>>> = + Arc::new(RwLock::new(vec![])); + + let validators = vec!["test1", "test2", "test3", "test4"]; + let key_pairs = + vec![PeerInfo::random(), PeerInfo::random(), PeerInfo::random(), PeerInfo::random()]; + + *connectors.write().unwrap() = setup_mock_all_validators( + validators.clone(), + key_pairs.clone(), + validators_per_shard, + true, + Arc::new(RwLock::new(move |_account_id: String, _msg: &NetworkRequests| { + (NetworkResponses::NoResponse, true) + })), + ); + + let connectors_ = connectors.write().unwrap(); + for i in 0..4 { + let expected = (1000 + i * 100) as u128; + + let successful_queries2 = successful_queries.clone(); + actix::spawn( + connectors_[i] + .1 + .send(Query { path: "account/".to_owned() + validators[i], data: vec![] }) + .then(move |res| { + let query_responce = res.unwrap().unwrap(); + if let ViewAccount(view_account_result) = query_responce { + assert_eq!(view_account_result.amount, expected); + successful_queries2.fetch_add(1, Ordering::Relaxed); + if successful_queries2.load(Ordering::Relaxed) >= 4 { + System::current().stop(); + } + } + future::result(Ok(())) + }), + ); + } + + near_network::test_utils::wait_or_panic(5000); + }) + .unwrap(); +} + +#[cfg(test)] +#[cfg(feature = "expensive_tests")] +mod tests { + use actix::{Addr, MailboxError, System}; + use futures::future; + use futures::future::Future; + use near_chain::test_utils::account_id_to_shard_id; + use near_client::test_utils::setup_mock_all_validators; + use near_client::{ClientActor, Query, ViewClientActor}; + use near_network::{NetworkClientMessages, NetworkRequests, NetworkResponses, PeerInfo}; + use near_primitives::rpc::QueryResponse; + use near_primitives::rpc::QueryResponse::ViewAccount; + use near_primitives::test_utils::init_test_logger; + use near_primitives::transaction::SignedTransaction; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, RwLock}; + + fn test_cross_shard_tx_callback( + res: Result, MailboxError>, + connectors: Arc, Addr)>>>, + iteration: Arc, + validators: Vec<&'static str>, + successful_queries: Arc, + unsuccessful_queries: Arc, + balances: Arc>>, + observed_balances: Arc>>, + num_iters: usize, + ) { + let query_responce = res.unwrap().unwrap(); + if let ViewAccount(view_account_result) = query_responce { + let mut expected = 0; + let mut account_id = "".to_owned(); + for i in 0..8 { + if validators[i] == view_account_result.account_id { + expected = balances.read().unwrap()[i]; + account_id = view_account_result.account_id.clone(); + observed_balances.write().unwrap()[i] = view_account_result.amount; + } + } + if view_account_result.amount == expected { + successful_queries.fetch_add(1, Ordering::Relaxed); + if successful_queries.load(Ordering::Relaxed) >= 8 { + println!("Finished iteration {}", iteration.load(Ordering::Relaxed)); + + iteration.fetch_add(1, Ordering::Relaxed); + let iteration_local = iteration.load(Ordering::Relaxed); + if iteration_local > num_iters { + System::current().stop(); + } + + let connectors_ = connectors.write().unwrap(); + + let from = iteration_local % 8; + let to = (iteration_local / 8) % 8; + let amount = (5 + iteration_local) as u128; + connectors_[account_id_to_shard_id(&validators[from].to_string(), 8) as usize] + .0 + .do_send(NetworkClientMessages::Transaction( + SignedTransaction::create_payment_tx( + validators[from].to_string(), + validators[to].to_string(), + amount, + ), + )); + let mut balances_local = balances.write().unwrap(); + balances_local[from] -= amount; + balances_local[to] += amount; + + successful_queries.store(0, Ordering::Relaxed); + unsuccessful_queries.store(0, Ordering::Relaxed); + + // Send the initial balance queries for the iteration + for i in 0..8 { + let connectors1 = connectors.clone(); + let iteration1 = iteration.clone(); + let validators1 = validators.clone(); + let successful_queries1 = successful_queries.clone(); + let unsuccessful_queries1 = unsuccessful_queries.clone(); + let balances1 = balances.clone(); + let observed_balances1 = observed_balances.clone(); + actix::spawn( + connectors_ + [account_id_to_shard_id(&validators[i].to_string(), 8) as usize] + .1 + .send(Query { + path: "account/".to_owned() + validators[i].clone(), + data: vec![], + }) + .then(move |x| { + test_cross_shard_tx_callback( + x, + connectors1, + iteration1, + validators1, + successful_queries1, + unsuccessful_queries1, + balances1, + observed_balances1, + num_iters, + ); + future::result(Ok(())) + }), + ); + } + } + } else { + // The balance is not correct, optionally trace, and resend the query + unsuccessful_queries.fetch_add(1, Ordering::Relaxed); + if unsuccessful_queries.load(Ordering::Relaxed) % 100 == 0 { + println!("Waiting for balances"); + print!("Expected: "); + for i in 0..8 { + print!("{} ", balances.read().unwrap()[i]); + } + println!(); + print!("Received: "); + for i in 0..8 { + print!("{} ", observed_balances.read().unwrap()[i]); + } + println!(); + } + + let connectors_ = connectors.write().unwrap(); + let connectors1 = connectors.clone(); + actix::spawn( + connectors_[account_id_to_shard_id(&account_id, 8) as usize] + .1 + .send(Query { path: "account/".to_owned() + &account_id, data: vec![] }) + .then(move |x| { + test_cross_shard_tx_callback( + x, + connectors1, + iteration, + validators, + successful_queries, + unsuccessful_queries, + balances, + observed_balances, + num_iters, + ); + future::result(Ok(())) + }), + ); + } + } + } + + #[test] + fn test_cross_shard_tx() { + let validators_per_shard = 2; + let num_iters = 64; + init_test_logger(); + System::run(move || { + let connectors: Arc, Addr)>>> = + Arc::new(RwLock::new(vec![])); + + let validators = + vec!["test1", "test2", "test3", "test4", "test5", "test6", "test7", "test8"]; + let key_pairs = (0..8).map(|_| PeerInfo::random()).collect::>(); + let balances = Arc::new(RwLock::new(vec![])); + let observed_balances = Arc::new(RwLock::new(vec![])); + + let mut balances_local = balances.write().unwrap(); + let mut observed_balances_local = observed_balances.write().unwrap(); + for i in 0..8 { + balances_local.push(1000 + 100 * i); + observed_balances_local.push(0); + } + + *connectors.write().unwrap() = setup_mock_all_validators( + validators.clone(), + key_pairs.clone(), + validators_per_shard, + true, + Arc::new(RwLock::new(move |_account_id: String, _msg: &NetworkRequests| { + (NetworkResponses::NoResponse, true) + })), + ); + + let connectors_ = connectors.write().unwrap(); + let iteration = Arc::new(AtomicUsize::new(0)); + let successful_queries = Arc::new(AtomicUsize::new(0)); + let unsuccessful_queries = Arc::new(AtomicUsize::new(0)); + + for i in 0..8 { + let connectors1 = connectors.clone(); + let iteration1 = iteration.clone(); + let validators1 = validators.clone(); + let successful_queries1 = successful_queries.clone(); + let unsuccessful_queries1 = unsuccessful_queries.clone(); + let balances1 = balances.clone(); + let observed_balances1 = observed_balances.clone(); + actix::spawn( + connectors_[i] + .1 + .send(Query { + path: "account/".to_owned() + validators[i].clone(), + data: vec![], + }) + .then(move |x| { + test_cross_shard_tx_callback( + x, + connectors1, + iteration1, + validators1, + successful_queries1, + unsuccessful_queries1, + balances1, + observed_balances1, + num_iters, + ); + future::result(Ok(())) + }), + ); + } + + // On X1 it takes ~1m 15s + near_network::test_utils::wait_or_panic(240000); + }) + .unwrap(); + } +} diff --git a/core/primitives/src/transaction.rs b/core/primitives/src/transaction.rs index e2820e18e76..58020b39b04 100644 --- a/core/primitives/src/transaction.rs +++ b/core/primitives/src/transaction.rs @@ -477,13 +477,17 @@ impl SignedTransaction { self.hash } - // this is for tests + // The following functions are for tests pub fn empty() -> SignedTransaction { + Self::create_payment_tx(AccountId::default(), AccountId::default(), 0) + } + + pub fn create_payment_tx(from: AccountId, to: AccountId, amount: u128) -> SignedTransaction { let body = TransactionBody::SendMoney(SendMoneyTransaction { nonce: 0, - originator: AccountId::default(), - receiver: AccountId::default(), - amount: 0, + originator: from, + receiver: to, + amount, }); SignedTransaction { signature: DEFAULT_SIGNATURE,