diff --git a/Cargo.lock b/Cargo.lock index e5e11a352ad..137d2de20e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -691,7 +691,6 @@ dependencies = [ "parity-reactor 0.1.0", "parking_lot 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", "price-info 1.12.0", - "rayon 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "rlp 0.2.1", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "trace-time 0.1.0", diff --git a/Cargo.toml b/Cargo.toml index 7795fb3b3bd..8df37d5942b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,13 @@ slow-blocks = ["ethcore/slow-blocks"] secretstore = ["ethcore-secretstore"] final = ["parity-version/final"] deadlock_detection = ["parking_lot/deadlock_detection"] +# to create a memory profile (requires nightly rust), use e.g. +# `heaptrack /path/to/parity `, +# to visualize a memory profile, use `heaptrack_gui` +# or +# `valgrind --tool=massif /path/to/parity ` +# and `massif-visualizer` for visualization +memory_profiling = [] [lib] path = "parity/lib.rs" diff --git a/ethcore/light/Cargo.toml b/ethcore/light/Cargo.toml index 3853ad910ff..ab347c62e83 100644 --- a/ethcore/light/Cargo.toml +++ b/ethcore/light/Cargo.toml @@ -38,6 +38,7 @@ memory-cache = { path = "../../util/memory_cache" } error-chain = { version = "0.11", default-features = false } [dev-dependencies] +ethcore = { path = "..", features = ["test-helpers"] } kvdb-memorydb = { path = "../../util/kvdb-memorydb" } tempdir = "0.3" diff --git a/ethcore/node_filter/Cargo.toml b/ethcore/node_filter/Cargo.toml index 0d204e29ff8..11be807652b 100644 --- a/ethcore/node_filter/Cargo.toml +++ b/ethcore/node_filter/Cargo.toml @@ -19,6 +19,7 @@ ethabi-contract = "5.0" lru-cache = "0.1" [dev-dependencies] +ethcore = { path = "..", features = ["test-helpers"] } kvdb-memorydb = { path = "../../util/kvdb-memorydb" } ethcore-io = { path = "../../util/io" } tempdir = "0.3" diff --git a/ethcore/service/Cargo.toml b/ethcore/service/Cargo.toml index 3a10849b61f..634ee55dba4 100644 --- a/ethcore/service/Cargo.toml +++ b/ethcore/service/Cargo.toml @@ -16,5 +16,6 @@ stop-guard = { path = "../../util/stop-guard" } trace-time = { path = "../../util/trace-time" } [dev-dependencies] +ethcore = { path = "..", features = ["test-helpers"] } tempdir = "0.3" kvdb-rocksdb = { path = "../../util/kvdb-rocksdb" } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index c270eed035f..d1f23911781 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -2339,6 +2339,11 @@ fn transaction_receipt(machine: &::machine::EthereumMachine, mut tx: LocalizedTr let transaction_index = tx.transaction_index; LocalizedReceipt { + from: sender, + to: match tx.action { + Action::Create => None, + Action::Call(ref address) => Some(address.clone().into()) + }, transaction_hash: transaction_hash, transaction_index: transaction_index, block_hash: block_hash, @@ -2464,6 +2469,11 @@ mod tests { // then assert_eq!(receipt, LocalizedReceipt { + from: tx1.sender().into(), + to: match tx1.action { + Action::Create => None, + Action::Call(ref address) => Some(address.clone().into()) + }, transaction_hash: tx1.hash(), transaction_index: 1, block_hash: block_hash, diff --git a/ethcore/src/engines/authority_round/mod.rs b/ethcore/src/engines/authority_round/mod.rs index 067c754c7f5..e2e88c8178d 100644 --- a/ethcore/src/engines/authority_round/mod.rs +++ b/ethcore/src/engines/authority_round/mod.rs @@ -382,12 +382,16 @@ impl Decodable for SealedEmptyStep { } } +struct PermissionedStep { + inner: Step, + can_propose: AtomicBool, +} + /// Engine using `AuthorityRound` proof-of-authority BFT consensus. pub struct AuthorityRound { transition_service: IoService<()>, - step: Arc, - can_propose: AtomicBool, - client: RwLock>>, + step: Arc, + client: Arc>>>, signer: RwLock, validators: Box, validate_score_transition: u64, @@ -407,7 +411,7 @@ pub struct AuthorityRound { // header-chain validator. struct EpochVerifier { - step: Arc, + step: Arc, subchain_validators: SimpleList, empty_steps_transition: u64, } @@ -415,7 +419,7 @@ struct EpochVerifier { impl super::EpochVerifier for EpochVerifier { fn verify_light(&self, header: &Header) -> Result<(), Error> { // Validate the timestamp - verify_timestamp(&*self.step, header_step(header, self.empty_steps_transition)?)?; + verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?)?; // always check the seal since it's fast. // nothing heavier to do. verify_external(header, &self.subchain_validators, self.empty_steps_transition) @@ -615,13 +619,15 @@ impl AuthorityRound { let engine = Arc::new( AuthorityRound { transition_service: IoService::<()>::start()?, - step: Arc::new(Step { - inner: AtomicUsize::new(initial_step), - calibrate: our_params.start_step.is_none(), - duration: our_params.step_duration, + step: Arc::new(PermissionedStep { + inner: Step { + inner: AtomicUsize::new(initial_step), + calibrate: our_params.start_step.is_none(), + duration: our_params.step_duration, + }, + can_propose: AtomicBool::new(true), }), - can_propose: AtomicBool::new(true), - client: RwLock::new(None), + client: Arc::new(RwLock::new(None)), signer: Default::default(), validators: our_params.validators, validate_score_transition: our_params.validate_score_transition, @@ -641,7 +647,10 @@ impl AuthorityRound { // Do not initialize timeouts for tests. if should_timeout { - let handler = TransitionHandler { engine: Arc::downgrade(&engine) }; + let handler = TransitionHandler { + step: engine.step.clone(), + client: engine.client.clone(), + }; engine.transition_service.register_handler(Arc::new(handler))?; } Ok(engine) @@ -666,7 +675,7 @@ impl AuthorityRound { } fn generate_empty_step(&self, parent_hash: &H256) { - let step = self.step.load(); + let step = self.step.inner.load(); let empty_step_rlp = empty_step_rlp(step, parent_hash); if let Ok(signature) = self.sign(keccak(&empty_step_rlp)).map(Into::into) { @@ -698,34 +707,37 @@ fn unix_now() -> Duration { } struct TransitionHandler { - engine: Weak, + step: Arc, + client: Arc>>>, } const ENGINE_TIMEOUT_TOKEN: TimerToken = 23; impl IoHandler<()> for TransitionHandler { fn initialize(&self, io: &IoContext<()>) { - if let Some(engine) = self.engine.upgrade() { - let remaining = engine.step.duration_remaining().as_millis(); - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining)) - .unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e)) - } + let remaining = self.step.inner.duration_remaining().as_millis(); + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(remaining)) + .unwrap_or_else(|e| warn!(target: "engine", "Failed to start consensus step timer: {}.", e)) } fn timeout(&self, io: &IoContext<()>, timer: TimerToken) { if timer == ENGINE_TIMEOUT_TOKEN { - if let Some(engine) = self.engine.upgrade() { - // NOTE we might be lagging by couple of steps in case the timeout - // has not been called fast enough. - // Make sure to advance up to the actual step. - while engine.step.duration_remaining().as_millis() == 0 { - engine.step(); + // NOTE we might be lagging by couple of steps in case the timeout + // has not been called fast enough. + // Make sure to advance up to the actual step. + while self.step.inner.duration_remaining().as_millis() == 0 { + self.step.inner.increment(); + self.step.can_propose.store(true, AtomicOrdering::SeqCst); + if let Some(ref weak) = *self.client.read() { + if let Some(c) = weak.upgrade() { + c.update_sealing(); + } } - - let next_run_at = engine.step.duration_remaining().as_millis() >> 2; - io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at)) - .unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e)) } + + let next_run_at = self.step.inner.duration_remaining().as_millis() >> 2; + io.register_timer_once(ENGINE_TIMEOUT_TOKEN, Duration::from_millis(next_run_at)) + .unwrap_or_else(|e| warn!(target: "engine", "Failed to restart consensus step timer: {}.", e)) } } } @@ -742,8 +754,8 @@ impl Engine for AuthorityRound { } fn step(&self) { - self.step.increment(); - self.can_propose.store(true, AtomicOrdering::SeqCst); + self.step.inner.increment(); + self.step.can_propose.store(true, AtomicOrdering::SeqCst); if let Some(ref weak) = *self.client.read() { if let Some(c) = weak.upgrade() { c.update_sealing(); @@ -790,7 +802,7 @@ impl Engine for AuthorityRound { fn populate_from_parent(&self, header: &mut Header, parent: &Header) { let parent_step = header_step(parent, self.empty_steps_transition).expect("Header has been verified; qed"); - let current_step = self.step.load(); + let current_step = self.step.inner.load(); let current_empty_steps_len = if header.number() >= self.empty_steps_transition { self.empty_steps(parent_step.into(), current_step.into(), parent.hash()).len() @@ -816,7 +828,7 @@ impl Engine for AuthorityRound { let empty_step: EmptyStep = rlp.as_val().map_err(fmt_err)?;; if empty_step.verify(&*self.validators).unwrap_or(false) { - if self.step.check_future(empty_step.step).is_ok() { + if self.step.inner.check_future(empty_step.step).is_ok() { trace!(target: "engine", "handle_message: received empty step message {:?}", empty_step); self.handle_empty_step_message(empty_step); } else { @@ -836,7 +848,7 @@ impl Engine for AuthorityRound { fn generate_seal(&self, block: &ExecutedBlock, parent: &Header) -> Seal { // first check to avoid generating signature most of the time // (but there's still a race to the `compare_and_swap`) - if !self.can_propose.load(AtomicOrdering::SeqCst) { + if !self.step.can_propose.load(AtomicOrdering::SeqCst) { trace!(target: "engine", "Aborting seal generation. Can't propose."); return Seal::None; } @@ -845,7 +857,7 @@ impl Engine for AuthorityRound { let parent_step: U256 = header_step(parent, self.empty_steps_transition) .expect("Header has been verified; qed").into(); - let step = self.step.load(); + let step = self.step.inner.load(); // filter messages from old and future steps and different parents let empty_steps = if header.number() >= self.empty_steps_transition { @@ -922,7 +934,7 @@ impl Engine for AuthorityRound { trace!(target: "engine", "generate_seal: Issuing a block for step {}.", step); // only issue the seal if we were the first to reach the compare_and_swap. - if self.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) { + if self.step.can_propose.compare_and_swap(true, false, AtomicOrdering::SeqCst) { self.clear_empty_steps(parent_step); @@ -999,7 +1011,7 @@ impl Engine for AuthorityRound { .decode()?; let parent_step = header_step(&parent, self.empty_steps_transition)?; - let current_step = self.step.load(); + let current_step = self.step.inner.load(); self.empty_steps(parent_step.into(), current_step.into(), parent.hash()) } else { // we're verifying a block, extract empty steps from the seal @@ -1052,7 +1064,7 @@ impl Engine for AuthorityRound { // If yes then probably benign reporting needs to be moved further in the verification. let set_number = header.number(); - match verify_timestamp(&*self.step, header_step(header, self.empty_steps_transition)?) { + match verify_timestamp(&self.step.inner, header_step(header, self.empty_steps_transition)?) { Err(BlockError::InvalidSeal) => { self.validators.report_benign(header.author(), set_number, header.number()); Err(BlockError::InvalidSeal.into()) @@ -1294,7 +1306,7 @@ impl Engine for AuthorityRound { // This way, upon encountering an epoch change, the proposer from the // new set will be forced to wait until the next step to avoid sealing a // block that breaks the invariant that the parent's step < the block's step. - self.can_propose.store(false, AtomicOrdering::SeqCst); + self.step.can_propose.store(false, AtomicOrdering::SeqCst); return Some(combine_proofs(signal_number, &pending.proof, &*finality_proof)); } } diff --git a/ethcore/types/src/receipt.rs b/ethcore/types/src/receipt.rs index b4f105afab2..81233d212a2 100644 --- a/ethcore/types/src/receipt.rs +++ b/ethcore/types/src/receipt.rs @@ -16,7 +16,7 @@ //! Receipt -use ethereum_types::{H256, U256, Address, Bloom}; +use ethereum_types::{H160, H256, U256, Address, Bloom}; use heapsize::HeapSizeOf; use rlp::{Rlp, RlpStream, Encodable, Decodable, DecoderError}; @@ -157,6 +157,10 @@ pub struct LocalizedReceipt { pub log_bloom: Bloom, /// Transaction outcome. pub outcome: TransactionOutcome, + /// Receiver address + pub to: Option, + /// Sender + pub from: H160 } #[cfg(test)] diff --git a/ipfs/Cargo.toml b/ipfs/Cargo.toml index 9c7b5f3b003..5a72048134c 100644 --- a/ipfs/Cargo.toml +++ b/ipfs/Cargo.toml @@ -15,3 +15,6 @@ rlp = { path = "../util/rlp" } cid = "0.2" multihash = "0.7" unicase = "2.0" + +[dev-dependencies] +ethcore = { path = "../ethcore", features = ["test-helpers"] } diff --git a/local-store/Cargo.toml b/local-store/Cargo.toml index 6d09eb76f69..d2c3469ca40 100644 --- a/local-store/Cargo.toml +++ b/local-store/Cargo.toml @@ -16,5 +16,6 @@ serde_derive = "1.0" serde_json = "1.0" [dev-dependencies] +ethcore = { path = "../ethcore", features = ["test-helpers"] } ethkey = { path = "../ethkey" } kvdb-memorydb = { path = "../util/kvdb-memorydb" } diff --git a/miner/Cargo.toml b/miner/Cargo.toml index 707352484e2..e692d2f7083 100644 --- a/miner/Cargo.toml +++ b/miner/Cargo.toml @@ -27,7 +27,6 @@ linked-hash-map = "0.5" log = "0.3" parking_lot = "0.5" price-info = { path = "../price-info" } -rayon = "1.0" rlp = { path = "../util/rlp" } trace-time = { path = "../util/trace-time" } transaction-pool = { path = "../transaction-pool" } diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 107b9b22b54..9dc180fefe4 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -29,7 +29,6 @@ extern crate keccak_hash as hash; extern crate linked_hash_map; extern crate parking_lot; extern crate price_info; -extern crate rayon; extern crate rlp; extern crate trace_time; extern crate transaction_pool as txpool; diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 4ebdf9e3f1f..cf4a956f426 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -23,7 +23,6 @@ use std::collections::BTreeMap; use ethereum_types::{H256, U256, Address}; use parking_lot::RwLock; -use rayon::prelude::*; use transaction; use txpool::{self, Verifier}; @@ -179,8 +178,14 @@ impl TransactionQueue { let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone()); let results = transactions - .into_par_iter() - .map(|transaction| verifier.verify_transaction(transaction)) + .into_iter() + .map(|transaction| { + if self.pool.read().find(&transaction.hash()).is_some() { + bail!(transaction::Error::AlreadyImported) + } + + verifier.verify_transaction(transaction) + }) .map(|result| result.and_then(|verified| { self.pool.write().import(verified) .map(|_imported| ()) diff --git a/miner/src/pool/tests/client.rs b/miner/src/pool/tests/client.rs index 101b6cdc21a..08b43f12adf 100644 --- a/miner/src/pool/tests/client.rs +++ b/miner/src/pool/tests/client.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Parity. If not, see . +use std::sync::{atomic, Arc}; + use ethereum_types::{U256, H256, Address}; use rlp::Rlp; use transaction::{self, Transaction, SignedTransaction, UnverifiedTransaction}; @@ -25,6 +27,7 @@ const MAX_TRANSACTION_SIZE: usize = 15 * 1024; #[derive(Debug, Clone)] pub struct TestClient { + verification_invoked: Arc, account_details: AccountDetails, gas_required: U256, is_service_transaction: bool, @@ -35,6 +38,7 @@ pub struct TestClient { impl Default for TestClient { fn default() -> Self { TestClient { + verification_invoked: Default::default(), account_details: AccountDetails { nonce: 123.into(), balance: 63_100.into(), @@ -88,6 +92,10 @@ impl TestClient { insertion_id: 1, } } + + pub fn was_verification_triggered(&self) -> bool { + self.verification_invoked.load(atomic::Ordering::SeqCst) + } } impl pool::client::Client for TestClient { @@ -98,6 +106,7 @@ impl pool::client::Client for TestClient { fn verify_transaction(&self, tx: UnverifiedTransaction) -> Result { + self.verification_invoked.store(true, atomic::Ordering::SeqCst); Ok(SignedTransaction::new(tx)?) } diff --git a/miner/src/pool/tests/mod.rs b/miner/src/pool/tests/mod.rs index 552903a4bb2..ac2e6b008e9 100644 --- a/miner/src/pool/tests/mod.rs +++ b/miner/src/pool/tests/mod.rs @@ -796,3 +796,37 @@ fn should_include_local_transaction_to_a_full_pool() { // then assert_eq!(txq.status().status.transaction_count, 1); } + +#[test] +fn should_avoid_verifying_transaction_already_in_pool() { + // given + let txq = TransactionQueue::new( + txpool::Options { + max_count: 1, + max_per_sender: 2, + max_mem_usage: 50 + }, + verifier::Options { + minimal_gas_price: 1.into(), + block_gas_limit: 1_000_000.into(), + tx_gas_limit: 1_000_000.into(), + }, + PrioritizationStrategy::GasPriceOnly, + ); + let client = TestClient::new(); + let tx1 = Tx::default().signed().unverified(); + + let res = txq.import(client.clone(), vec![tx1.clone()]); + assert_eq!(res, vec![Ok(())]); + assert_eq!(txq.status().status.transaction_count, 1); + assert!(client.was_verification_triggered()); + + // when + let client = TestClient::new(); + let res = txq.import(client.clone(), vec![tx1]); + assert_eq!(res, vec![Err(transaction::Error::AlreadyImported)]); + assert!(!client.was_verification_triggered()); + + // then + assert_eq!(txq.status().status.transaction_count, 1); +} diff --git a/miner/src/pool/verifier.rs b/miner/src/pool/verifier.rs index 4675303928d..5e6e22077a3 100644 --- a/miner/src/pool/verifier.rs +++ b/miner/src/pool/verifier.rs @@ -57,6 +57,7 @@ impl Default for Options { } /// Transaction to verify. +#[cfg_attr(test, derive(Clone))] pub enum Transaction { /// Fresh, never verified transaction. /// @@ -75,7 +76,8 @@ pub enum Transaction { } impl Transaction { - fn hash(&self) -> H256 { + /// Return transaction hash + pub fn hash(&self) -> H256 { match *self { Transaction::Unverified(ref tx) => tx.hash(), Transaction::Retracted(ref tx) => tx.hash(), diff --git a/parity/lib.rs b/parity/lib.rs index 8c3242afb49..a4f7d7963ff 100644 --- a/parity/lib.rs +++ b/parity/lib.rs @@ -17,6 +17,7 @@ //! Ethcore client application. #![warn(missing_docs)] +#![cfg_attr(feature = "memory_profiling", feature(alloc_system, global_allocator, allocator_api))] extern crate ansi_term; extern crate docopt; @@ -91,6 +92,9 @@ extern crate pretty_assertions; #[cfg(test)] extern crate tempdir; +#[cfg(feature = "memory_profiling")] +extern crate alloc_system; + mod account; mod blockchain; mod cache; @@ -125,10 +129,16 @@ use cli::Args; use configuration::{Cmd, Execute}; use deprecated::find_deprecated; use ethcore_logger::setup_log; +#[cfg(feature = "memory_profiling")] +use alloc_system::System; pub use self::configuration::Configuration; pub use self::run::RunningClient; +#[cfg(feature = "memory_profiling")] +#[global_allocator] +static A: System = System; + fn print_hash_of(maybe_file: Option) -> Result { if let Some(file) = maybe_file { let mut f = BufReader::new(File::open(&file).map_err(|_| "Unable to open file".to_owned())?); @@ -144,6 +154,7 @@ fn run_deadlock_detection_thread() { use std::thread; use std::time::Duration; use parking_lot::deadlock; + use ansi_term::Style; info!("Starting deadlock detection thread."); // Create a background thread which checks for deadlocks every 10s diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 8a0b689c655..338a8926829 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -65,6 +65,7 @@ stats = { path = "../util/stats" } vm = { path = "../ethcore/vm" } [dev-dependencies] +ethcore = { path = "../ethcore", features = ["test-helpers"] } ethcore-network = { path = "../util/network" } fake-fetch = { path = "../util/fake-fetch" } kvdb-memorydb = { path = "../util/kvdb-memorydb" } diff --git a/rpc/src/v1/helpers/dispatch.rs b/rpc/src/v1/helpers/dispatch.rs index 9bec8e1d31a..dbfdd51bc5f 100644 --- a/rpc/src/v1/helpers/dispatch.rs +++ b/rpc/src/v1/helpers/dispatch.rs @@ -237,7 +237,7 @@ pub fn fetch_gas_price_corpus( /// Returns a eth_sign-compatible hash of data to sign. /// The data is prepended with special message to prevent -/// chosen-plaintext attacks. +/// malicious DApps from using the function to sign forged transactions. pub fn eth_data_hash(mut data: Bytes) -> H256 { let mut message_data = format!("\x19Ethereum Signed Message:\n{}", data.len()) diff --git a/rpc/src/v1/tests/mocked/eth.rs b/rpc/src/v1/tests/mocked/eth.rs index a8875a33541..c0ed3306fee 100644 --- a/rpc/src/v1/tests/mocked/eth.rs +++ b/rpc/src/v1/tests/mocked/eth.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH}; -use ethereum_types::{H256, U256, Address}; +use ethereum_types::{H160, H256, U256, Address}; use parking_lot::Mutex; use ethcore::account_provider::AccountProvider; use ethcore::client::{BlockChainClient, BlockId, EachBlockWith, Executed, TestBlockChainClient, TransactionId}; @@ -1004,6 +1004,8 @@ fn rpc_eth_send_raw_transaction() { #[test] fn rpc_eth_transaction_receipt() { let receipt = LocalizedReceipt { + from: H160::from_str("b60e8dd61c5d32be8058bb8eb970870f07233155").unwrap(), + to: Some(H160::from_str("d46e8dd67c5d32be8058bb8eb970870f07244567").unwrap()), transaction_hash: H256::zero(), transaction_index: 0, block_hash: H256::from_str("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5").unwrap(), @@ -1041,7 +1043,7 @@ fn rpc_eth_transaction_receipt() { "params": ["0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238"], "id": 1 }"#; - let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","contractAddress":null,"cumulativeGasUsed":"0x20","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","data":"0x","logIndex":"0x1","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"}],"logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","root":"0x0000000000000000000000000000000000000000000000000000000000000000","status":null,"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0"},"id":1}"#; + let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","contractAddress":null,"cumulativeGasUsed":"0x20","from":"0xb60e8dd61c5d32be8058bb8eb970870f07233155","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","data":"0x","logIndex":"0x1","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"}],"logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","root":"0x0000000000000000000000000000000000000000000000000000000000000000","status":null,"to":"0xd46e8dd67c5d32be8058bb8eb970870f07244567","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0"},"id":1}"#; assert_eq!(tester.io.handle_request_sync(request), Some(response.to_owned())); } diff --git a/rpc/src/v1/types/receipt.rs b/rpc/src/v1/types/receipt.rs index f8d111887a7..e46b640f148 100644 --- a/rpc/src/v1/types/receipt.rs +++ b/rpc/src/v1/types/receipt.rs @@ -29,6 +29,10 @@ pub struct Receipt { /// Block hash #[serde(rename="blockHash")] pub block_hash: Option, + /// Sender + pub from: Option, + /// Recipient + pub to: Option, /// Block number #[serde(rename="blockNumber")] pub block_number: Option, @@ -73,6 +77,8 @@ impl Receipt { impl From for Receipt { fn from(r: LocalizedReceipt) -> Self { Receipt { + to: r.to.map(Into::into), + from: Some(r.from.into()), transaction_hash: Some(r.transaction_hash.into()), transaction_index: Some(r.transaction_index.into()), block_hash: Some(r.block_hash.into()), @@ -91,6 +97,8 @@ impl From for Receipt { impl From for Receipt { fn from(r: RichReceipt) -> Self { Receipt { + from: None, + to: None, transaction_hash: Some(r.transaction_hash.into()), transaction_index: Some(r.transaction_index.into()), block_hash: None, @@ -109,6 +117,8 @@ impl From for Receipt { impl From for Receipt { fn from(r: EthReceipt) -> Self { Receipt { + from: None, + to: None, transaction_hash: None, transaction_index: None, block_hash: None, @@ -131,9 +141,11 @@ mod tests { #[test] fn receipt_serialization() { - let s = r#"{"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","cumulativeGasUsed":"0x20","gasUsed":"0x10","contractAddress":null,"logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","logIndex":"0x1","transactionLogIndex":null,"type":"mined"}],"root":"0x000000000000000000000000000000000000000000000000000000000000000a","logsBloom":"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f","status":"0x1"}"#; + let s = r#"{"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","from":null,"to":null,"blockNumber":"0x4510c","cumulativeGasUsed":"0x20","gasUsed":"0x10","contractAddress":null,"logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"data":"0x","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","logIndex":"0x1","transactionLogIndex":null,"type":"mined"}],"root":"0x000000000000000000000000000000000000000000000000000000000000000a","logsBloom":"0x0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000f","status":"0x1"}"#; let receipt = Receipt { + from: None, + to: None, transaction_hash: Some(0.into()), transaction_index: Some(0.into()), block_hash: Some("ed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5".parse().unwrap()), diff --git a/secret_store/Cargo.toml b/secret_store/Cargo.toml index 261658903cc..a8aed831273 100644 --- a/secret_store/Cargo.toml +++ b/secret_store/Cargo.toml @@ -39,5 +39,6 @@ ethabi-derive = "5.0" ethabi-contract = "5.0" [dev-dependencies] +ethcore = { path = "../ethcore", features = ["test-helpers"] } tempdir = "0.3" kvdb-rocksdb = { path = "../util/kvdb-rocksdb" } diff --git a/updater/Cargo.toml b/updater/Cargo.toml index d76db6ec0ea..8d94680335c 100644 --- a/updater/Cargo.toml +++ b/updater/Cargo.toml @@ -25,5 +25,6 @@ path = { path = "../util/path" } rand = "0.4" [dev-dependencies] +ethcore = { path = "../ethcore", features = ["test-helpers"] } tempdir = "0.3" matches = "0.1" diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index 8e8a3d6cc69..5f8f0cdbc2a 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -17,18 +17,13 @@ use ethcore_bytes::Bytes; use std::net::SocketAddr; use std::collections::{HashSet, HashMap, VecDeque}; -use std::mem; use std::default::Default; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use mio::*; -use mio::deprecated::{Handler, EventLoop}; -use mio::udp::*; use hash::keccak; use ethereum_types::{H256, H520}; use rlp::{Rlp, RlpStream, encode_list}; use node_table::*; use network::{Error, ErrorKind}; -use io::{StreamToken, IoContext}; use ethkey::{Secret, KeyPair, sign, recover}; use network::IpFilter; @@ -39,7 +34,7 @@ const ADDRESS_BITS: usize = 8 * ADDRESS_BYTES_SIZE; // Denoted by n in [Kademl const DISCOVERY_MAX_STEPS: u16 = 8; // Max iterations of discovery. (discover) const BUCKET_SIZE: usize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket. const ALPHA: usize = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNode requests. -const MAX_DATAGRAM_SIZE: usize = 1280; +pub const MAX_DATAGRAM_SIZE: usize = 1280; const PACKET_PING: u8 = 1; const PACKET_PONG: u8 = 2; @@ -79,9 +74,9 @@ impl NodeBucket { } } -struct Datagramm { - payload: Bytes, - address: SocketAddr, +pub struct Datagram { + pub payload: Bytes, + pub address: SocketAddr, } pub struct Discovery { @@ -89,13 +84,11 @@ pub struct Discovery { id_hash: H256, secret: Secret, public_endpoint: NodeEndpoint, - udp_socket: UdpSocket, - token: StreamToken, discovery_round: u16, discovery_id: NodeId, discovery_nodes: HashSet, node_buckets: Vec, - send_queue: VecDeque, + send_queue: VecDeque, check_timestamps: bool, adding_nodes: Vec, ip_filter: IpFilter, @@ -107,19 +100,16 @@ pub struct TableUpdates { } impl Discovery { - pub fn new(key: &KeyPair, listen: SocketAddr, public: NodeEndpoint, token: StreamToken, ip_filter: IpFilter) -> Discovery { - let socket = UdpSocket::bind(&listen).expect("Error binding UDP socket"); + pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery { Discovery { id: key.public().clone(), id_hash: keccak(key.public()), secret: key.secret().clone(), public_endpoint: public, - token: token, discovery_round: 0, discovery_id: NodeId::new(), discovery_nodes: HashSet::new(), node_buckets: (0..ADDRESS_BITS).map(|_| NodeBucket::new()).collect(), - udp_socket: socket, send_queue: VecDeque::new(), check_timestamps: true, adding_nodes: Vec::new(), @@ -157,7 +147,7 @@ impl Discovery { let dist = match Discovery::distance(&self.id_hash, &id_hash) { Some(dist) => dist, None => { - warn!(target: "discovery", "Attempted to update own entry: {:?}", e); + debug!(target: "discovery", "Attempted to update own entry: {:?}", e); return; } }; @@ -191,7 +181,7 @@ impl Discovery { let dist = match Discovery::distance(&self.id_hash, &keccak(id)) { Some(dist) => dist, None => { - warn!(target: "discovery", "Received ping from self"); + debug!(target: "discovery", "Received ping from self"); return } }; @@ -352,53 +342,12 @@ impl Discovery { ret } - pub fn writable(&mut self, io: &IoContext) where Message: Send + Sync + Clone { - while let Some(data) = self.send_queue.pop_front() { - match self.udp_socket.send_to(&data.payload, &data.address) { - Ok(Some(size)) if size == data.payload.len() => { - }, - Ok(Some(_)) => { - warn!("UDP sent incomplete datagramm"); - }, - Ok(None) => { - self.send_queue.push_front(data); - return; - } - Err(e) => { - debug!("UDP send error: {:?}, address: {:?}", e, &data.address); - return; - } - } - } - io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); - } - fn send_to(&mut self, payload: Bytes, address: SocketAddr) { - self.send_queue.push_back(Datagramm { payload: payload, address: address }); - } - - pub fn readable(&mut self, io: &IoContext) -> Option where Message: Send + Sync + Clone { - let mut buf: [u8; MAX_DATAGRAM_SIZE] = unsafe { mem::uninitialized() }; - let writable = !self.send_queue.is_empty(); - let res = match self.udp_socket.recv_from(&mut buf) { - Ok(Some((len, address))) => self.on_packet(&buf[0..len], address).unwrap_or_else(|e| { - debug!("Error processing UDP packet: {:?}", e); - None - }), - Ok(_) => None, - Err(e) => { - debug!("Error reading UPD socket: {:?}", e); - None - } - }; - let new_writable = !self.send_queue.is_empty(); - if writable != new_writable { - io.update_registration(self.token).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); - } - res + self.send_queue.push_back(Datagram { payload: payload, address: address }); } - fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result, Error> { + + pub fn on_packet(&mut self, packet: &[u8], from: SocketAddr) -> Result, Error> { // validate packet if packet.len() < 32 + 65 + 4 + 1 { return Err(ErrorKind::BadProtocol.into()); @@ -421,7 +370,7 @@ impl Discovery { PACKET_FIND_NODE => self.on_find_node(&rlp, &node_id, &from), PACKET_NEIGHBOURS => self.on_neighbours(&rlp, &node_id, &from), _ => { - debug!("Unknown UDP packet: {}", packet_id); + debug!(target: "discovery", "Unknown UDP packet: {}", packet_id); Ok(None) } } @@ -571,19 +520,16 @@ impl Discovery { self.start(); } - pub fn register_socket(&self, event_loop: &mut EventLoop) -> Result<(), Error> { - event_loop.register(&self.udp_socket, Token(self.token), Ready::all(), PollOpt::edge()).expect("Error registering UDP socket"); - Ok(()) + pub fn any_sends_queued(&self) -> bool { + !self.send_queue.is_empty() } - pub fn update_registration(&self, event_loop: &mut EventLoop) -> Result<(), Error> { - let registration = if !self.send_queue.is_empty() { - Ready::readable() | Ready::writable() - } else { - Ready::readable() - }; - event_loop.reregister(&self.udp_socket, Token(self.token), registration, PollOpt::edge()).expect("Error reregistering UDP socket"); - Ok(()) + pub fn dequeue_send(&mut self) -> Option { + self.send_queue.pop_front() + } + + pub fn requeue_send(&mut self, datagram: Datagram) { + self.send_queue.push_front(datagram) } } @@ -620,8 +566,8 @@ mod tests { let key2 = Random.generate().unwrap(); let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40444").unwrap(), udp_port: 40444 }; let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40445").unwrap(), udp_port: 40445 }; - let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default()); - let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default()); + let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default()); + let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default()); let node1 = Node::from_str("enode://a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7770").unwrap(); let node2 = Node::from_str("enode://b979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c@127.0.0.1:7771").unwrap(); @@ -632,16 +578,14 @@ mod tests { discovery2.refresh(); for _ in 0 .. 10 { - while !discovery1.send_queue.is_empty() { - let datagramm = discovery1.send_queue.pop_front().unwrap(); - if datagramm.address == ep2.address { - discovery2.on_packet(&datagramm.payload, ep1.address.clone()).ok(); + while let Some(datagram) = discovery1.dequeue_send() { + if datagram.address == ep2.address { + discovery2.on_packet(&datagram.payload, ep1.address.clone()).ok(); } } - while !discovery2.send_queue.is_empty() { - let datagramm = discovery2.send_queue.pop_front().unwrap(); - if datagramm.address == ep1.address { - discovery1.on_packet(&datagramm.payload, ep2.address.clone()).ok(); + while let Some(datagram) = discovery2.dequeue_send() { + if datagram.address == ep1.address { + discovery1.on_packet(&datagram.payload, ep2.address.clone()).ok(); } } discovery2.round(); @@ -653,7 +597,7 @@ mod tests { fn removes_expired() { let key = Random.generate().unwrap(); let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40446").unwrap(), udp_port: 40447 }; - let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default()); + let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default()); for _ in 0..1200 { discovery.add_node(NodeEntry { id: NodeId::random(), endpoint: ep.clone() }); } @@ -668,7 +612,7 @@ mod tests { let key = Random.generate().unwrap(); let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40447").unwrap(), udp_port: 40447 }; - let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default()); + let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default()); for _ in 0..(16 + 10) { discovery.node_buckets[0].nodes.push_back(BucketEntry { @@ -728,7 +672,7 @@ mod tests { let key = Secret::from_str(secret_hex) .and_then(|secret| KeyPair::from_secret(secret)) .unwrap(); - let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default()); + let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default()); node_entries.iter().for_each(|entry| discovery.update_node(entry.clone())); @@ -773,7 +717,7 @@ mod tests { fn packets() { let key = Random.generate().unwrap(); let ep = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40449").unwrap(), udp_port: 40449 }; - let mut discovery = Discovery::new(&key, ep.address.clone(), ep.clone(), 0, IpFilter::default()); + let mut discovery = Discovery::new(&key, ep.clone(), IpFilter::default()); discovery.check_timestamps = false; let from = SocketAddr::from_str("99.99.99.99:40445").unwrap(); @@ -840,13 +784,13 @@ mod tests { let key2 = Random.generate().unwrap(); let ep1 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40344").unwrap(), udp_port: 40344 }; let ep2 = NodeEndpoint { address: SocketAddr::from_str("127.0.0.1:40345").unwrap(), udp_port: 40345 }; - let mut discovery1 = Discovery::new(&key1, ep1.address.clone(), ep1.clone(), 0, IpFilter::default()); - let mut discovery2 = Discovery::new(&key2, ep2.address.clone(), ep2.clone(), 0, IpFilter::default()); + let mut discovery1 = Discovery::new(&key1, ep1.clone(), IpFilter::default()); + let mut discovery2 = Discovery::new(&key2, ep2.clone(), IpFilter::default()); discovery1.ping(&ep2); - let ping_data = discovery1.send_queue.pop_front().unwrap(); + let ping_data = discovery1.dequeue_send().unwrap(); discovery2.on_packet(&ping_data.payload, ep1.address.clone()).ok(); - let pong_data = discovery2.send_queue.pop_front().unwrap(); + let pong_data = discovery2.dequeue_send().unwrap(); let data = &pong_data.payload[(32 + 65)..]; let rlp = Rlp::new(&data[1..]); assert_eq!(ping_data.payload[0..32], rlp.val_at::>(1).unwrap()[..]) diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 6d28a838c27..0fbd64b4209 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -30,6 +30,7 @@ use hash::keccak; use mio::*; use mio::deprecated::{EventLoop}; use mio::tcp::*; +use mio::udp::*; use ethereum_types::H256; use rlp::{RlpStream, Encodable}; @@ -40,7 +41,7 @@ use node_table::*; use network::{NetworkConfiguration, NetworkIoMessage, ProtocolId, PeerId, PacketId}; use network::{NonReservedPeerMode, NetworkContext as NetworkContextTrait}; use network::{SessionInfo, Error, ErrorKind, DisconnectReason, NetworkProtocolHandler}; -use discovery::{Discovery, TableUpdates, NodeEntry}; +use discovery::{Discovery, TableUpdates, NodeEntry, MAX_DATAGRAM_SIZE}; use ip_utils::{map_external_address, select_public_address}; use path::restrict_permissions_owner; use parking_lot::{Mutex, RwLock}; @@ -239,6 +240,7 @@ struct ProtocolTimer { /// Root IO handler. Manages protocol handlers, IO timers and network connections. pub struct Host { pub info: RwLock, + udp_socket: Mutex>, tcp_listener: Mutex, sessions: Arc>>, discovery: Mutex>, @@ -295,6 +297,7 @@ impl Host { local_endpoint: local_endpoint, }), discovery: Mutex::new(None), + udp_socket: Mutex::new(None), tcp_listener: Mutex::new(tcp_listener), sessions: Arc::new(RwLock::new(Slab::new_starting_at(FIRST_SESSION, MAX_SESSIONS))), nodes: RwLock::new(NodeTable::new(path)), @@ -458,13 +461,16 @@ impl Host { let discovery = { let info = self.info.read(); if info.config.discovery_enabled && info.config.non_reserved_mode == NonReservedPeerMode::Accept { - let mut udp_addr = local_endpoint.address.clone(); - udp_addr.set_port(local_endpoint.udp_port); - Some(Discovery::new(&info.keys, udp_addr, public_endpoint, DISCOVERY, allow_ips)) + Some(Discovery::new(&info.keys, public_endpoint, allow_ips)) } else { None } }; if let Some(mut discovery) = discovery { + let mut udp_addr = local_endpoint.address; + udp_addr.set_port(local_endpoint.udp_port); + let socket = UdpSocket::bind(&udp_addr).expect("Error binding UDP socket"); + *self.udp_socket.lock() = Some(socket); + discovery.init_node_list(self.nodes.read().entries()); discovery.add_node_list(self.nodes.read().entries()); *self.discovery.lock() = Some(discovery); @@ -819,6 +825,67 @@ impl Host { } } + fn discovery_readable(&self, io: &IoContext) { + let node_changes = match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) { + (Some(udp_socket), Some(discovery)) => { + let mut buf = [0u8; MAX_DATAGRAM_SIZE]; + let writable = discovery.any_sends_queued(); + let res = match udp_socket.recv_from(&mut buf) { + Ok(Some((len, address))) => discovery.on_packet(&buf[0..len], address).unwrap_or_else(|e| { + debug!(target: "network", "Error processing UDP packet: {:?}", e); + None + }), + Ok(_) => None, + Err(e) => { + debug!(target: "network", "Error reading UPD socket: {:?}", e); + None + } + }; + let new_writable = discovery.any_sends_queued(); + if writable != new_writable { + io.update_registration(DISCOVERY) + .unwrap_or_else(|e| { + debug!(target: "network" ,"Error updating discovery registration: {:?}", e) + }); + } + res + }, + _ => None, + }; + if let Some(node_changes) = node_changes { + self.update_nodes(io, node_changes); + } + } + + fn discovery_writable(&self, io: &IoContext) { + match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) { + (Some(udp_socket), Some(discovery)) => { + while let Some(data) = discovery.dequeue_send() { + match udp_socket.send_to(&data.payload, &data.address) { + Ok(Some(size)) if size == data.payload.len() => { + }, + Ok(Some(_)) => { + warn!(target: "network", "UDP sent incomplete datagram"); + }, + Ok(None) => { + discovery.requeue_send(data); + return; + } + Err(e) => { + debug!(target: "network", "UDP send error: {:?}, address: {:?}", e, &data.address); + return; + } + } + } + io.update_registration(DISCOVERY) + .unwrap_or_else(|e| { + debug!(target: "network", "Error updating discovery registration: {:?}", e) + }); + }, + _ => (), + } + } + fn connection_timeout(&self, token: StreamToken, io: &IoContext) { trace!(target: "network", "Connection timeout: {}", token); self.kill_connection(token, io, true) @@ -920,12 +987,7 @@ impl IoHandler for Host { } match stream { FIRST_SESSION ... LAST_SESSION => self.session_readable(stream, io), - DISCOVERY => { - let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.readable(io)) }; - if let Some(node_changes) = node_changes { - self.update_nodes(io, node_changes); - } - }, + DISCOVERY => self.discovery_readable(io), TCP_ACCEPT => self.accept(io), _ => panic!("Received unknown readable token"), } @@ -937,9 +999,7 @@ impl IoHandler for Host { } match stream { FIRST_SESSION ... LAST_SESSION => self.session_writable(stream, io), - DISCOVERY => { - self.discovery.lock().as_mut().map(|d| d.writable(io)); - } + DISCOVERY => self.discovery_writable(io), _ => panic!("Received unknown writable token"), } } @@ -1055,7 +1115,13 @@ impl IoHandler for Host { session.lock().register_socket(reg, event_loop).expect("Error registering socket"); } } - DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.register_socket(event_loop).ok()).expect("Error registering discovery socket"), + DISCOVERY => match self.udp_socket.lock().as_ref() { + Some(udp_socket) => { + event_loop.register(udp_socket, reg, Ready::all(), PollOpt::edge()) + .expect("Error registering UDP socket"); + }, + _ => panic!("Error registering discovery socket"), + } TCP_ACCEPT => event_loop.register(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error registering stream"), _ => warn!("Unexpected stream registration") } @@ -1086,7 +1152,18 @@ impl IoHandler for Host { connection.lock().update_socket(reg, event_loop).expect("Error updating socket"); } } - DISCOVERY => self.discovery.lock().as_ref().and_then(|d| d.update_registration(event_loop).ok()).expect("Error reregistering discovery socket"), + DISCOVERY => match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_ref()) { + (Some(udp_socket), Some(discovery)) => { + let registration = if discovery.any_sends_queued() { + Ready::readable() | Ready::writable() + } else { + Ready::readable() + }; + event_loop.reregister(udp_socket, reg, registration, PollOpt::edge()) + .expect("Error reregistering UDP socket"); + }, + _ => panic!("Error reregistering discovery socket"), + } TCP_ACCEPT => event_loop.reregister(&*self.tcp_listener.lock(), Token(TCP_ACCEPT), Ready::all(), PollOpt::edge()).expect("Error reregistering stream"), _ => warn!("Unexpected stream update") } diff --git a/util/version/Cargo.toml b/util/version/Cargo.toml index 297211b2bd0..c7bd4f581e3 100644 --- a/util/version/Cargo.toml +++ b/util/version/Cargo.toml @@ -12,14 +12,13 @@ build = "build.rs" # Used by auto-updater and for Parity version string. track = "nightly" -# Indicates a critical release in this track (i.e. consensus issue) -critical = false - -# Latest supported fork blocks for various networks. Used ONLY by auto-updater. -[package.metadata.forks] -foundation = 4370000 -ropsten = 10 -kovan = 6600000 +# Network specific settings, used ONLY by auto-updater. +# Latest supported fork blocks. +# Indicates a critical release in this track (i.e. consensus issue). +[package.metadata.networks] +foundation = { forkBlock = 4370000, critical = false } +ropsten = { forkBlock = 10, critical = false } +kovan = { forkBlock = 6600000, critical = false } [dependencies] ethcore-bytes = { path = "../bytes" }