This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Multithreaded snapshot creation #9239

Merged · 27 commits · Sep 13, 2018
Diff shown is from 15 of the 27 commits.

Commits
f432170
Add Progress to Snapshot Secondary chunks creation
ngotchac Jul 24, 2018
4418342
Use half of CPUs to multithread snapshot creation
ngotchac Jul 24, 2018
422d214
Use env var to define number of threads
ngotchac Jul 24, 2018
5242cd5
info to debug logs
ngotchac Jul 24, 2018
9d44c47
Add Snapshot threads as CLI option
ngotchac Jul 25, 2018
8f3ece7
Randomize chunks per thread
ngotchac Jul 25, 2018
d278cb9
Remove randomness, add debugging
ngotchac Jul 25, 2018
4ee58d2
Add warning
ngotchac Jul 25, 2018
db4f5d5
Add tracing
ngotchac Jul 26, 2018
603d265
Use parity-common fix seek branch
ngotchac Jul 27, 2018
73b8858
Fix log
ngotchac Jul 27, 2018
4c68b1c
Fix tests
ngotchac Jul 27, 2018
680cef5
Fix tests
ngotchac Jul 27, 2018
17f49d9
Merge branch 'master' into ng-multithread-snapshot-creation
ngotchac Jul 27, 2018
14eda40
Merge branch 'master' into ng-multithread-snapshot-creation
ngotchac Aug 1, 2018
cd5e93d
Merge branch 'master' into ng-multithread-snapshot-creation
ngotchac Aug 22, 2018
09a9076
PR Grumbles
ngotchac Aug 22, 2018
4304c23
Merge branch 'master' into ng-multithread-snapshot-creation
ngotchac Aug 22, 2018
4e265cb
PR Grumble II
ngotchac Aug 22, 2018
7670562
Merge branch 'master' into ng-multithread-snapshot-creation
ngotchac Sep 7, 2018
4c8e900
Merge branch 'master' into ng-multithread-snapshot-creation
ngotchac Sep 7, 2018
8dff8f1
Update Cargo.lock
ngotchac Sep 7, 2018
3763dd0
PR Grumbles
ngotchac Sep 7, 2018
087468a
Default snapshot threads to half number of CPUs
ngotchac Sep 7, 2018
ceabd5c
Fix default snapshot threads // min 1
ngotchac Sep 7, 2018
1610154
Merge branch 'master' into ng-multithread-snapshot-creation
ngotchac Sep 13, 2018
d8c10b7
Merge branch 'master' into ng-multithread-snapshot-creation
ngotchac Sep 13, 2018
1,420 changes: 765 additions & 655 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion ethcore/src/client/client.rs
@@ -1138,7 +1138,8 @@ impl Client {
},
};

snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hashdb(), writer, p)?;
let processing_threads = self.config.snapshot.processing_threads;
snapshot::take_snapshot(&*self.engine, &self.chain.read(), start_hash, db.as_hashdb(), writer, p, processing_threads)?;

Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions ethcore/src/client/config.rs
@@ -19,6 +19,7 @@ use std::fmt::{Display, Formatter, Error as FmtError};

use verification::{VerifierType, QueueConfig};
use journaldb;
use snapshot::SnapshotConfiguration;

pub use std::time::Duration;
pub use blockchain::Config as BlockChainConfig;
@@ -120,6 +121,8 @@ pub struct ClientConfig {
pub check_seal: bool,
/// Maximal number of transactions queued for verification in a separate thread.
pub transaction_verification_queue_size: usize,
/// Snapshot configuration
pub snapshot: SnapshotConfiguration,
}

impl Default for ClientConfig {
@@ -144,6 +147,7 @@ impl Default for ClientConfig {
history_mem: 32 * mb,
check_seal: true,
transaction_verification_queue_size: 8192,
snapshot: Default::default(),
}
}
}
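
The new `snapshot` field is what carries the thread count down to the snapshot code. A hedged usage sketch (compiles only in a project that depends on ethcore, and assumes the crate re-exports ClientConfig from its client module):

use ethcore::client::ClientConfig;

fn snapshot_client_config(threads: usize) -> ClientConfig {
    let mut config = ClientConfig::default();
    // `snapshot.processing_threads` is the field added by this PR.
    config.snapshot.processing_threads = threads.max(1);
    config
}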
3 changes: 2 additions & 1 deletion ethcore/src/snapshot/consensus/authority.rs
@@ -30,7 +30,7 @@ use machine::EthereumMachine;
use ids::BlockId;
use header::Header;
use receipt::Receipt;
use snapshot::{Error, ManifestData};
use snapshot::{Error, ManifestData, Progress};

use itertools::{Position, Itertools};
use rlp::{RlpStream, Rlp};
@@ -59,6 +59,7 @@ impl SnapshotComponents for PoaSnapshot {
chain: &BlockChain,
block_at: H256,
sink: &mut ChunkSink,
_progress: &Progress,
preferred_size: usize,
) -> Result<(), Error> {
let number = chain.block_number(&block_at)
3 changes: 2 additions & 1 deletion ethcore/src/snapshot/consensus/mod.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;

use blockchain::{BlockChain, BlockChainDB};
use engines::EthEngine;
use snapshot::{Error, ManifestData};
use snapshot::{Error, ManifestData, Progress};

use ethereum_types::H256;

@@ -49,6 +49,7 @@ pub trait SnapshotComponents: Send {
chain: &BlockChain,
block_at: H256,
chunk_sink: &mut ChunkSink,
progress: &Progress,
preferred_size: usize,
) -> Result<(), Error>;

6 changes: 5 additions & 1 deletion ethcore/src/snapshot/consensus/work.rs
@@ -28,7 +28,7 @@ use std::sync::Arc;

use blockchain::{BlockChain, BlockChainDB, BlockProvider};
use engines::EthEngine;
use snapshot::{Error, ManifestData};
use snapshot::{Error, ManifestData, Progress};
use snapshot::block::AbridgedBlock;
use ethereum_types::H256;
use kvdb::KeyValueDB;
@@ -65,13 +65,15 @@ impl SnapshotComponents for PowSnapshot {
chain: &BlockChain,
block_at: H256,
chunk_sink: &mut ChunkSink,
progress: &Progress,
preferred_size: usize,
) -> Result<(), Error> {
PowWorker {
chain: chain,
rlps: VecDeque::new(),
current_hash: block_at,
writer: chunk_sink,
progress: progress,
preferred_size: preferred_size,
}.chunk_all(self.blocks)
}
@@ -96,6 +98,7 @@ struct PowWorker<'a> {
rlps: VecDeque<Bytes>,
current_hash: H256,
writer: &'a mut ChunkSink<'a>,
progress: &'a Progress,
preferred_size: usize,
}

@@ -138,6 +141,7 @@ impl<'a> PowWorker<'a> {

last = self.current_hash;
self.current_hash = block.header_view().parent_hash();
self.progress.blocks.fetch_add(1, Ordering::SeqCst);
}

if loaded_size != 0 {
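
The `self.progress.blocks.fetch_add(1, Ordering::SeqCst)` call above is plain atomic bookkeeping on the shared Progress indicator. A minimal standalone sketch of that pattern (the field set is reduced here for illustration; the real struct lives in ethcore/src/snapshot/mod.rs):

use std::sync::atomic::{AtomicUsize, Ordering};

// Reduced stand-in for the shared Progress indicator the workers update.
#[derive(Debug, Default)]
struct Progress {
    accounts: AtomicUsize,
    blocks: AtomicUsize,
}

fn main() {
    let p = Progress::default();
    // Each block chunked by a worker bumps the counter, as in PowWorker above.
    p.blocks.fetch_add(1, Ordering::SeqCst);
    assert_eq!(p.blocks.load(Ordering::SeqCst), 1);
    assert_eq!(p.accounts.load(Ordering::SeqCst), 0);
}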
97 changes: 88 additions & 9 deletions ethcore/src/snapshot/mod.rs
@@ -20,6 +20,7 @@
//! https://wiki.parity.io/Warp-Sync-Snapshot-Format

use std::collections::{HashMap, HashSet};
use std::cmp;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use hash::{keccak, KECCAK_NULL_RLP, KECCAK_EMPTY};
@@ -88,6 +89,26 @@ const MAX_CHUNK_SIZE: usize = PREFERRED_CHUNK_SIZE / 4 * 5;
const MIN_SUPPORTED_STATE_CHUNK_VERSION: u64 = 1;
// current state chunk version.
const STATE_CHUNK_VERSION: u64 = 2;
/// number of snapshot subparts, must be a power of 2 in [1; 256]
pub const SNAPSHOT_SUBPARTS: usize = 16;

/// Configuration for the Snapshot service
#[derive(Debug, Clone, PartialEq)]
pub struct SnapshotConfiguration {
/// If `true`, no periodic snapshots will be created
pub no_periodic: bool,
/// Number of threads for creating snapshots
pub processing_threads: usize,
}

impl Default for SnapshotConfiguration {
fn default() -> Self {
SnapshotConfiguration {
no_periodic: false,
processing_threads: 1,
}
}
}

/// A progress indicator for snapshots.
#[derive(Debug, Default)]
@@ -130,7 +151,8 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
block_at: H256,
state_db: &HashDB<KeccakHasher>,
writer: W,
p: &Progress
p: &Progress,
processing_threads: usize,
) -> Result<(), Error> {
let start_header = chain.block_header_data(&block_at)
.ok_or(Error::InvalidStartingBlock(BlockId::Hash(block_at)))?;
@@ -142,14 +164,46 @@
let writer = Mutex::new(writer);
let chunker = engine.snapshot_components().ok_or(Error::SnapshotsUnsupported)?;
let snapshot_version = chunker.current_version();
let (state_hashes, block_hashes) = scope(|scope| {
let (state_hashes, block_hashes) = scope(|scope| -> Result<(Vec<H256>, Vec<H256>), Error> {
let writer = &writer;
let block_guard = scope.spawn(move || chunk_secondary(chunker, chain, block_at, writer, p));
let state_res = chunk_state(state_db, &state_root, writer, p);

state_res.and_then(|state_hashes| {
block_guard.join().map(|block_hashes| (state_hashes, block_hashes))
})
// The number of threads must be between 1 and SNAPSHOT_SUBPARTS
let num_threads: usize = cmp::max(1, cmp::min(processing_threads, SNAPSHOT_SUBPARTS));
Collaborator:
Maybe use assert!(processing_threads >= 1, "...") instead of this cmp::max, since it would be a logical mistake to pass 0 as processing_threads?

info!(target: "snapshot", "Using {} threads for Snapshot creation.", num_threads);

let mut state_guards = Vec::with_capacity(num_threads as usize);
let subparts: Vec<usize> = (0..SNAPSHOT_SUBPARTS).collect();

for thread_idx in 0..num_threads {
let subparts_c = subparts.clone();
let state_guard = scope.spawn(move || -> Result<Vec<H256>, Error> {
let mut chunk_hashes = Vec::new();

for subpart_chunk in subparts_c.chunks(num_threads) {
if subpart_chunk.len() > thread_idx {
let part = subpart_chunk[thread_idx];
Collaborator @ordian (Aug 3, 2018):
Maybe something like this would be easier to read (and we don't need to allocate subparts):

let state_guard = scope.spawn(move || -> Result<Vec<H256>, Error> {
  let mut chunk_hashes = Vec::new();
  for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) {
    ...
  }
}

Note that step_by was only stabilized in Rust 1.28.

debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx);
let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part))?;
chunk_hashes.append(&mut hashes);
}
}

Ok(chunk_hashes)
});
state_guards.push(state_guard);
}

let block_hashes = block_guard.join()?;
let mut state_hashes = Vec::new();

for guard in state_guards {
let mut part_state_hashes = guard.join()?.clone();
state_hashes.append(&mut part_state_hashes);
}
Collaborator @ordian (Aug 3, 2018):
We can avoid cloning:

		let mut state_hashes = Vec::new();

		for guard in state_guards {
			let part_state_hashes = guard.join()?;
			state_hashes.extend(part_state_hashes);
		}

Contributor Author:
extend expects a mutable reference :/

Contributor Author:
Oups, you're right, sorry.
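
For reference, a minimal standalone sketch of why no clone is needed here: Vec::extend takes &mut self plus any IntoIterator, and a Vec passed by value is moved in, not borrowed.

fn main() {
    let mut state_hashes: Vec<u32> = Vec::new();
    let part_state_hashes = vec![1, 2, 3]; // stand-in for one thread's result
    state_hashes.extend(part_state_hashes); // consumes the Vec; no clone
    assert_eq!(state_hashes, [1, 2, 3]);
}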


debug!(target: "snapshot", "Took a snapshot of {} accounts", p.accounts.load(Ordering::SeqCst));
Ok((state_hashes, block_hashes))
})?;

info!("produced {} state chunks and {} block chunks.", state_hashes.len(), block_hashes.len());
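
A standalone sketch of the subpart-to-thread distribution above, using std::thread::scope (Rust 1.63+) in place of the scoped-thread library behind the diff's scope(...), and the step_by form ordian suggested; num_threads = 4 is an arbitrary example value:

use std::thread;

const SNAPSHOT_SUBPARTS: usize = 16;

fn main() {
    let num_threads = 4usize.clamp(1, SNAPSHOT_SUBPARTS);
    let mut all: Vec<usize> = thread::scope(|s| {
        let handles: Vec<_> = (0..num_threads)
            .map(|thread_idx| {
                // Thread i takes subparts i, i + num_threads, i + 2*num_threads, ...
                s.spawn(move || {
                    (thread_idx..SNAPSHOT_SUBPARTS)
                        .step_by(num_threads)
                        .collect::<Vec<usize>>()
                })
            })
            .collect();
        handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
    });
    all.sort();
    // Every subpart is covered exactly once across the threads.
    assert_eq!(all, (0..SNAPSHOT_SUBPARTS).collect::<Vec<usize>>());
}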
@@ -200,6 +254,7 @@ pub fn chunk_secondary<'a>(mut chunker: Box<SnapshotComponents>, chain: &'a Bloc
chain,
start_hash,
&mut chunk_sink,
progress,
PREFERRED_CHUNK_SIZE,
)?;
}
@@ -263,10 +318,12 @@ impl<'a> StateChunker<'a> {

/// Walk the given state database starting from the given root,
/// creating chunks and writing them out.
/// `part` is a number between 0 and 15, which describes which part of
/// the tree should be chunked.
///
/// Returns a list of hashes of chunks created, or any error it may
/// have encountered.
pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress) -> Result<Vec<H256>, Error> {
pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<SnapshotWriter + 'a>, progress: &'a Progress, part: Option<usize>) -> Result<Vec<H256>, Error> {
let account_trie = TrieDB::new(db, &root)?;

let mut chunker = StateChunker {
@@ -281,11 +338,33 @@ pub fn chunk_state<'a>(db: &HashDB<KeccakHasher>, root: &H256, writer: &Mutex<Sn
let mut used_code = HashSet::new();

// account_key here is the address' hash.
for item in account_trie.iter()? {
let mut account_iter = account_trie.iter()?;

let mut seek_to = None;

if let Some(part) = part {
let part_offset = 256 / SNAPSHOT_SUBPARTS;
Collaborator:
Maybe name 256? I don't like magic constants.

let mut seek_from = vec![0; 32];
seek_from[0] = (part * part_offset) as u8;
account_iter.seek(&seek_from)?;

// Set the upper-bond, except for the last part
Collaborator:
typo: upper bound

if part < SNAPSHOT_SUBPARTS - 1 {
Collaborator:
We could get rid of this if, if we made seek_to an inclusive upper bound; e.g. the check would be

			if account_key[0] > seek_to {
				break;
			}

Contributor Author:
But seek_to wouldn't make sense if part == None, so it might as well be an Option.

seek_to = Some(((part + 1) * part_offset) as u8)
}
}


for item in account_iter {
let (account_key, account_data) = item?;
let account = ::rlp::decode(&*account_data)?;
let account_key_hash = H256::from_slice(&account_key);

if let Some(seek_to) = seek_to {
if account_key[0] >= seek_to {
break;
}
}

let account = ::rlp::decode(&*account_data)?;
let account_db = AccountDB::from_hash(db, account_key_hash);

let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE)?;
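
The seek logic above partitions the account trie by the first byte of the hashed address: with 16 subparts, each part spans 256 / 16 = 16 leading-byte values. A standalone sketch of the bounds computation (mirroring, not reusing, the code in chunk_state):

const SNAPSHOT_SUBPARTS: usize = 16;

// Returns the 32-byte key to seek to and the exclusive first-byte upper
// bound (None for the last part, which runs to the end of the trie).
fn subpart_bounds(part: usize) -> ([u8; 32], Option<u8>) {
    assert!(part < SNAPSHOT_SUBPARTS);
    let part_offset = 256 / SNAPSHOT_SUBPARTS;
    let mut seek_from = [0u8; 32];
    seek_from[0] = (part * part_offset) as u8;
    let seek_to = if part < SNAPSHOT_SUBPARTS - 1 {
        Some(((part + 1) * part_offset) as u8)
    } else {
        None
    };
    (seek_from, seek_to)
}

fn main() {
    // Part 0 covers first bytes 0x00..=0x0f, part 1 covers 0x10..=0x1f, ...
    assert_eq!(subpart_bounds(0).1, Some(0x10));
    let (from, to) = subpart_bounds(15);
    assert_eq!(from[0], 0xf0);
    assert_eq!(to, None); // last part is unbounded above
}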
11 changes: 8 additions & 3 deletions ethcore/src/snapshot/tests/state.rs
@@ -22,7 +22,7 @@ use hash::{KECCAK_NULL_RLP, keccak};

use basic_account::BasicAccount;
use snapshot::account;
use snapshot::{chunk_state, Error as SnapshotError, Progress, StateRebuilder};
use snapshot::{chunk_state, Error as SnapshotError, Progress, StateRebuilder, SNAPSHOT_SUBPARTS};
use snapshot::io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter};
use super::helpers::{compare_dbs, StateProducer};

@@ -53,7 +53,12 @@ fn snap_and_restore() {
let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());

let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default()).unwrap();
let mut state_hashes = Vec::new();
for part in 0..SNAPSHOT_SUBPARTS {
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part)).unwrap();
state_hashes.append(&mut hashes);

Collaborator:
nit: extra newline

}

writer.into_inner().finish(::snapshot::ManifestData {
version: 2,
@@ -164,7 +169,7 @@ fn checks_flag() {
let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());

let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default()).unwrap();
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None).unwrap();

writer.into_inner().finish(::snapshot::ManifestData {
version: 2,
7 changes: 7 additions & 0 deletions parity/cli/mod.rs
@@ -865,6 +865,10 @@ usage! {
"--no-periodic-snapshot",
"Disable automated snapshots which usually occur once every 10000 blocks.",

ARG arg_snapshot_threads: (Option<usize>) = None, or |c: &Config| c.snapshots.as_ref()?.processing_threads,
"--snapshot-threads=[NUM]",
"Enables multiple threads for snapshots creation.",

["Whisper Options"]
FLAG flag_whisper: (bool) = false, or |c: &Config| c.whisper.as_ref()?.enabled,
"--whisper",
@@ -1345,6 +1349,7 @@ struct Footprint {
#[serde(deny_unknown_fields)]
struct Snapshots {
disable_periodic: Option<bool>,
processing_threads: Option<usize>,
}

#[derive(Default, Debug, PartialEq, Deserialize)]
@@ -1771,6 +1776,7 @@ mod tests {
arg_export_state_at: "latest".into(),
arg_snapshot_at: "latest".into(),
flag_no_periodic_snapshot: false,
arg_snapshot_threads: None,

// -- Whisper options.
flag_whisper: false,
@@ -2021,6 +2027,7 @@ }),
}),
snapshots: Some(Snapshots {
disable_periodic: Some(true),
processing_threads: None,
}),
misc: Some(Misc {
logging: Some("own_tx=trace".into()),
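
How the flag interacts with the default, reconstructed from the commit messages "Default snapshot threads to half number of CPUs" and "Fix default snapshot threads // min 1"; the num_cpus crate is an assumption here, not shown in this diff:

fn default_snapshot_threads() -> usize {
    // Half the logical CPUs, but never fewer than one thread.
    std::cmp::max(1, num_cpus::get() / 2)
}

fn snapshot_threads(arg_snapshot_threads: Option<usize>) -> usize {
    // --snapshot-threads=[NUM] wins over the computed default.
    arg_snapshot_threads.unwrap_or_else(default_snapshot_threads)
}

fn main() {
    assert!(snapshot_threads(None) >= 1);
    assert_eq!(snapshot_threads(Some(8)), 8);
}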