diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs
index 388a62666a8..bf3ffd85820 100644
--- a/ethcore/sync/src/block_sync.rs
+++ b/ethcore/sync/src/block_sync.rs
@@ -28,6 +28,7 @@ use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKi
 use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError};
 use sync_io::SyncIo;
 use blocks::{BlockCollection, SyncBody, SyncHeader};
+use chain::BlockSet;

 const MAX_HEADERS_TO_REQUEST: usize = 128;
 const MAX_BODIES_TO_REQUEST: usize = 32;
@@ -35,6 +36,26 @@ const MAX_RECEPITS_TO_REQUEST: usize = 128;
 const SUBCHAIN_SIZE: u64 = 256;
 const MAX_ROUND_PARENTS: usize = 16;
 const MAX_PARALLEL_SUBCHAIN_DOWNLOAD: usize = 5;
+const MAX_USELESS_HEADERS_PER_ROUND: usize = 3;
+
+// Logging macros that prepend the BlockSet context, so log lines can be filtered per set.
+macro_rules! trace_sync {
+	($self:ident, $fmt:expr, $($arg:tt)+) => {
+		trace!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set, $($arg)+);
+	};
+	($self:ident, $fmt:expr) => {
+		trace!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set);
+	};
+}
+
+macro_rules! debug_sync {
+	($self:ident, $fmt:expr, $($arg:tt)+) => {
+		debug!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set, $($arg)+);
+	};
+	($self:ident, $fmt:expr) => {
+		debug!(target: "sync", concat!("{:?}: ", $fmt), $self.block_set);
+	};
+}

 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
 /// Downloader state
@@ -65,6 +86,7 @@ pub enum BlockRequest {
 }

 /// Indicates sync action
+#[derive(Eq, PartialEq, Debug)]
 pub enum DownloadAction {
 	/// Do nothing
 	None,
@@ -89,15 +111,17 @@ impl From<rlp::DecoderError> for BlockDownloaderImportError {

 /// Block downloader strategy.
 /// Manages state and block data for a block download process.
 pub struct BlockDownloader {
+	/// Which set of blocks to download
+	block_set: BlockSet,
 	/// Downloader state
 	state: State,
 	/// Highest block number seen
 	highest_block: Option<BlockNumber>,
 	/// Downloaded blocks, holds `H`, `B` and `S`
 	blocks: BlockCollection,
-	/// Last impoted block number
+	/// Last imported block number
 	last_imported_block: BlockNumber,
-	/// Last impoted block hash
+	/// Last imported block hash
 	last_imported_hash: H256,
 	/// Number of blocks imported this round
 	imported_this_round: Option<usize>,
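The two wrapper macros above rely on `concat!` splicing a `{:?}: ` prefix into the caller's format string at compile time, so every message is tagged with `self.block_set`. A minimal, dependency-free sketch of the same trick, using `println!` in place of the `log` crate's `trace!`/`debug!` and simplified types:

```rust
// Standalone illustration of the trace_sync!/debug_sync! pattern (names and
// types here are simplified stand-ins, not the real sync structs).

#[derive(Debug)]
enum BlockSet { NewBlocks, OldBlocks }

struct Downloader { block_set: BlockSet }

macro_rules! log_sync {
    ($self:ident, $fmt:expr, $($arg:tt)+) => {
        // concat! builds the final format string at compile time.
        println!(concat!("{:?}: ", $fmt), $self.block_set, $($arg)+);
    };
    ($self:ident, $fmt:expr) => {
        println!(concat!("{:?}: ", $fmt), $self.block_set);
    };
}

fn main() {
    let new = Downloader { block_set: BlockSet::NewBlocks };
    let old = Downloader { block_set: BlockSet::OldBlocks };
    log_sync!(new, "Inserted {} headers", 12); // "NewBlocks: Inserted 12 headers"
    log_sync!(old, "Sync round complete");     // "OldBlocks: Sync round complete"
}
```

Note that `concat!` only accepts string literals, so `$fmt` must be a literal at each call site, which every `trace_sync!`/`debug_sync!` call in this patch satisfies.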
@@ -114,32 +138,20 @@ pub struct BlockDownloader {
 	retract_step: u64,
 	/// Whether reorg should be limited.
 	limit_reorg: bool,
+	/// Number of consecutive useless headers received this round
+	useless_headers_count: usize,
 }

 impl BlockDownloader {
-	/// Create a new instance of syncing strategy. This won't reorganize to before the
-	/// last kept state.
-	pub fn new(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self {
-		BlockDownloader {
-			state: State::Idle,
-			highest_block: None,
-			last_imported_block: start_number,
-			last_imported_hash: start_hash.clone(),
-			last_round_start: start_number,
-			last_round_start_hash: start_hash.clone(),
-			blocks: BlockCollection::new(sync_receipts),
-			imported_this_round: None,
-			round_parents: VecDeque::new(),
-			download_receipts: sync_receipts,
-			target_hash: None,
-			retract_step: 1,
-			limit_reorg: true,
-		}
-	}
-
-	/// Create a new instance of sync with unlimited reorg allowed.
-	pub fn with_unlimited_reorg(sync_receipts: bool, start_hash: &H256, start_number: BlockNumber) -> Self {
+	/// Create a new instance of the syncing strategy.
+	/// For BlockSet::NewBlocks this won't reorganize to before the last kept state.
+	pub fn new(block_set: BlockSet, start_hash: &H256, start_number: BlockNumber) -> Self {
+		let (limit_reorg, sync_receipts) = match block_set {
+			BlockSet::NewBlocks => (true, false),
+			BlockSet::OldBlocks => (false, true)
+		};
 		BlockDownloader {
+			block_set: block_set,
 			state: State::Idle,
 			highest_block: None,
 			last_imported_block: start_number,
@@ -152,13 +164,15 @@ impl BlockDownloader {
 			download_receipts: sync_receipts,
 			target_hash: None,
 			retract_step: 1,
-			limit_reorg: false,
+			limit_reorg: limit_reorg,
+			useless_headers_count: 0,
 		}
 	}

 	/// Reset sync. Clear all local downloaded data.
 	pub fn reset(&mut self) {
 		self.blocks.clear();
+		self.useless_headers_count = 0;
 		self.state = State::Idle;
 	}
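Deriving the `(limit_reorg, sync_receipts)` pair from the `BlockSet` removes the footgun of the old API, where callers passed a bare `bool` into one of two near-duplicate constructors and could pair the flags incorrectly. A sketch of the idea with simplified, hypothetical types:

```rust
// Simplified model of the constructor change; field names mirror the patch,
// everything else is a stand-in for the real sync types.

#[derive(Copy, Clone, Debug)]
enum BlockSet { NewBlocks, OldBlocks }

struct Downloader { block_set: BlockSet, limit_reorg: bool, download_receipts: bool }

impl Downloader {
    fn new(block_set: BlockSet) -> Self {
        // NewBlocks: syncing near the head, so reorg depth is limited and
        // receipts are not fetched. OldBlocks (background/ancient sync):
        // unlimited reorg depth, receipts downloaded alongside bodies.
        let (limit_reorg, download_receipts) = match block_set {
            BlockSet::NewBlocks => (true, false),
            BlockSet::OldBlocks => (false, true),
        };
        Downloader { block_set, limit_reorg, download_receipts }
    }
}

fn main() {
    let old = Downloader::new(BlockSet::OldBlocks);
    assert!(!old.limit_reorg && old.download_receipts);
    println!("{:?}: limit_reorg={}", old.block_set, old.limit_reorg);
}
```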
Resetting sync", MAX_USELESS_HEADERS_PER_ROUND); + self.reset(); + } return Err(BlockDownloaderImportError::Useless); } self.blocks.insert_headers(headers); - trace!(target: "sync", "Inserted {} headers", count); + trace_sync!(self, "Inserted {} headers", count); }, - _ => trace!(target: "sync", "Unexpected headers({})", headers.len()), + _ => trace_sync!(self, "Unexpected headers({})", headers.len()), } Ok(DownloadAction::None) @@ -337,7 +359,7 @@ impl BlockDownloader { if item_count == 0 { return Err(BlockDownloaderImportError::Useless); } else if self.state != State::Blocks { - trace!(target: "sync", "Ignored unexpected block bodies"); + trace_sync!(self, "Ignored unexpected block bodies"); } else { let mut bodies = Vec::with_capacity(item_count); for i in 0..item_count { @@ -347,11 +369,11 @@ impl BlockDownloader { let hashes = self.blocks.insert_bodies(bodies); if hashes.len() != item_count { - trace!(target: "sync", "Deactivating peer for giving invalid block bodies"); + trace_sync!(self, "Deactivating peer for giving invalid block bodies"); return Err(BlockDownloaderImportError::Invalid); } if !all_expected(hashes.as_slice(), expected_hashes, |&a, &b| a == b) { - trace!(target: "sync", "Deactivating peer for giving unexpected block bodies"); + trace_sync!(self, "Deactivating peer for giving unexpected block bodies"); return Err(BlockDownloaderImportError::Invalid); } } @@ -365,24 +387,24 @@ impl BlockDownloader { return Err(BlockDownloaderImportError::Useless); } else if self.state != State::Blocks { - trace!(target: "sync", "Ignored unexpected block receipts"); + trace_sync!(self, "Ignored unexpected block receipts"); } else { let mut receipts = Vec::with_capacity(item_count); for i in 0..item_count { let receipt = r.at(i).map_err(|e| { - trace!(target: "sync", "Error decoding block receipts RLP: {:?}", e); + trace_sync!(self, "Error decoding block receipts RLP: {:?}", e); BlockDownloaderImportError::Invalid })?; receipts.push(receipt.as_raw().to_vec()); } let hashes = self.blocks.insert_receipts(receipts); if hashes.len() != item_count { - trace!(target: "sync", "Deactivating peer for giving invalid block receipts"); + trace_sync!(self, "Deactivating peer for giving invalid block receipts"); return Err(BlockDownloaderImportError::Invalid); } if !all_expected(hashes.as_slice(), expected_hashes, |a, b| a.contains(b)) { - trace!(target: "sync", "Deactivating peer for giving unexpected block receipts"); + trace_sync!(self, "Deactivating peer for giving unexpected block receipts"); return Err(BlockDownloaderImportError::Invalid); } } @@ -391,7 +413,7 @@ impl BlockDownloader { fn start_sync_round(&mut self, io: &mut SyncIo) { self.state = State::ChainHead; - trace!(target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); + trace_sync!(self, "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block); // Check if need to retract to find the common block. The problem is that the peers still return headers by hash even // from the non-canonical part of the tree. So we also retract if nothing has been imported last round. 
@@ -391,7 +413,7 @@

 	fn start_sync_round(&mut self, io: &mut SyncIo) {
 		self.state = State::ChainHead;
-		trace!(target: "sync", "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block);
+		trace_sync!(self, "Starting round (last imported count = {:?}, last started = {}, block = {:?}", self.imported_this_round, self.last_round_start, self.last_imported_block);
 		// Check if need to retract to find the common block. The problem is that the peers still return headers by hash even
 		// from the non-canonical part of the tree. So we also retract if nothing has been imported last round.
 		let start = self.last_round_start;
@@ -403,12 +425,12 @@
 				if let Some(&(_, p)) = self.round_parents.iter().find(|&&(h, _)| h == start_hash) {
 					self.last_imported_block = start - 1;
 					self.last_imported_hash = p.clone();
-					trace!(target: "sync", "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
+					trace_sync!(self, "Searching common header from the last round {} ({})", self.last_imported_block, self.last_imported_hash);
 				} else {
 					let best = io.chain().chain_info().best_block_number;
 					let oldest_reorg = io.chain().pruning_info().earliest_state;
 					if self.limit_reorg && best > start && start < oldest_reorg {
-						debug!(target: "sync", "Could not revert to previous ancient block, last: {} ({})", start, start_hash);
+						debug_sync!(self, "Could not revert to previous ancient block, last: {} ({})", start, start_hash);
 						self.reset();
 					} else {
 						let n = start - cmp::min(self.retract_step, start);
@@ -417,10 +439,10 @@
 							Some(h) => {
 								self.last_imported_block = n;
 								self.last_imported_hash = h;
-								trace!(target: "sync", "Searching common header in the blockchain {} ({})", start, self.last_imported_hash);
+								trace_sync!(self, "Searching common header in the blockchain {} ({})", start, self.last_imported_hash);
 							}
 							None => {
-								debug!(target: "sync", "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash);
+								debug_sync!(self, "Could not revert to previous block, last: {} ({})", start, self.last_imported_hash);
 								self.reset();
 							}
 						}
@@ -448,7 +470,7 @@
 			State::ChainHead => {
 				if num_active_peers < MAX_PARALLEL_SUBCHAIN_DOWNLOAD {
 					// Request subchain headers
-					trace!(target: "sync", "Starting sync with better chain");
+					trace_sync!(self, "Starting sync with better chain");
 					// Request MAX_HEADERS_TO_REQUEST - 2 headers apart so that
 					// MAX_HEADERS_TO_REQUEST would include headers for neighbouring subchains
 					return Some(BlockRequest::Headers {
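For reference, the spacing arithmetic the comment above relies on: under eth-protocol `GetBlockHeaders` semantics, a request `(start, count, skip)` yields headers at `start`, `start + skip + 1`, `start + 2*(skip + 1)`, and so on. The helper below is hypothetical (the actual request fields fall outside this hunk), but it shows why a gap of `MAX_HEADERS_TO_REQUEST - 2` lets one full batch of `MAX_HEADERS_TO_REQUEST` sequential headers reach the neighbouring subchain head:

```rust
// Hypothetical illustration; only the constant is taken from the file.
const MAX_HEADERS_TO_REQUEST: u64 = 128;

fn subchain_heads(start: u64, count: u64, skip: u64) -> Vec<u64> {
    // GetBlockHeaders with a skip returns every (skip + 1)-th block number.
    (0..count).map(|i| start + i * (skip + 1)).collect()
}

fn main() {
    let skip = MAX_HEADERS_TO_REQUEST - 2; // 126, as in the comment
    assert_eq!(subchain_heads(1, 3, skip), vec![1, 128, 255]);
    // A 128-header sequential fill starting at head 1 covers blocks 1..=128,
    // i.e. it includes the next subchain head (128), linking the subchains.
}
```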
@@ -491,8 +513,9 @@
 	}

 	/// Checks if there are blocks fully downloaded that can be imported into the blockchain and does the import.
-	pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> Result<(), BlockDownloaderImportError> {
-		let mut bad = false;
+	/// Returns DownloadAction::Reset if it has imported all the blocks it can and all downloading peers should be reset.
+	pub fn collect_blocks(&mut self, io: &mut SyncIo, allow_out_of_order: bool) -> DownloadAction {
+		let mut download_action = DownloadAction::None;
 		let mut imported = HashSet::new();
 		let blocks = self.blocks.drain();
 		let count = blocks.len();
@@ -506,8 +529,8 @@

 			if self.target_hash.as_ref().map_or(false, |t| t == &h) {
 				self.state = State::Complete;
-				trace!(target: "sync", "Sync target reached");
-				return Ok(());
+				trace_sync!(self, "Sync target reached");
+				return download_action;
 			}

 			let result = if let Some(receipts) = receipts {
@@ -518,15 +541,15 @@

 			match result {
 				Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyInChain), _)) => {
-					trace!(target: "sync", "Block already in chain {:?}", h);
+					trace_sync!(self, "Block already in chain {:?}", h);
 					self.block_imported(&h, number, &parent);
 				},
 				Err(BlockImportError(BlockImportErrorKind::Import(ImportErrorKind::AlreadyQueued), _)) => {
-					trace!(target: "sync", "Block already queued {:?}", h);
+					trace_sync!(self, "Block already queued {:?}", h);
 					self.block_imported(&h, number, &parent);
 				},
 				Ok(_) => {
-					trace!(target: "sync", "Block queued {:?}", h);
+					trace_sync!(self, "Block queued {:?}", h);
 					imported.insert(h.clone());
 					self.block_imported(&h, number, &parent);
 				},
@@ -534,37 +557,34 @@ impl BlockDownloader {
 					break;
 				},
 				Err(BlockImportError(BlockImportErrorKind::Block(BlockError::UnknownParent(_)), _)) => {
-					trace!(target: "sync", "Unknown new block parent, restarting sync");
+					trace_sync!(self, "Unknown new block parent, restarting sync");
 					break;
 				},
 				Err(BlockImportError(BlockImportErrorKind::Block(BlockError::TemporarilyInvalid(_)), _)) => {
-					debug!(target: "sync", "Block temporarily invalid, restarting sync");
+					debug_sync!(self, "Block temporarily invalid: {:?}, restarting sync", h);
 					break;
 				},
 				Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => {
-					debug!(target: "sync", "Block import queue full ({}), restarting sync", limit);
+					debug_sync!(self, "Block import queue full ({}), restarting sync", limit);
+					download_action = DownloadAction::Reset;
 					break;
 				},
 				Err(e) => {
-					debug!(target: "sync", "Bad block {:?} : {:?}", h, e);
-					bad = true;
+					debug_sync!(self, "Bad block {:?} : {:?}", h, e);
+					download_action = DownloadAction::Reset;
 					break;
 				}
 			}
 		}
-		trace!(target: "sync", "Imported {} of {}", imported.len(), count);
+		trace_sync!(self, "Imported {} of {}", imported.len(), count);
 		self.imported_this_round = Some(self.imported_this_round.unwrap_or(0) + imported.len());

-		if bad {
-			return Err(BlockDownloaderImportError::Invalid);
-		}
-
 		if self.blocks.is_empty() {
 			// complete sync round
-			trace!(target: "sync", "Sync round complete");
-			self.reset();
+			trace_sync!(self, "Sync round complete");
+			download_action = DownloadAction::Reset;
 		}
-		Ok(())
+		download_action
 	}

 	fn block_imported(&mut self, hash: &H256, number: BlockNumber, parent: &H256) {
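The signature change turns what used to be error-based control flow (returning `Err(BlockDownloaderImportError::Invalid)` to mean "restart everything") into an explicit `DownloadAction`, so the bad-block, queue-full, and round-complete cases can all request the same reset without pretending to be errors. A sketch of the resulting caller pattern, with simplified, hypothetical types:

```rust
// Simplified model; only the DownloadAction enum shape comes from the patch.
#[derive(Eq, PartialEq, Debug)]
enum DownloadAction { None, Reset }

struct Downloader { pending_blocks: usize, bad_block_seen: bool }

impl Downloader {
    fn collect_blocks(&mut self) -> DownloadAction {
        if self.bad_block_seen {
            return DownloadAction::Reset; // previously: Err(Invalid)
        }
        if self.pending_blocks == 0 {
            return DownloadAction::Reset; // round complete: same action, no error
        }
        DownloadAction::None
    }
}

fn main() {
    let mut d = Downloader { pending_blocks: 0, bad_block_seen: false };
    // One uniform check instead of separate error-vs-completion branches:
    if d.collect_blocks() == DownloadAction::Reset {
        println!("resetting downloads and expiring outstanding peer requests");
    }
}
```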
@@ -623,6 +643,20 @@ mod tests {
 		Transaction::default().sign(keypair.secret(), None)
 	}

+	fn import_headers(headers: &[BlockHeader], downloader: &mut BlockDownloader, io: &mut SyncIo) -> Result<DownloadAction, BlockDownloaderImportError> {
+		let mut stream = RlpStream::new();
+		stream.append_list(headers);
+		let bytes = stream.out();
+		let rlp = Rlp::new(&bytes);
+		let expected_hash = headers.first().unwrap().hash();
+		downloader.import_headers(io, &rlp, expected_hash)
+	}
+
+	fn import_headers_ok(headers: &[BlockHeader], downloader: &mut BlockDownloader, io: &mut SyncIo) {
+		let res = import_headers(headers, downloader, io);
+		assert!(res.is_ok());
+	}
+
 	#[test]
 	fn import_headers_in_chain_head_state() {
 		::env_logger::try_init().ok();
@@ -630,7 +664,7 @@ mod tests {
 		let spec = Spec::new_test();
 		let genesis_hash = spec.genesis_header().hash();

-		let mut downloader = BlockDownloader::new(false, &genesis_hash, 0);
+		let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &genesis_hash, 0);
 		downloader.state = State::ChainHead;

 		let mut chain = TestBlockChainClient::new();
@@ -712,7 +746,7 @@ mod tests {
 		let parent_hash = headers[1].hash();
 		headers.push(dummy_header(129, parent_hash));

-		let mut downloader = BlockDownloader::new(false, &H256::random(), 0);
+		let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &H256::random(), 0);
 		downloader.state = State::Blocks;
 		downloader.blocks.reset_to(vec![headers[0].hash()]);

@@ -782,7 +816,7 @@ mod tests {
 			headers.push(header);
 		}

-		let mut downloader = BlockDownloader::new(false, &headers[0].hash(), 0);
+		let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &headers[0].hash(), 0);
 		downloader.state = State::Blocks;
 		downloader.blocks.reset_to(vec![headers[0].hash()]);

@@ -846,7 +880,7 @@ mod tests {
 			headers.push(header);
 		}

-		let mut downloader = BlockDownloader::new(true, &headers[0].hash(), 0);
+		let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &headers[0].hash(), 0);
 		downloader.state = State::Blocks;
 		downloader.blocks.reset_to(vec![headers[0].hash()]);

@@ -871,4 +905,84 @@ mod tests {
 			_ => panic!("expected BlockDownloaderImportError"),
 		};
 	}
+
+	#[test]
+	fn reset_after_multiple_sets_of_useless_headers() {
+		::env_logger::try_init().ok();
+
+		let spec = Spec::new_test();
+		let genesis_hash = spec.genesis_header().hash();
+
+		let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &genesis_hash, 0);
+		downloader.state = State::ChainHead;
+
+		let mut chain = TestBlockChainClient::new();
+		let snapshot_service = TestSnapshotService::new();
+		let queue = RwLock::new(VecDeque::new());
+		let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None);
+
+		let heads = [
+			spec.genesis_header(),
+			dummy_header(127, H256::random()),
+			dummy_header(254, H256::random()),
+		];
+
+		let short_subchain = [dummy_header(1, genesis_hash)];
+
+		import_headers_ok(&heads, &mut downloader, &mut io);
+		import_headers_ok(&short_subchain, &mut downloader, &mut io);
+
+		assert_eq!(downloader.state, State::Blocks);
+		assert!(!downloader.blocks.is_empty());
+
+		// simulate receiving useless headers
+		let head = vec![short_subchain.last().unwrap().clone()];
+		for _ in 0..MAX_USELESS_HEADERS_PER_ROUND {
+			let res = import_headers(&head, &mut downloader, &mut io);
+			assert!(res.is_err());
+		}
+
+		assert_eq!(downloader.state, State::Idle);
+		assert!(downloader.blocks.is_empty());
+	}
+
+	#[test]
+	fn dont_reset_after_multiple_sets_of_useless_headers_for_chain_head() {
+		::env_logger::try_init().ok();
+
+		let spec = Spec::new_test();
+		let genesis_hash = spec.genesis_header().hash();
+
+		let mut downloader = BlockDownloader::new(BlockSet::NewBlocks, &genesis_hash, 0);
+		downloader.state = State::ChainHead;
+
+		let mut chain = TestBlockChainClient::new();
+		let snapshot_service = TestSnapshotService::new();
+		let queue = RwLock::new(VecDeque::new());
+		let mut io = TestIo::new(&mut chain, &snapshot_service, &queue, None);
+
+		let heads = [
+			spec.genesis_header()
+		];
+
+		let short_subchain = [dummy_header(1, genesis_hash)];
+
+		import_headers_ok(&heads, &mut downloader, &mut io);
+		import_headers_ok(&short_subchain, &mut downloader, &mut io);
+
+		assert_eq!(downloader.state, State::Blocks);
+		assert!(!downloader.blocks.is_empty());
+
+		// simulate receiving useless headers
+		let head = vec![short_subchain.last().unwrap().clone()];
+		for _ in 0..MAX_USELESS_HEADERS_PER_ROUND {
+			let res = import_headers(&head, &mut downloader, &mut io);
+			assert!(res.is_err());
+		}
+
+		// The download shouldn't be reset, since this is the chain head for a single subchain;
+		// this state usually occurs for NewBlocks when it has reached the chain head.
+		assert_eq!(downloader.state, State::Blocks);
+		assert!(!downloader.blocks.is_empty());
+	}
 }
diff --git a/ethcore/sync/src/blocks.rs b/ethcore/sync/src/blocks.rs
index 35190d1b3c9..c64843e3b2a 100644
--- a/ethcore/sync/src/blocks.rs
+++ b/ethcore/sync/src/blocks.rs
@@ -397,6 +397,11 @@ impl BlockCollection {
 		self.blocks.contains_key(hash)
 	}

+	/// Check the number of subchain heads.
+	pub fn heads_len(&self) -> usize {
+		self.heads.len()
+	}
+
 	/// Return used heap size.
 	pub fn heap_size(&self) -> usize {
 		self.heads.heap_size_of_children()
diff --git a/ethcore/sync/src/chain/handler.rs b/ethcore/sync/src/chain/handler.rs
index 870598615a7..bbb3db3288e 100644
--- a/ethcore/sync/src/chain/handler.rs
+++ b/ethcore/sync/src/chain/handler.rs
@@ -293,7 +293,9 @@ impl SyncHandler {
 		let block_set = sync.peers.get(&peer_id)
 			.and_then(|p| p.block_set)
 			.unwrap_or(BlockSet::NewBlocks);
-		if !sync.reset_peer_asking(peer_id, PeerAsking::BlockBodies) {
+		let allowed = sync.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false);
+
+		if !sync.reset_peer_asking(peer_id, PeerAsking::BlockBodies) || !allowed {
 			trace!(target: "sync", "{}: Ignored unexpected bodies", peer_id);
 			return Ok(());
 		}
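The new `allowed` gate in the bodies (and, below, receipts) handler collapses an `Option`-returning peer lookup into a `bool` with `map(..).unwrap_or(false)`, so a peer with no map entry (e.g. already disconnected) is treated the same as a disallowed one. A self-contained sketch of that lookup pattern, with simplified types and a hypothetical `is_allowed`:

```rust
use std::collections::HashMap;

type PeerId = usize;

struct Peer { allowed: bool }

impl Peer {
    // Stand-in for the real peer method referenced in the patch.
    fn is_allowed(&self) -> bool { self.allowed }
}

fn main() {
    let mut peers: HashMap<PeerId, Peer> = HashMap::new();
    peers.insert(1, Peer { allowed: true });
    peers.insert(2, Peer { allowed: false });

    for &id in &[1, 2, 3] {
        // A missing entry (peer 3) defaults to false instead of panicking.
        let allowed = peers.get(&id).map(|p| p.is_allowed()).unwrap_or(false);
        println!("peer {}: allowed = {}", id, allowed);
    }
}
```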
@@ -420,12 +422,8 @@ impl SyncHandler {
 				downloader.import_headers(io, r, expected_hash)?
 			};

-		if let DownloadAction::Reset = result {
-			// mark all outstanding requests as expired
-			trace!("Resetting downloads for {:?}", block_set);
-			for (_, ref mut p) in sync.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) {
-				p.reset_asking();
-			}
+		if result == DownloadAction::Reset {
+			sync.reset_downloads(block_set);
 		}

 		sync.collect_blocks(io, block_set);
@@ -436,7 +434,8 @@ impl SyncHandler {
 	fn on_peer_block_receipts(sync: &mut ChainSync, io: &mut SyncIo, peer_id: PeerId, r: &Rlp) -> Result<(), DownloaderImportError> {
 		sync.clear_peer_download(peer_id);
 		let block_set = sync.peers.get(&peer_id).and_then(|p| p.block_set).unwrap_or(BlockSet::NewBlocks);
-		if !sync.reset_peer_asking(peer_id, PeerAsking::BlockReceipts) {
+		let allowed = sync.peers.get(&peer_id).map(|p| p.is_allowed()).unwrap_or(false);
+		if !sync.reset_peer_asking(peer_id, PeerAsking::BlockReceipts) || !allowed {
 			trace!(target: "sync", "{}: Ignored unexpected receipts", peer_id);
 			return Ok(());
 		}
diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs
index 8941437b705..f5a0e30b8da 100644
--- a/ethcore/sync/src/chain/mod.rs
+++ b/ethcore/sync/src/chain/mod.rs
@@ -109,7 +109,7 @@ use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo, Bl
 use ethcore::snapshot::{RestorationStatus};
 use sync_io::SyncIo;
 use super::{WarpSync, SyncConfig};
-use block_sync::{BlockDownloader, BlockDownloaderImportError as DownloaderImportError};
+use block_sync::{BlockDownloader, DownloadAction};
 use rand::Rng;
 use snapshot::{Snapshot};
 use api::{EthProtocolInfo as PeerInfoDigest, WARP_SYNC_PROTOCOL_ID};
@@ -429,7 +429,7 @@ impl ChainSync {
 			peers: HashMap::new(),
 			handshaking_peers: HashMap::new(),
 			active_peers: HashSet::new(),
-			new_blocks: BlockDownloader::new(false, &chain_info.best_block_hash, chain_info.best_block_number),
+			new_blocks: BlockDownloader::new(BlockSet::NewBlocks, &chain_info.best_block_hash, chain_info.best_block_number),
 			old_blocks: None,
 			last_sent_block_number: 0,
 			network_id: config.network_id,
@@ -638,13 +638,13 @@ impl ChainSync {
 	pub fn update_targets(&mut self, chain: &BlockChainClient) {
 		// Do not assume that the block queue/chain still has our last_imported_block
 		let chain = chain.chain_info();
-		self.new_blocks = BlockDownloader::new(false, &chain.best_block_hash, chain.best_block_number);
+		self.new_blocks = BlockDownloader::new(BlockSet::NewBlocks, &chain.best_block_hash, chain.best_block_number);
 		self.old_blocks = None;
 		if self.download_old_blocks {
 			if let (Some(ancient_block_hash), Some(ancient_block_number)) = (chain.ancient_block_hash, chain.ancient_block_number) {
 				trace!(target: "sync", "Downloading old blocks from {:?} (#{}) till {:?} (#{:?})", ancient_block_hash, ancient_block_number, chain.first_block_hash, chain.first_block_number);
-				let mut downloader = BlockDownloader::with_unlimited_reorg(true, &ancient_block_hash, ancient_block_number);
+				let mut downloader = BlockDownloader::new(BlockSet::OldBlocks, &ancient_block_hash, ancient_block_number);
 				if let Some(hash) = chain.first_block_hash {
 					trace!(target: "sync", "Downloader target set to {:?}", hash);
 					downloader.set_target(&hash);
@@ -763,12 +763,10 @@ impl ChainSync {
 			}
 		}

-		// Only ask for old blocks if the peer has a higher difficulty than the last imported old block
-		let last_imported_old_block_difficulty = self.old_blocks.as_mut().and_then(|d| {
-			io.chain().block_total_difficulty(BlockId::Number(d.last_imported_block_number()))
-		});
+		// Only ask for old blocks if the peer has an equal or higher difficulty
+		let equal_or_higher_difficulty = peer_difficulty.map_or(false, |pd| pd >= syncing_difficulty);

-		if force || last_imported_old_block_difficulty.map_or(true, |ld| peer_difficulty.map_or(true, |pd| pd > ld)) {
+		if force || equal_or_higher_difficulty {
 			if let Some(request) = self.old_blocks.as_mut().and_then(|d| d.request_blocks(io, num_active_peers)) {
 				SyncRequester::request_blocks(self, io, peer_id, request, BlockSet::OldBlocks);
 				return;
@@ -776,9 +774,9 @@ impl ChainSync {
 		} else {
 			trace!(
 				target: "sync",
-				"peer {:?} is not suitable for requesting old blocks, last_imported_old_block_difficulty={:?}, peer_difficulty={:?}",
+				"peer {:?} is not suitable for requesting old blocks, syncing_difficulty={:?}, peer_difficulty={:?}",
 				peer_id,
-				last_imported_old_block_difficulty,
+				syncing_difficulty,
 				peer_difficulty
 			);
 			self.deactivate_peer(io, peer_id);
@@ -856,18 +854,39 @@ impl ChainSync {
 	fn collect_blocks(&mut self, io: &mut SyncIo, block_set: BlockSet) {
 		match block_set {
 			BlockSet::NewBlocks => {
-				if self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) == Err(DownloaderImportError::Invalid) {
-					self.restart(io);
+				if self.new_blocks.collect_blocks(io, self.state == SyncState::NewBlocks) == DownloadAction::Reset {
+					self.reset_downloads(block_set);
+					self.new_blocks.reset();
 				}
 			},
 			BlockSet::OldBlocks => {
-				if self.old_blocks.as_mut().map_or(false, |downloader| { downloader.collect_blocks(io, false) == Err(DownloaderImportError::Invalid) }) {
-					self.restart(io);
-				} else if self.old_blocks.as_ref().map_or(false, |downloader| { downloader.is_complete() }) {
+				let mut is_complete = false;
+				let mut download_action = DownloadAction::None;
+				if let Some(downloader) = self.old_blocks.as_mut() {
+					download_action = downloader.collect_blocks(io, false);
+					is_complete = downloader.is_complete();
+				}
+
+				if download_action == DownloadAction::Reset {
+					self.reset_downloads(block_set);
+					if let Some(downloader) = self.old_blocks.as_mut() {
+						downloader.reset();
+					}
+				}
+
+				if is_complete {
 					trace!(target: "sync", "Background block download is complete");
 					self.old_blocks = None;
 				}
 			}
-		}
-	}
+		};
+	}
+
+	/// Mark all outstanding requests as expired
+	fn reset_downloads(&mut self, block_set: BlockSet) {
+		trace!(target: "sync", "Resetting downloads for {:?}", block_set);
+		for (_, ref mut p) in self.peers.iter_mut().filter(|&(_, ref p)| p.block_set == Some(block_set)) {
+			p.reset_asking();
+		}
+	}
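The rewritten gate is also stricter about unknown peers: `map_or(false, ..)` means a peer whose total difficulty we don't know is no longer asked for old blocks, whereas the old `map_or(true, ..)` chain defaulted to asking. A sketch with `u64` standing in for the chain's `U256` difficulty type:

```rust
// Simplified model of the old-blocks request gate from this hunk.
fn should_request_old_blocks(force: bool, peer_difficulty: Option<u64>, syncing_difficulty: u64) -> bool {
    // Unknown difficulty defaults to false, i.e. "do not ask this peer".
    let equal_or_higher_difficulty = peer_difficulty.map_or(false, |pd| pd >= syncing_difficulty);
    force || equal_or_higher_difficulty
}

fn main() {
    assert!(should_request_old_blocks(false, Some(100), 100)); // equal difficulty now qualifies
    assert!(!should_request_old_blocks(false, Some(99), 100)); // lower difficulty: peer is deactivated
    assert!(!should_request_old_blocks(false, None, 100));     // unknown difficulty: skip the peer
    assert!(should_request_old_blocks(true, None, 100));       // force overrides the check
}
```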