Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Keep existing blocks when restoring a Snapshot (#8643)
Browse files Browse the repository at this point in the history
* Rename db_restore => client

* First step: make it compile!

* Second step: working implementation!

* Refactoring

* Fix tests

* PR Grumbles

* PR Grumbles WIP

* Migrate ancient blocks interating backward

* Early return in block migration if snapshot is aborted

* Remove RwLock getter (PR Grumble I)

* Remove dependency on `Client`: only used Traits

* Add test for recovering aborted snapshot recovery

* Add test for migrating old blocks

* Fix build

* PR Grumble I

* PR Grumble II

* PR Grumble III

* PR Grumble IV

* PR Grumble V

* PR Grumble VI

* Fix one test

* Fix test

* PR Grumble

* PR Grumbles

* PR Grumbles II

* Fix tests

* Release RwLock earlier

* Revert Cargo.lock

* Update _update ancient block_ logic: set local in `commit`

* Update typo in ethcore/src/snapshot/service.rs

Co-Authored-By: ngotchac <ngotchac@gmail.com>
  • Loading branch information
ngotchac authored and 5chdn committed Nov 27, 2018
1 parent 6a40a46 commit 3a3e321
Show file tree
Hide file tree
Showing 17 changed files with 518 additions and 91 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ethcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ hardware-wallet = { path = "../hw" }
fake-hardware-wallet = { path = "../util/fake-hardware-wallet" }

[dev-dependencies]
env_logger = "0.4"
tempdir = "0.3"
trie-standardmap = "0.1"

Expand Down
4 changes: 2 additions & 2 deletions ethcore/light/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
}

fn block_receipts(&self, req: request::CompleteReceiptsRequest) -> Option<request::ReceiptsResponse> {
BlockChainClient::encoded_block_receipts(self, &req.hash)
.map(|x| ::request::ReceiptsResponse { receipts: ::rlp::decode_list(&x) })
BlockChainClient::block_receipts(self, &req.hash)
.map(|x| ::request::ReceiptsResponse { receipts: x.receipts })
}

