Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Process signed private transaction packets via io queue
Browse files Browse the repository at this point in the history
  • Loading branch information
grbIzl committed Jun 19, 2018
1 parent fc113ba commit e5feccd
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 57 deletions.
135 changes: 78 additions & 57 deletions ethcore/private-tx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pub struct Provider {
miner: Arc<Miner>,
accounts: Arc<AccountProvider>,
channel: IoChannel<ClientIoMessage>,
imported_signed_transactions: Mutex<Vec<SignedPrivateTransaction>>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -159,6 +160,7 @@ impl Provider where {
notify: RwLock::default(),
transactions_for_signing: Mutex::default(),
transactions_for_verification: VerificationStore::default(),
imported_signed_transactions: Mutex::default(),
client,
miner,
accounts,
Expand Down Expand Up @@ -257,7 +259,7 @@ impl Provider where {
}

/// Retrieve and verify the first available private transaction for every sender
fn process_queue(&self) -> Result<(), Error> {
fn process_verification_queue(&self) -> Result<(), Error> {
let nonce_cache = Default::default();
let ready_transactions = self.transactions_for_verification.drain(self.pool_client(&nonce_cache));
for transaction in ready_transactions {
Expand Down Expand Up @@ -295,6 +297,71 @@ impl Provider where {
Ok(())
}

/// Add signed private transaction into the store /// Creates corresponding public transaction if last required signature collected and sends it to the chain
pub fn process_signatures(&self) -> Result<(), Error> {
trace!("Processing signed private transactions");
let mut signed_transactions = self.imported_signed_transactions.lock();
for tx in signed_transactions.drain(..) {
let private_hash = tx.private_transaction_hash();
let desc = match self.transactions_for_signing.lock().get(&private_hash) {
None => {
// Not our transaction, broadcast further to peers
self.broadcast_signed_private_transaction(tx.hash(), tx.rlp_bytes().into_vec());
return Ok(());
},
Some(desc) => desc,
};
let last = self.last_required_signature(&desc, tx.signature())?;

if last {
let mut signatures = desc.received_signatures.clone();
signatures.push(tx.signature());
let rsv: Vec<Signature> = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect();
//Create public transaction
let public_tx = self.public_transaction(
desc.state.clone(),
&desc.original_transaction,
&rsv,
desc.original_transaction.nonce,
desc.original_transaction.gas_price
)?;
trace!("Last required signature received, public transaction created: {:?}", public_tx);
//Sign and add it to the queue
let chain_id = desc.original_transaction.chain_id();
let hash = public_tx.hash(chain_id);
let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?;
let password = find_account_password(&self.passwords, &*self.accounts, &signer_account);
let signature = self.accounts.sign(signer_account, password, hash)?;
let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?;
match self.miner.import_own_transaction(&*self.client, signed.into()) {
Ok(_) => trace!("Public transaction added to queue"),
Err(err) => {
trace!("Failed to add transaction to queue, error: {:?}", err);
bail!(err);
}
}
//Remove from store for signing
match self.transactions_for_signing.lock().remove(&private_hash) {
Ok(_) => {}
Err(err) => {
trace!("Failed to remove transaction from signing store, error: {:?}", err);
bail!(err);
}
}
} else {
//Add signature to the store
match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) {
Ok(_) => trace!("Signature stored for private transaction"),
Err(err) => {
trace!("Failed to add signature to signing store, error: {:?}", err);
bail!(err);
}
}
}
}
Ok(())
}

fn last_required_signature(&self, desc: &PrivateTransactionSigningDesc, sign: Signature) -> Result<bool, Error> {
if desc.received_signatures.contains(&sign) {
return Ok(false);
Expand Down Expand Up @@ -575,7 +642,7 @@ impl Importer for Arc<Provider> {
)?;
if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| {
if let Some(provider) = provider.upgrade() {
if let Err(e) = provider.process_queue() {
if let Err(e) = provider.process_verification_queue() {
debug!("Unable to process the queue: {}", e);
}
}
Expand All @@ -589,61 +656,15 @@ impl Importer for Arc<Provider> {
let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?;
trace!("Signature for private transaction received: {:?}", tx);
let private_hash = tx.private_transaction_hash();
let desc = match self.transactions_for_signing.lock().get(&private_hash) {
None => {
// Not our transaction, broadcast further to peers
self.broadcast_signed_private_transaction(tx.hash(), rlp.into());
return Ok(private_hash);
},
Some(desc) => desc,
};

let last = self.last_required_signature(&desc, tx.signature())?;

if last {
let mut signatures = desc.received_signatures.clone();
signatures.push(tx.signature());
let rsv: Vec<Signature> = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect();
//Create public transaction
let public_tx = self.public_transaction(
desc.state.clone(),
&desc.original_transaction,
&rsv,
desc.original_transaction.nonce,
desc.original_transaction.gas_price
)?;
trace!("Last required signature received, public transaction created: {:?}", public_tx);
//Sign and add it to the queue
let chain_id = desc.original_transaction.chain_id();
let hash = public_tx.hash(chain_id);
let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?;
let password = find_account_password(&self.passwords, &*self.accounts, &signer_account);
let signature = self.accounts.sign(signer_account, password, hash)?;
let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?;
match self.miner.import_own_transaction(&*self.client, signed.into()) {
Ok(_) => trace!("Public transaction added to queue"),
Err(err) => {
trace!("Failed to add transaction to queue, error: {:?}", err);
bail!(err);
}
}
//Remove from store for signing
match self.transactions_for_signing.lock().remove(&private_hash) {
Ok(_) => {}
Err(err) => {
trace!("Failed to remove transaction from signing store, error: {:?}", err);
bail!(err);
}
}
} else {
//Add signature to the store
match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) {
Ok(_) => trace!("Signature stored for private transaction"),
Err(err) => {
trace!("Failed to add signature to signing store, error: {:?}", err);
bail!(err);
}
}
self.imported_signed_transactions.lock().push(tx);
if let Err(e) = self.channel.send(ClientIoMessage::execute(move |_| {
if let Some(provider) = provider.upgrade() {
if let Err(e) = provider.process_signatures() {
debug!("Unable to process the queue: {}", e);
}
}
})) {
trace!("Error sending NewSignedPrivateTransaction message: {:?}", e);
}
Ok(private_hash)
}
Expand Down
5 changes: 5 additions & 0 deletions ethcore/sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,13 +508,15 @@ impl<C: FlushingBlockChainClient> TestNet<EthPeer<C>> {
pub struct TestIoHandler {
pub client: Arc<EthcoreClient>,
pub private_tx_queued: Mutex<usize>,
pub private_signed_tx_queued: Mutex<usize>,
}

impl TestIoHandler {
pub fn new(client: Arc<EthcoreClient>) -> Self {
TestIoHandler {
client,
private_tx_queued: Mutex::default(),
private_signed_tx_queued: Mutex::default(),
}
}
}
Expand All @@ -526,6 +528,9 @@ impl IoHandler<ClientIoMessage> for TestIoHandler {
*self.private_tx_queued.lock() += 1;
(*exec.0)(&self.client);
},
ClientIoMessage::NewSignedPrivateTransaction => {
*self.private_signed_tx_queued.lock() += 1;
},
_ => {} // ignore other messages
}
}
Expand Down

0 comments on commit e5feccd

Please sign in to comment.