From 6f507d1845635a67a07c6118899703426f32bb37 Mon Sep 17 00:00:00 2001 From: Andrew Jones Date: Mon, 20 Aug 2018 12:21:21 +0100 Subject: [PATCH] Prevent sync restart if import queue full --- ethcore/src/client/client.rs | 36 +++++++++++----------------------- ethcore/src/error.rs | 19 ++++++++++++++++++ ethcore/sync/src/block_sync.rs | 6 +++++- 3 files changed, 35 insertions(+), 26 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index 84bcddfd6db..e8707d6eb7c 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -16,7 +16,6 @@ use std::collections::{HashSet, BTreeMap, VecDeque}; use std::cmp; -use std::fmt; use std::str::FromStr; use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering}; use std::sync::{Arc, Weak}; @@ -50,13 +49,16 @@ use client::{ }; use encoded; use engines::{EthEngine, EpochTransition, ForkChoice}; -use error::{ImportErrorKind, BlockImportErrorKind, ExecutionError, CallError, BlockError, ImportResult, Error as EthcoreError}; +use error::{ + ImportErrorKind, BlockImportErrorKind, ExecutionError, CallError, BlockError, ImportResult, + QueueError, QueueErrorKind, Error as EthcoreError +}; use vm::{EnvInfo, LastHashes}; use evm::Schedule; use executive::{Executive, Executed, TransactOptions, contract_address}; use factory::{Factories, VmFactory}; use header::{BlockNumber, Header, ExtendedHeader}; -use io::{IoChannel, IoError}; +use io::IoChannel; use log_entry::LocalizedLogEntry; use miner::{Miner, MinerService}; use ethcore_miner::pool::VerifiedTransaction; @@ -2095,7 +2097,7 @@ impl IoClient for Client { let queued = self.queued_ancient_blocks.clone(); let lock = self.ancient_blocks_import_lock.clone(); - match self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| { + self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| { trace_time!("import_ancient_block"); // Make sure to hold the lock here to prevent importing out of order. // We use separate lock, cause we don't want to block queueing. @@ -2119,10 +2121,9 @@ impl IoClient for Client { break; } } - }) { - Ok(_) => Ok(hash), - Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))), - } + })?; + + Ok(hash) } fn queue_consensus_message(&self, message: Bytes) { @@ -2538,21 +2539,6 @@ mod tests { } } -#[derive(Debug)] -enum QueueError { - Channel(IoError), - Full(usize), -} - -impl fmt::Display for QueueError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match *self { - QueueError::Channel(ref c) => fmt::Display::fmt(c, fmt), - QueueError::Full(limit) => write!(fmt, "The queue is full ({})", limit), - } - } -} - /// Queue some items to be processed by IO client. struct IoChannelQueue { currently_queued: Arc, @@ -2571,7 +2557,7 @@ impl IoChannelQueue { F: Fn(&Client) + Send + Sync + 'static, { let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed); - ensure!(queue_size < self.limit, QueueError::Full(self.limit)); + ensure!(queue_size < self.limit, QueueErrorKind::Full(self.limit)); let currently_queued = self.currently_queued.clone(); let result = channel.send(ClientIoMessage::execute(move |client| { @@ -2584,7 +2570,7 @@ impl IoChannelQueue { self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst); Ok(()) }, - Err(e) => Err(QueueError::Channel(e)), + Err(e) => bail!(QueueErrorKind::Channel(e)), } } } diff --git a/ethcore/src/error.rs b/ethcore/src/error.rs index 75b2b517503..29ef8961296 100644 --- a/ethcore/src/error.rs +++ b/ethcore/src/error.rs @@ -150,6 +150,24 @@ impl error::Error for BlockError { } } +error_chain! { + types { + QueueError, QueueErrorKind, QueueErrorResultExt, QueueErrorResult; + } + + errors { + #[doc = "Queue is full"] + Full(limit: usize) { + description("Queue is full") + display("The queue is full ({})", limit) + } + } + + foreign_links { + Channel(IoError) #[doc = "Io channel error"]; + } +} + error_chain! { types { ImportError, ImportErrorKind, ImportErrorResultExt, ImportErrorResult; @@ -183,6 +201,7 @@ error_chain! { links { Import(ImportError, ImportErrorKind) #[doc = "Import error"]; + Queue(QueueError, QueueErrorKind) #[doc = "Io channel queue error"]; } foreign_links { diff --git a/ethcore/sync/src/block_sync.rs b/ethcore/sync/src/block_sync.rs index 4c229cd87ae..5dd1bdac244 100644 --- a/ethcore/sync/src/block_sync.rs +++ b/ethcore/sync/src/block_sync.rs @@ -25,7 +25,7 @@ use ethereum_types::H256; use rlp::{self, Rlp}; use ethcore::header::BlockNumber; use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind}; -use ethcore::error::{ImportErrorKind, BlockError}; +use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError}; use sync_io::SyncIo; use blocks::{BlockCollection, SyncBody, SyncHeader}; @@ -513,6 +513,10 @@ impl BlockDownloader { debug!(target: "sync", "Block temporarily invalid, restarting sync"); break; }, + Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => { + debug!(target: "sync", "Block import queue full ({}), restarting sync", limit); + break; + }, Err(e) => { debug!(target: "sync", "Bad block {:?} : {:?}", h, e); bad = true;