Skip to content

Commit

Permalink
Batched transactions (implements NEP#0008) (#1140)
Browse files Browse the repository at this point in the history
Implements NEP#0008
  • Loading branch information
Evgeny Kuzyakov authored Aug 12, 2019
1 parent d54d962 commit d579d64
Show file tree
Hide file tree
Showing 62 changed files with 2,984 additions and 3,841 deletions.
21 changes: 7 additions & 14 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use chrono::Duration;
use log::{debug, info};

use near_primitives::hash::CryptoHash;
use near_primitives::transaction::{ReceiptTransaction, TransactionResult};
use near_primitives::receipt::Receipt;
use near_primitives::transaction::TransactionResult;
use near_primitives::types::{BlockIndex, MerkleHash, ShardId, ValidatorStake};
use near_store::Store;

Expand Down Expand Up @@ -427,7 +428,7 @@ impl Chain {
shard_id: ShardId,
hash: CryptoHash,
payload: Vec<u8>,
receipts: Vec<ReceiptTransaction>,
receipts: Vec<Receipt>,
) -> Result<(), Error> {
// TODO(1046): update this with any required changes for chunks support.
let header = self.get_block_header(&hash)?;
Expand Down Expand Up @@ -520,7 +521,7 @@ impl Chain {

/// Get receipts stored for the given hash.
#[inline]
pub fn get_receipts(&mut self, hash: &CryptoHash) -> Result<&Vec<ReceiptTransaction>, Error> {
pub fn get_receipts(&mut self, hash: &CryptoHash) -> Result<&Vec<Receipt>, Error> {
self.store.get_receipts(hash)
}

Expand Down Expand Up @@ -664,10 +665,9 @@ impl<'a> ChainUpdate<'a> {

// Retrieve receipts from the previous block.
let receipts = self.chain_store_update.get_receipts(&prev_hash)?;
let receipt_hashes = receipts.iter().map(|r| r.get_hash()).collect::<Vec<_>>();

// Apply block to runtime.
let (trie_changes, state_root, mut tx_results, new_receipts, validator_proposals) = self
let (trie_changes, state_root, tx_results, new_receipts, validator_proposals) = self
.runtime_adapter
.apply_transactions(
0,
Expand Down Expand Up @@ -704,15 +704,8 @@ impl<'a> ChainUpdate<'a> {
self.chain_store_update
.save_receipt(&block.hash(), new_receipts.get(&0).unwrap_or(&vec![]).to_vec());
// Save receipt and transaction results.
for (i, tx_result) in tx_results.drain(..).enumerate() {
if i < receipt_hashes.len() {
self.chain_store_update.save_transaction_result(&receipt_hashes[i], tx_result);
} else {
self.chain_store_update.save_transaction_result(
&block.transactions[i - receipt_hashes.len()].get_hash(),
tx_result,
);
}
for tx_result in tx_results.into_iter() {
self.chain_store_update.save_transaction_result(&tx_result.hash, tx_result.result);
}

// Add validated block to the db, even if it's not the selected fork.
Expand Down
15 changes: 8 additions & 7 deletions chain/chain/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use cached::SizedCache;
use log::debug;

use near_primitives::hash::CryptoHash;
use near_primitives::transaction::{ReceiptTransaction, TransactionResult};
use near_primitives::receipt::Receipt;
use near_primitives::transaction::TransactionResult;
use near_primitives::types::{BlockIndex, MerkleHash, ValidatorStake};
use near_primitives::utils::index_to_bytes;
use near_store::{
Expand Down Expand Up @@ -58,7 +59,7 @@ pub trait ChainStoreAccess {
/// Returns hash of the block on the main chain for given height.
fn get_block_hash_by_height(&mut self, height: BlockIndex) -> Result<CryptoHash, Error>;
/// Returns resulting receipt for given block.
fn get_receipts(&mut self, hash: &CryptoHash) -> Result<&Vec<ReceiptTransaction>, Error>;
fn get_receipts(&mut self, hash: &CryptoHash) -> Result<&Vec<Receipt>, Error>;
/// Returns transaction result for given tx hash.
fn get_transaction_result(&mut self, hash: &CryptoHash) -> Result<&TransactionResult, Error>;
}
Expand All @@ -77,7 +78,7 @@ pub struct ChainStore {
// Cache with index to hash on the main chain.
// block_index: SizedCache<Vec<u8>, CryptoHash>,
/// Cache with receipts.
receipts: SizedCache<Vec<u8>, Vec<ReceiptTransaction>>,
receipts: SizedCache<Vec<u8>, Vec<Receipt>>,
/// Cache transaction statuses.
transaction_results: SizedCache<Vec<u8>, TransactionResult>,
}
Expand Down Expand Up @@ -205,7 +206,7 @@ impl ChainStoreAccess for ChainStore {
// )
}

fn get_receipts(&mut self, hash: &CryptoHash) -> Result<&Vec<ReceiptTransaction>, Error> {
fn get_receipts(&mut self, hash: &CryptoHash) -> Result<&Vec<Receipt>, Error> {
option_to_not_found(
read_with_cache(&*self.store, COL_RECEIPTS, &mut self.receipts, hash.as_ref()),
&format!("RECEIPT: {}", hash),
Expand Down Expand Up @@ -237,7 +238,7 @@ pub struct ChainStoreUpdate<'a, T> {
post_state_roots: HashMap<CryptoHash, MerkleHash>,
post_validator_proposals: HashMap<CryptoHash, Vec<ValidatorStake>>,
block_index: HashMap<BlockIndex, Option<CryptoHash>>,
receipts: HashMap<CryptoHash, Vec<ReceiptTransaction>>,
receipts: HashMap<CryptoHash, Vec<Receipt>>,
transaction_results: HashMap<CryptoHash, TransactionResult>,
head: Option<Tip>,
tail: Option<Tip>,
Expand Down Expand Up @@ -367,7 +368,7 @@ impl<'a, T: ChainStoreAccess> ChainStoreAccess for ChainStoreUpdate<'a, T> {
}

/// Get receipts produced for block with givien hash.
fn get_receipts(&mut self, hash: &CryptoHash) -> Result<&Vec<ReceiptTransaction>, Error> {
fn get_receipts(&mut self, hash: &CryptoHash) -> Result<&Vec<Receipt>, Error> {
if let Some(receipts) = self.receipts.get(hash) {
Ok(receipts)
} else {
Expand Down Expand Up @@ -463,7 +464,7 @@ impl<'a, T: ChainStoreAccess> ChainStoreUpdate<'a, T> {
self.headers.insert(header.hash(), header);
}

pub fn save_receipt(&mut self, hash: &CryptoHash, receipt: Vec<ReceiptTransaction>) {
pub fn save_receipt(&mut self, hash: &CryptoHash, receipt: Vec<Receipt>) {
self.receipts.insert(*hash, receipt);
}

Expand Down
28 changes: 13 additions & 15 deletions chain/chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use chrono::Utc;
use near_primitives::crypto::signature::{verify, Signature};
use near_primitives::crypto::signer::InMemorySigner;
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::Receipt;
use near_primitives::rpc::{AccountViewCallResult, QueryResponse};
use near_primitives::test_utils::get_public_key_from_seed;
use near_primitives::transaction::{
ReceiptTransaction, SignedTransaction, TransactionResult, TransactionStatus,
SignedTransaction, TransactionLog, TransactionResult, TransactionStatus,
};
use near_primitives::types::{AccountId, BlockIndex, MerkleHash, ShardId, ValidatorStake};
use near_store::test_utils::create_test_store;
Expand Down Expand Up @@ -158,25 +159,22 @@ impl RuntimeAdapter for KeyValueRuntime {
_block_index: BlockIndex,
_prev_block_hash: &CryptoHash,
_block_hash: &CryptoHash,
_receipts: &Vec<Vec<ReceiptTransaction>>,
_receipts: &Vec<Vec<Receipt>>,
transactions: &Vec<SignedTransaction>,
) -> Result<
(
WrappedTrieChanges,
MerkleHash,
Vec<TransactionResult>,
ReceiptResult,
Vec<ValidatorStake>,
),
(WrappedTrieChanges, MerkleHash, Vec<TransactionLog>, ReceiptResult, Vec<ValidatorStake>),
Box<dyn std::error::Error>,
> {
let mut tx_results = vec![];
for _ in transactions {
tx_results.push(TransactionResult {
status: TransactionStatus::Completed,
logs: vec![],
receipts: vec![],
result: None,
for tx in transactions {
tx_results.push(TransactionLog {
hash: tx.get_hash(),
result: TransactionResult {
status: TransactionStatus::Completed,
logs: vec![],
receipts: vec![],
result: None,
},
});
}
Ok((
Expand Down
15 changes: 5 additions & 10 deletions chain/chain/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ pub use near_primitives::block::{Block, BlockHeader, Weight};
use near_primitives::crypto::signature::Signature;
use near_primitives::crypto::signer::EDSigner;
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::Receipt;
use near_primitives::rpc::QueryResponse;
use near_primitives::transaction::{ReceiptTransaction, SignedTransaction, TransactionResult};
use near_primitives::transaction::{SignedTransaction, TransactionLog};
use near_primitives::types::{AccountId, BlockIndex, MerkleHash, ShardId, ValidatorStake};
use near_store::{StoreUpdate, WrappedTrieChanges};

Expand Down Expand Up @@ -39,7 +40,7 @@ pub struct ValidTransaction {
}

/// Map of shard to list of receipts to send to it.
pub type ReceiptResult = HashMap<ShardId, Vec<ReceiptTransaction>>;
pub type ReceiptResult = HashMap<ShardId, Vec<Receipt>>;

/// Bridge between the chain and the runtime.
/// Main function is to update state given transactions.
Expand Down Expand Up @@ -129,16 +130,10 @@ pub trait RuntimeAdapter: Send + Sync {
block_index: BlockIndex,
prev_block_hash: &CryptoHash,
block_hash: &CryptoHash,
receipts: &Vec<Vec<ReceiptTransaction>>,
receipts: &Vec<Vec<Receipt>>,
transactions: &Vec<SignedTransaction>,
) -> Result<
(
WrappedTrieChanges,
MerkleHash,
Vec<TransactionResult>,
ReceiptResult,
Vec<ValidatorStake>,
),
(WrappedTrieChanges, MerkleHash, Vec<TransactionLog>, ReceiptResult, Vec<ValidatorStake>),
Box<dyn std::error::Error>,
>;

Expand Down
5 changes: 3 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ use near_network::{
use near_pool::TransactionPool;
use near_primitives::crypto::signature::{verify, Signature};
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::receipt::Receipt;
use near_primitives::rpc::ValidatorInfo;
use near_primitives::transaction::{ReceiptTransaction, SignedTransaction};
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, BlockIndex, ShardId};
use near_primitives::unwrap_or_return;
use near_store::Store;
Expand Down Expand Up @@ -1103,7 +1104,7 @@ impl ClientActor {
&mut self,
shard_id: ShardId,
hash: CryptoHash,
) -> Result<(Vec<u8>, Vec<ReceiptTransaction>), near_chain::Error> {
) -> Result<(Vec<u8>, Vec<Receipt>), near_chain::Error> {
let header = self.chain.get_block_header(&hash)?;
let prev_hash = header.prev_hash;
let payload = self
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl BlockSync {
if self
.network_recipient
.do_send(NetworkRequests::BlockRequest {
hash: hash.clone(),
hash: *hash,
peer_id: peer.peer_info.id.clone(),
})
.is_ok()
Expand Down
79 changes: 38 additions & 41 deletions chain/client/src/view_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use near_chain::{Block, Chain, ErrorKind, RuntimeAdapter};
use near_primitives::hash::CryptoHash;
use near_primitives::rpc::QueryResponse;
use near_primitives::transaction::{
FinalTransactionResult, FinalTransactionStatus, TransactionLogs, TransactionResult,
FinalTransactionResult, FinalTransactionStatus, TransactionLog, TransactionResult,
TransactionStatus,
};
use near_store::Store;
Expand Down Expand Up @@ -42,40 +42,49 @@ impl ViewClientActor {
match self.chain.get_transaction_result(hash) {
Ok(result) => Ok(result.clone()),
Err(err) => match err.kind() {
ErrorKind::DBNotFoundErr(_) => Ok(TransactionResult::default()),
ErrorKind::DBNotFoundErr(_) => Ok(TransactionResult {
status: TransactionStatus::Unknown,
..Default::default()
}),
_ => Err(err.to_string()),
},
}
}

fn collect_transaction_final_result(
fn get_recursive_transaction_results(
&mut self,
transaction_result: &TransactionResult,
logs: &mut Vec<TransactionLogs>,
) -> Result<FinalTransactionStatus, String> {
match transaction_result.status {
TransactionStatus::Unknown => Ok(FinalTransactionStatus::Unknown),
TransactionStatus::Failed => Ok(FinalTransactionStatus::Failed),
TransactionStatus::Completed => {
for r in transaction_result.receipts.iter() {
let receipt_result = self.get_transaction_result(&r)?;
logs.push(TransactionLogs {
hash: *r,
lines: receipt_result.logs.clone(),
receipts: receipt_result.receipts.clone(),
result: receipt_result.result.clone(),
});
match self.collect_transaction_final_result(&receipt_result, logs)? {
FinalTransactionStatus::Failed => {
return Ok(FinalTransactionStatus::Failed)
}
FinalTransactionStatus::Completed => {}
_ => return Ok(FinalTransactionStatus::Started),
};
}
Ok(FinalTransactionStatus::Completed)
}
hash: &CryptoHash,
) -> Result<Vec<TransactionLog>, String> {
let result = self.get_transaction_result(hash)?;
let receipt_ids = result.receipts.clone();
let mut transactions = vec![TransactionLog { hash: *hash, result }];
for hash in &receipt_ids {
transactions.extend(self.get_recursive_transaction_results(hash)?.into_iter());
}
Ok(transactions)
}

fn get_final_transaction_result(
&mut self,
hash: &CryptoHash,
) -> Result<FinalTransactionResult, String> {
let transactions = self.get_recursive_transaction_results(hash)?;
let status = if transactions
.iter()
.find(|t| &t.result.status == &TransactionStatus::Failed)
.is_some()
{
FinalTransactionStatus::Failed
} else if transactions
.iter()
.find(|t| &t.result.status == &TransactionStatus::Unknown)
.is_some()
{
FinalTransactionStatus::Started
} else {
FinalTransactionStatus::Completed
};
Ok(FinalTransactionResult { status, transactions })
}
}

Expand Down Expand Up @@ -118,19 +127,7 @@ impl Handler<TxStatus> for ViewClientActor {
type Result = Result<FinalTransactionResult, String>;

fn handle(&mut self, msg: TxStatus, _: &mut Context<Self>) -> Self::Result {
let transaction_result = self.get_transaction_result(&msg.tx_hash)?;
let mut result = FinalTransactionResult {
status: FinalTransactionStatus::Unknown,
logs: vec![TransactionLogs {
hash: msg.tx_hash,
lines: transaction_result.logs.clone(),
receipts: transaction_result.receipts.clone(),
result: transaction_result.result.clone(),
}],
};
result.status =
self.collect_transaction_final_result(&transaction_result, &mut result.logs)?;
Ok(result)
self.get_final_transaction_result(&msg.tx_hash)
}
}

Expand Down
16 changes: 13 additions & 3 deletions chain/client/tests/process_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ use near_client::GetBlock;
use near_network::test_utils::wait_or_panic;
use near_network::types::{FullPeerInfo, NetworkInfo, PeerChainInfo};
use near_network::{NetworkClientMessages, NetworkRequests, NetworkResponses, PeerInfo};
use near_primitives::crypto::signature::{PublicKey, DEFAULT_SIGNATURE};
use near_primitives::crypto::signer::InMemorySigner;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::test_utils::init_test_logger;
use near_primitives::transaction::SignedTransaction;
use near_primitives::transaction::{SignedTransaction, Transaction};
use near_primitives::types::MerkleHash;

/// Runs block producing client and stops after network mock received two blocks.
Expand Down Expand Up @@ -61,7 +62,16 @@ fn produce_blocks_with_tx() {
NetworkResponses::NoResponse
}),
);
client.do_send(NetworkClientMessages::Transaction(SignedTransaction::empty()));
client.do_send(NetworkClientMessages::Transaction(SignedTransaction::new(
DEFAULT_SIGNATURE,
Transaction {
signer_id: "".to_string(),
public_key: PublicKey::empty(),
nonce: 0,
receiver_id: "".to_string(),
actions: vec![],
},
)));
})
.unwrap();
}
Expand Down Expand Up @@ -118,7 +128,7 @@ fn receive_network_block_header() {
Box::new(move |msg, _ctx, client_addr| match msg {
NetworkRequests::BlockRequest { hash, peer_id } => {
let block = block_holder1.read().unwrap().clone().unwrap();
assert_eq!(hash.clone(), block.hash());
assert_eq!(hash, &block.hash());
actix::spawn(
client_addr
.send(NetworkClientMessages::Block(block, peer_id.clone(), false))
Expand Down
Loading

0 comments on commit d579d64

Please sign in to comment.