fn account_proof(&self, req: request::CompleteAccountRequest) -> Option<request::AccountResponse> {
Expand Down
2 changes: 1 addition & 1 deletion ethcore/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl ClientService {
pruning: pruning,
channel: io_service.channel(),
snapshot_root: snapshot_path.into(),
db_restore: client.clone(),
client: client.clone(),
};
let snapshot = Arc::new(SnapshotService::new(snapshot_params)?);

Expand Down
99 changes: 87 additions & 12 deletions ethcore/src/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ pub struct BlockChain {

cache_man: Mutex<CacheManager<CacheId>>,

pending_best_ancient_block: RwLock<Option<Option<BestAncientBlock>>>,
pending_best_block: RwLock<Option<BestBlock>>,
pending_block_hashes: RwLock<HashMap<BlockNumber, H256>>,
pending_block_details: RwLock<HashMap<H256, BlockDetails>>,
Expand Down Expand Up @@ -538,6 +539,7 @@ impl BlockChain {
block_receipts: RwLock::new(HashMap::new()),
db: db.clone(),
cache_man: Mutex::new(cache_man),
pending_best_ancient_block: RwLock::new(None),
pending_best_block: RwLock::new(None),
pending_block_hashes: RwLock::new(HashMap::new()),
pending_block_details: RwLock::new(HashMap::new()),
Expand Down Expand Up @@ -808,18 +810,7 @@ impl BlockChain {
}, is_best);

if is_ancient {
let mut best_ancient_block = self.best_ancient_block.write();
let ancient_number = best_ancient_block.as_ref().map_or(0, |b| b.number);
if self.block_hash(block_number + 1).is_some() {
batch.delete(db::COL_EXTRA, b"ancient");
*best_ancient_block = None;
} else if block_number > ancient_number {
batch.put(db::COL_EXTRA, b"ancient", &hash);
*best_ancient_block = Some(BestAncientBlock {
hash: hash,
number: block_number,
});
}
self.set_best_ancient_block(block_number, &hash, batch);
}

false
Expand Down Expand Up @@ -860,6 +851,84 @@ impl BlockChain {
}
}

/// Update the best ancient block to the given hash, after checking that
/// it's directly linked to the currently known best ancient block
pub fn update_best_ancient_block(&self, hash: &H256) {
// Get the block view of the next ancient block (it must
// be in DB at this point)
let block_view = match self.block(hash) {
Some(v) => v,
None => return,
};

// So that `best_ancient_block` gets unlocked before calling
// `set_best_ancient_block`
{
// Get the target hash ; if there are no ancient block,
// it means that the chain is already fully linked
// Release the `best_ancient_block` RwLock
let target_hash = {
let best_ancient_block = self.best_ancient_block.read();
let cur_ancient_block = match *best_ancient_block {
Some(ref b) => b,
None => return,
};

// Ensure that the new best ancient block is after the current one
if block_view.number() <= cur_ancient_block.number {
return;
}

cur_ancient_block.hash.clone()
};

let mut block_hash = *hash;
let mut is_linked = false;

loop {
if block_hash == target_hash {
is_linked = true;
break;
}

match self.block_details(&block_hash) {
Some(block_details) => {
block_hash = block_details.parent;
},
None => break,
}
}

if !is_linked {
trace!(target: "blockchain", "The given block {:x} is not linked to the known ancient block {:x}", hash, target_hash);
return;
}
}

let mut batch = self.db.key_value().transaction();
self.set_best_ancient_block(block_view.number(), hash, &mut batch);
self.db.key_value().write(batch).expect("Low level database error.");
}

/// Set the best ancient block with the given value: private method
/// `best_ancient_block` must not be locked, otherwise a DeadLock would occur
fn set_best_ancient_block(&self, block_number: BlockNumber, block_hash: &H256, batch: &mut DBTransaction) {
let mut pending_best_ancient_block = self.pending_best_ancient_block.write();
let ancient_number = self.best_ancient_block.read().as_ref().map_or(0, |b| b.number);
if self.block_hash(block_number + 1).is_some() {
trace!(target: "blockchain", "The two ends of the chain have met.");
batch.delete(db::COL_EXTRA, b"ancient");
*pending_best_ancient_block = Some(None);
} else if block_number > ancient_number {
trace!(target: "blockchain", "Updating the best ancient block to {}.", block_number);
batch.put(db::COL_EXTRA, b"ancient", &block_hash);
*pending_best_ancient_block = Some(Some(BestAncientBlock {
hash: *block_hash,
number: block_number,
}));
}
}

/// Insert an epoch transition. Provide an epoch number being transitioned to
/// and epoch transition object.
///
Expand Down Expand Up @@ -1112,15 +1181,21 @@ impl BlockChain {

/// Apply pending insertion updates
pub fn commit(&self) {
let mut pending_best_ancient_block = self.pending_best_ancient_block.write();
let mut pending_best_block = self.pending_best_block.write();
let mut pending_write_hashes = self.pending_block_hashes.write();
let mut pending_block_details = self.pending_block_details.write();
let mut pending_write_txs = self.pending_transaction_addresses.write();

let mut best_ancient_block = self.best_ancient_block.write();
let mut best_block = self.best_block.write();
let mut write_block_details = self.block_details.write();
let mut write_hashes = self.block_hashes.write();
let mut write_txs = self.transaction_addresses.write();
// update best ancient block
if let Some(block_option) = pending_best_ancient_block.take() {
*best_ancient_block = block_option;
}
// update best block
if let Some(block) = pending_best_block.take() {
*best_block = block;
Expand Down
24 changes: 18 additions & 6 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use kvdb::{DBValue, KeyValueDB, DBTransaction};
// other
use ethereum_types::{H256, Address, U256};
use block::{IsBlock, LockedBlock, Drain, ClosedBlock, OpenBlock, enact_verified, SealedBlock};
use blockchain::{BlockChain, BlockChainDB, BlockProvider, TreeRoute, ImportRoute, TransactionAddress, ExtrasInsert};
use blockchain::{BlockReceipts, BlockChain, BlockChainDB, BlockProvider, TreeRoute, ImportRoute, TransactionAddress, ExtrasInsert};
use client::ancient_import::AncientVerifier;
use client::{
Nonce, Balance, ChainInfo, BlockInfo, CallContract, TransactionInfo,
Expand Down Expand Up @@ -66,7 +66,7 @@ use ethcore_miner::pool::VerifiedTransaction;
use parking_lot::{Mutex, RwLock};
use rand::OsRng;
use receipt::{Receipt, LocalizedReceipt};
use snapshot::{self, io as snapshot_io};
use snapshot::{self, io as snapshot_io, SnapshotClient};
use spec::Spec;
use state_db::StateDB;
use state::{self, State};
Expand Down Expand Up @@ -1005,6 +1005,16 @@ impl Client {
self.importer.miner.clone()
}

#[cfg(test)]
pub fn state_db(&self) -> ::parking_lot::RwLockReadGuard<StateDB> {
self.state_db.read()
}

#[cfg(test)]
pub fn chain(&self) -> Arc<BlockChain> {
self.chain.read().clone()
}

/// Replace io channel. Useful for testing.
pub fn set_io_channel(&self, io_channel: IoChannel<ClientIoMessage>) {
*self.io_channel.write() = io_channel;
Expand Down Expand Up @@ -1817,7 +1827,7 @@ impl BlockChainClient for Client {
Some(receipt)
}

fn block_receipts(&self, id: BlockId) -> Option<Vec<LocalizedReceipt>> {
fn localized_block_receipts(&self, id: BlockId) -> Option<Vec<LocalizedReceipt>> {
let hash = self.block_hash(id)?;

let chain = self.chain.read();
Expand Down Expand Up @@ -1860,8 +1870,8 @@ impl BlockChainClient for Client {
self.state_db.read().journal_db().state(hash)
}

fn encoded_block_receipts(&self, hash: &H256) -> Option<Bytes> {
self.chain.read().block_receipts(hash).map(|receipts| ::rlp::encode(&receipts))
fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts> {
self.chain.read().block_receipts(hash)
}

fn queue_info(&self) -> BlockQueueInfo {
Expand Down Expand Up @@ -2406,6 +2416,8 @@ impl ProvingBlockChainClient for Client {
}
}

impl SnapshotClient for Client {}

impl Drop for Client {
fn drop(&mut self) {
self.engine.stop();
Expand Down Expand Up @@ -2504,7 +2516,7 @@ mod tests {
use test_helpers::{generate_dummy_client_with_data};

let client = generate_dummy_client_with_data(2, 2, &[1.into(), 1.into()]);
let receipts = client.block_receipts(BlockId::Latest).unwrap();
let receipts = client.localized_block_receipts(BlockId::Latest).unwrap();

assert_eq!(receipts.len(), 2);
assert_eq!(receipts[0].transaction_index, 0);
Expand Down
8 changes: 3 additions & 5 deletions ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ impl BlockChainClient for TestBlockChainClient {
self.receipts.read().get(&id).cloned()
}

fn block_receipts(&self, _id: BlockId) -> Option<Vec<LocalizedReceipt>> {
fn localized_block_receipts(&self, _id: BlockId) -> Option<Vec<LocalizedReceipt>> {
Some(self.receipts.read().values().cloned().collect())
}

Expand Down Expand Up @@ -789,16 +789,14 @@ impl BlockChainClient for TestBlockChainClient {
None
}

fn encoded_block_receipts(&self, hash: &H256) -> Option<Bytes> {
fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts> {
// starts with 'f' ?
if *hash > H256::from("f000000000000000000000000000000000000000000000000000000000000000") {
let receipt = BlockReceipts::new(vec![Receipt::new(
TransactionOutcome::StateRoot(H256::zero()),
U256::zero(),
vec![])]);
let mut rlp = RlpStream::new();
rlp.append(&receipt);
return Some(rlp.out());
return Some(receipt);
}
None
}
Expand Down
8 changes: 4 additions & 4 deletions ethcore/src/client/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use itertools::Itertools;

use block::{OpenBlock, SealedBlock, ClosedBlock};
use blockchain::TreeRoute;
use blockchain::{BlockReceipts, TreeRoute};
use client::Mode;
use encoded;
use vm::LastHashes;
Expand Down Expand Up @@ -282,7 +282,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
fn transaction_receipt(&self, id: TransactionId) -> Option<LocalizedReceipt>;

/// Get localized receipts for all transaction in given block.
fn block_receipts(&self, id: BlockId) -> Option<Vec<LocalizedReceipt>>;
fn localized_block_receipts(&self, id: BlockId) -> Option<Vec<LocalizedReceipt>>;

/// Get a tree route between `from` and `to`.
/// See `BlockChain::tree_route`.
Expand All @@ -294,8 +294,8 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
/// Get latest state node
fn state_data(&self, hash: &H256) -> Option<Bytes>;

/// Get raw block receipts data by block header hash.
fn encoded_block_receipts(&self, hash: &H256) -> Option<Bytes>;
/// Get block receipts data by block header hash.
fn block_receipts(&self, hash: &H256) -> Option<BlockReceipts>;

/// Get block queue information.
fn queue_info(&self) -> BlockQueueInfo;
Expand Down
3 changes: 3 additions & 0 deletions ethcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ extern crate trace_time;
#[cfg_attr(test, macro_use)]
extern crate evm;

#[cfg(test)]
extern crate env_logger;

pub extern crate ethstore;

#[macro_use]
Expand Down
3 changes: 3 additions & 0 deletions ethcore/src/snapshot/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ pub enum Error {
BadEpochProof(u64),
/// Wrong chunk format.
WrongChunkFormat(String),
/// Unlinked ancient block chain
UnlinkedAncientBlockChain,
}

impl fmt::Display for Error {
Expand All @@ -91,6 +93,7 @@ impl fmt::Display for Error {
Error::SnapshotsUnsupported => write!(f, "Snapshots unsupported by consensus engine."),
Error::BadEpochProof(i) => write!(f, "Bad epoch proof for transition to epoch {}", i),
Error::WrongChunkFormat(ref msg) => write!(f, "Wrong chunk format: {}", msg),
Error::UnlinkedAncientBlockChain => write!(f, "Unlinked ancient blocks chain"),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use rand::{Rng, OsRng};
pub use self::error::Error;

pub use self::consensus::*;
pub use self::service::{Service, DatabaseRestore};
pub use self::service::{SnapshotClient, Service, DatabaseRestore};
pub use self::traits::SnapshotService;
pub use self::watcher::Watcher;
pub use types::snapshot_manifest::ManifestData;
Expand Down
Loading

0 comments on commit 3a3e321

Please sign in to comment.