Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Prevent old blocks sync restart if import queue full #9381

Merged
merged 1 commit into from
Aug 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 11 additions & 25 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

use std::collections::{HashSet, BTreeMap, VecDeque};
use std::cmp;
use std::fmt;
use std::str::FromStr;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering as AtomicOrdering};
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -50,13 +49,16 @@ use client::{
};
use encoded;
use engines::{EthEngine, EpochTransition, ForkChoice};
use error::{ImportErrorKind, BlockImportErrorKind, ExecutionError, CallError, BlockError, ImportResult, Error as EthcoreError};
use error::{
ImportErrorKind, BlockImportErrorKind, ExecutionError, CallError, BlockError, ImportResult,
QueueError, QueueErrorKind, Error as EthcoreError
};
use vm::{EnvInfo, LastHashes};
use evm::Schedule;
use executive::{Executive, Executed, TransactOptions, contract_address};
use factory::{Factories, VmFactory};
use header::{BlockNumber, Header, ExtendedHeader};
use io::{IoChannel, IoError};
use io::IoChannel;
use log_entry::LocalizedLogEntry;
use miner::{Miner, MinerService};
use ethcore_miner::pool::VerifiedTransaction;
Expand Down Expand Up @@ -2095,7 +2097,7 @@ impl IoClient for Client {

let queued = self.queued_ancient_blocks.clone();
let lock = self.ancient_blocks_import_lock.clone();
match self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| {
self.queue_ancient_blocks.queue(&self.io_channel.read(), 1, move |client| {
trace_time!("import_ancient_block");
// Make sure to hold the lock here to prevent importing out of order.
// We use separate lock, cause we don't want to block queueing.
Expand All @@ -2119,10 +2121,9 @@ impl IoClient for Client {
break;
}
}
}) {
Ok(_) => Ok(hash),
Err(e) => bail!(BlockImportErrorKind::Other(format!("{}", e))),
}
})?;

Ok(hash)
}

fn queue_consensus_message(&self, message: Bytes) {
Expand Down Expand Up @@ -2538,21 +2539,6 @@ mod tests {
}
}

#[derive(Debug)]
enum QueueError {
Channel(IoError),
Full(usize),
}

impl fmt::Display for QueueError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
QueueError::Channel(ref c) => fmt::Display::fmt(c, fmt),
QueueError::Full(limit) => write!(fmt, "The queue is full ({})", limit),
}
}
}

/// Queue some items to be processed by IO client.
struct IoChannelQueue {
currently_queued: Arc<AtomicUsize>,
Expand All @@ -2571,7 +2557,7 @@ impl IoChannelQueue {
F: Fn(&Client) + Send + Sync + 'static,
{
let queue_size = self.currently_queued.load(AtomicOrdering::Relaxed);
ensure!(queue_size < self.limit, QueueError::Full(self.limit));
ensure!(queue_size < self.limit, QueueErrorKind::Full(self.limit));

let currently_queued = self.currently_queued.clone();
let result = channel.send(ClientIoMessage::execute(move |client| {
Expand All @@ -2584,7 +2570,7 @@ impl IoChannelQueue {
self.currently_queued.fetch_add(count, AtomicOrdering::SeqCst);
Ok(())
},
Err(e) => Err(QueueError::Channel(e)),
Err(e) => bail!(QueueErrorKind::Channel(e)),
}
}
}
19 changes: 19 additions & 0 deletions ethcore/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,24 @@ impl error::Error for BlockError {
}
}

error_chain! {
types {
QueueError, QueueErrorKind, QueueErrorResultExt, QueueErrorResult;
}

errors {
#[doc = "Queue is full"]
Full(limit: usize) {
description("Queue is full")
display("The queue is full ({})", limit)
}
}

foreign_links {
Channel(IoError) #[doc = "Io channel error"];
}
}

error_chain! {
types {
ImportError, ImportErrorKind, ImportErrorResultExt, ImportErrorResult;
Expand Down Expand Up @@ -183,6 +201,7 @@ error_chain! {

links {
Import(ImportError, ImportErrorKind) #[doc = "Import error"];
Queue(QueueError, QueueErrorKind) #[doc = "Io channel queue error"];
}

foreign_links {
Expand Down
6 changes: 5 additions & 1 deletion ethcore/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use ethereum_types::H256;
use rlp::{self, Rlp};
use ethcore::header::BlockNumber;
use ethcore::client::{BlockStatus, BlockId, BlockImportError, BlockImportErrorKind};
use ethcore::error::{ImportErrorKind, BlockError};
use ethcore::error::{ImportErrorKind, QueueErrorKind, BlockError};
use sync_io::SyncIo;
use blocks::{BlockCollection, SyncBody, SyncHeader};

Expand Down Expand Up @@ -513,6 +513,10 @@ impl BlockDownloader {
debug!(target: "sync", "Block temporarily invalid, restarting sync");
break;
},
Err(BlockImportError(BlockImportErrorKind::Queue(QueueErrorKind::Full(limit)), _)) => {
debug!(target: "sync", "Block import queue full ({}), restarting sync", limit);
break;
},
Err(e) => {
debug!(target: "sync", "Bad block {:?} : {:?}", h, e);
bad = true;
Expand Down