Skip to content

Commit

Permalink
Don't block sync when importing old blocks (openethereum#8530)
Browse files Browse the repository at this point in the history
* Alter IO queueing.

* Don't require IoMessages to be Clone

* Ancient blocks imported via IoChannel.

* Get rid of private transactions io message.

* Get rid of deadlock and fix disconnected handler.

* Revert to old disconnect condition.

* Fix tests.

* Fix deadlock.
  • Loading branch information
tomusdrw authored and VladLupashevskyi committed May 23, 2018
1 parent a22ff24 commit eadd68b
Show file tree
Hide file tree
Showing 23 changed files with 452 additions and 329 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

250 changes: 131 additions & 119 deletions ethcore/private-tx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ impl Provider where {
encryptor: Box<Encryptor>,
config: ProviderConfig,
channel: IoChannel<ClientIoMessage>,
) -> Result<Self, Error> {
Ok(Provider {
) -> Self {
Provider {
encryptor,
validator_accounts: config.validator_accounts.into_iter().collect(),
signer_account: config.signer_account,
Expand All @@ -162,7 +162,7 @@ impl Provider where {
miner,
accounts,
channel,
})
}
}

// TODO [ToDr] Don't use `ChainNotify` here!
Expand Down Expand Up @@ -243,50 +243,6 @@ impl Provider where {
Ok(original_transaction)
}

/// Process received private transaction
pub fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> {
trace!("Private transaction received");
let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?;
let contract = private_tx.contract;
let contract_validators = self.get_validators(BlockId::Latest, &contract)?;

let validation_account = contract_validators
.iter()
.find(|address| self.validator_accounts.contains(address));

match validation_account {
None => {
// TODO [ToDr] This still seems a bit invalid, imho we should still import the transaction to the pool.
// Importing to pool verifies correctness and nonce; here we are just blindly forwarding.
//
// Not for verification, broadcast further to peers
self.broadcast_private_transaction(rlp.into());
return Ok(());
},
Some(&validation_account) => {
let hash = private_tx.hash();
trace!("Private transaction taken for verification");
let original_tx = self.extract_original_transaction(private_tx, &contract)?;
trace!("Validating transaction: {:?}", original_tx);
// Verify with the first account available
trace!("The following account will be used for verification: {:?}", validation_account);
let nonce_cache = Default::default();
self.transactions_for_verification.lock().add_transaction(
original_tx,
contract,
validation_account,
hash,
self.pool_client(&nonce_cache),
)?;
// NOTE This will just fire `on_private_transaction_queued` but from a client thread.
// It seems that a lot of heavy work (verification) is done in this thread anyway
// it might actually make sense to decouple it from clientService and just use dedicated thread
// for both verification and execution.
self.channel.send(ClientIoMessage::NewPrivateTransaction).map_err(|_| ErrorKind::ClientIsMalformed.into())
}
}
}

fn pool_client<'a>(&'a self, nonce_cache: &'a RwLock<HashMap<Address, U256>>) -> miner::pool_client::PoolClient<'a, Client> {
let engine = self.client.engine();
let refuse_service_transactions = true;
Expand All @@ -299,11 +255,6 @@ impl Provider where {
)
}

/// Private transaction for validation added into queue
pub fn on_private_transaction_queued(&self) -> Result<(), Error> {
self.process_queue()
}

/// Retrieve and verify the first available private transaction for every sender
///
/// TODO [ToDr] It seems that:
Expand Down Expand Up @@ -347,73 +298,6 @@ impl Provider where {
Ok(())
}

/// Add signed private transaction into the store
/// Creates corresponding public transaction if last required singature collected and sends it to the chain
pub fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> {
let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?;
trace!("Signature for private transaction received: {:?}", tx);
let private_hash = tx.private_transaction_hash();
let desc = match self.transactions_for_signing.lock().get(&private_hash) {
None => {
// TODO [ToDr] Verification (we can't just blindly forward every transaction)

// Not our transaction, broadcast further to peers
self.broadcast_signed_private_transaction(rlp.into());
return Ok(());
},
Some(desc) => desc,
};

let last = self.last_required_signature(&desc, tx.signature())?;

if last {
let mut signatures = desc.received_signatures.clone();
signatures.push(tx.signature());
let rsv: Vec<Signature> = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect();
//Create public transaction
let public_tx = self.public_transaction(
desc.state.clone(),
&desc.original_transaction,
&rsv,
desc.original_transaction.nonce,
desc.original_transaction.gas_price
)?;
trace!("Last required signature received, public transaction created: {:?}", public_tx);
//Sign and add it to the queue
let chain_id = desc.original_transaction.chain_id();
let hash = public_tx.hash(chain_id);
let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?;
let password = find_account_password(&self.passwords, &*self.accounts, &signer_account);
let signature = self.accounts.sign(signer_account, password, hash)?;
let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?;
match self.miner.import_own_transaction(&*self.client, signed.into()) {
Ok(_) => trace!("Public transaction added to queue"),
Err(err) => {
trace!("Failed to add transaction to queue, error: {:?}", err);
bail!(err);
}
}
//Remove from store for signing
match self.transactions_for_signing.lock().remove(&private_hash) {
Ok(_) => {}
Err(err) => {
trace!("Failed to remove transaction from signing store, error: {:?}", err);
bail!(err);
}
}
} else {
//Add signature to the store
match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) {
Ok(_) => trace!("Signature stored for private transaction"),
Err(err) => {
trace!("Failed to add signature to signing store, error: {:?}", err);
bail!(err);
}
}
}
Ok(())
}

fn last_required_signature(&self, desc: &PrivateTransactionSigningDesc, sign: Signature) -> Result<bool, Error> {
if desc.received_signatures.contains(&sign) {
return Ok(false);
Expand Down Expand Up @@ -657,6 +541,134 @@ impl Provider where {
}
}

pub trait Importer {
/// Process received private transaction
fn import_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>;

/// Add signed private transaction into the store
///
/// Creates corresponding public transaction if last required signature collected and sends it to the chain
fn import_signed_private_transaction(&self, _rlp: &[u8]) -> Result<(), Error>;
}

// TODO [ToDr] Offload more heavy stuff to the IoService thread.
// It seems that a lot of heavy work (verification) is done in this thread anyway
// it might actually make sense to decouple it from clientService and just use dedicated thread
// for both verification and execution.

impl Importer for Arc<Provider> {
fn import_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> {
trace!("Private transaction received");
let private_tx: PrivateTransaction = Rlp::new(rlp).as_val()?;
let contract = private_tx.contract;
let contract_validators = self.get_validators(BlockId::Latest, &contract)?;

let validation_account = contract_validators
.iter()
.find(|address| self.validator_accounts.contains(address));

match validation_account {
None => {
// TODO [ToDr] This still seems a bit invalid, imho we should still import the transaction to the pool.
// Importing to pool verifies correctness and nonce; here we are just blindly forwarding.
//
// Not for verification, broadcast further to peers
self.broadcast_private_transaction(rlp.into());
return Ok(());
},
Some(&validation_account) => {
let hash = private_tx.hash();
trace!("Private transaction taken for verification");
let original_tx = self.extract_original_transaction(private_tx, &contract)?;
trace!("Validating transaction: {:?}", original_tx);
// Verify with the first account available
trace!("The following account will be used for verification: {:?}", validation_account);
let nonce_cache = Default::default();
self.transactions_for_verification.lock().add_transaction(
original_tx,
contract,
validation_account,
hash,
self.pool_client(&nonce_cache),
)?;
let provider = Arc::downgrade(self);
self.channel.send(ClientIoMessage::execute(move |_| {
if let Some(provider) = provider.upgrade() {
if let Err(e) = provider.process_queue() {
debug!("Unable to process the queue: {}", e);
}
}
})).map_err(|_| ErrorKind::ClientIsMalformed.into())
}
}
}

fn import_signed_private_transaction(&self, rlp: &[u8]) -> Result<(), Error> {
let tx: SignedPrivateTransaction = Rlp::new(rlp).as_val()?;
trace!("Signature for private transaction received: {:?}", tx);
let private_hash = tx.private_transaction_hash();
let desc = match self.transactions_for_signing.lock().get(&private_hash) {
None => {
// TODO [ToDr] Verification (we can't just blindly forward every transaction)

// Not our transaction, broadcast further to peers
self.broadcast_signed_private_transaction(rlp.into());
return Ok(());
},
Some(desc) => desc,
};

let last = self.last_required_signature(&desc, tx.signature())?;

if last {
let mut signatures = desc.received_signatures.clone();
signatures.push(tx.signature());
let rsv: Vec<Signature> = signatures.into_iter().map(|sign| sign.into_electrum().into()).collect();
//Create public transaction
let public_tx = self.public_transaction(
desc.state.clone(),
&desc.original_transaction,
&rsv,
desc.original_transaction.nonce,
desc.original_transaction.gas_price
)?;
trace!("Last required signature received, public transaction created: {:?}", public_tx);
//Sign and add it to the queue
let chain_id = desc.original_transaction.chain_id();
let hash = public_tx.hash(chain_id);
let signer_account = self.signer_account.ok_or_else(|| ErrorKind::SignerAccountNotSet)?;
let password = find_account_password(&self.passwords, &*self.accounts, &signer_account);
let signature = self.accounts.sign(signer_account, password, hash)?;
let signed = SignedTransaction::new(public_tx.with_signature(signature, chain_id))?;
match self.miner.import_own_transaction(&*self.client, signed.into()) {
Ok(_) => trace!("Public transaction added to queue"),
Err(err) => {
trace!("Failed to add transaction to queue, error: {:?}", err);
bail!(err);
}
}
//Remove from store for signing
match self.transactions_for_signing.lock().remove(&private_hash) {
Ok(_) => {}
Err(err) => {
trace!("Failed to remove transaction from signing store, error: {:?}", err);
bail!(err);
}
}
} else {
//Add signature to the store
match self.transactions_for_signing.lock().add_signature(&private_hash, tx.signature()) {
Ok(_) => trace!("Signature stored for private transaction"),
Err(err) => {
trace!("Failed to add signature to signing store, error: {:?}", err);
bail!(err);
}
}
}
Ok(())
}
}

/// Try to unlock account using stored password, return found password if any
fn find_account_password(passwords: &Vec<String>, account_provider: &AccountProvider, account: &Address) -> Option<String> {
for password in passwords {
Expand Down
2 changes: 1 addition & 1 deletion ethcore/private-tx/tests/private_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn private_contract() {
Box::new(NoopEncryptor::default()),
config,
io,
).unwrap());
));

let (address, _) = contract_address(CreateContractAddress::FromSenderAndNonce, &key1.address(), &0.into(), &[]);

Expand Down
1 change: 1 addition & 0 deletions ethcore/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ ethcore-sync = { path = "../sync" }
kvdb = { path = "../../util/kvdb" }
log = "0.3"
stop-guard = { path = "../../util/stop-guard" }
trace-time = { path = "../../util/trace-time" }

[dev-dependencies]
tempdir = "0.3"
Expand Down
3 changes: 3 additions & 0 deletions ethcore/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ extern crate error_chain;
#[macro_use]
extern crate log;

#[macro_use]
extern crate trace_time;

#[cfg(test)]
extern crate tempdir;

Expand Down
Loading

0 comments on commit eadd68b

Please sign in to comment.