Resumable warp-sync / Seed downloaded snapshots #8544
Changes from 22 commits
@@ -17,8 +17,8 @@
 //! Snapshot network service implementation.
 
 use std::collections::HashSet;
-use std::io::ErrorKind;
-use std::fs;
+use std::io::{self, Read, ErrorKind};
+use std::fs::{self, File};
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -30,6 +30,7 @@ use blockchain::BlockChain;
 use client::{Client, ChainInfo, ClientIoMessage};
 use engines::EthEngine;
 use error::Error;
+use hash::keccak;
 use ids::BlockId;
 
 use io::IoChannel;
@@ -240,6 +241,7 @@ pub struct Service {
	progress: super::Progress,
	taking_snapshot: AtomicBool,
	restoring_snapshot: AtomicBool,
+	ready: AtomicBool,
 }
 
 impl Service {
@@ -261,6 +263,7 @@ impl Service {
			progress: Default::default(),
			taking_snapshot: AtomicBool::new(false),
			restoring_snapshot: AtomicBool::new(false),
+			ready: AtomicBool::new(false),
		};
 
		// create the root snapshot dir if it doesn't exist.
@@ -270,8 +273,8 @@ impl Service {
			}
		}
 
-		// delete the temporary restoration dir if it does exist.
-		if let Err(e) = fs::remove_dir_all(service.restoration_dir()) {
+		// delete the temporary restoration DB dir if it does exist.
+		if let Err(e) = fs::remove_dir_all(service.restoration_db()) {
			if e.kind() != ErrorKind::NotFound {
				return Err(e.into())
			}
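An aside on the pattern above: deleting a directory while treating a missing one as success recurs throughout this diff. A minimal standalone sketch of the idiom (not code from the PR):

```rust
use std::fs;
use std::io::ErrorKind;
use std::path::Path;

// Remove a directory that may or may not exist: `NotFound` is fine
// (nothing to delete); every other I/O error is propagated.
fn remove_dir_if_exists(dir: &Path) -> std::io::Result<()> {
    match fs::remove_dir_all(dir) {
        Ok(()) => Ok(()),
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
        Err(e) => Err(e),
    }
}
```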
@@ -325,6 +328,13 @@ impl Service {
		dir
	}
 
+	// previous snapshot chunks path.
+	fn prev_chunks_dir(&self) -> PathBuf {
+		let mut dir = self.snapshot_root.clone();
+		dir.push("prev_chunks");
+		dir
+	}
+
	// replace one the client's database with our own.
	fn replace_client_db(&self) -> Result<(), Error> {
		let our_db = self.restoration_db();
@@ -406,55 +416,133 @@
	/// Initialize the restoration synchronously.
	/// The recover flag indicates whether to recover the restored snapshot.
	pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> {
-		let rest_dir = self.restoration_dir();
-
-		let mut res = self.restoration.lock();
+		self.ready.store(false, Ordering::SeqCst);
 
-		self.state_chunks.store(0, Ordering::SeqCst);
-		self.block_chunks.store(0, Ordering::SeqCst);
-
-		// tear down existing restoration.
-		*res = None;
+		let rest_dir = self.restoration_dir();
+		let rest_db = self.restoration_db();
+		let recovery_temp = self.temp_recovery_dir();
+		let prev_chunks = self.prev_chunks_dir();
 
-		// delete and restore the restoration dir.
-		if let Err(e) = fs::remove_dir_all(&rest_dir) {
+		if let Err(e) = fs::remove_dir_all(&prev_chunks) {
			match e.kind() {
				ErrorKind::NotFound => {},
				_ => return Err(e.into()),
			}
		}
 
-		fs::create_dir_all(&rest_dir)?;
+		// Move the previous recovery temp directory
+		// to `prev_chunks` to be able to restart restoring
+		// with previously downloaded blocks
+		// This step is optional, so don't fail on error
+		fs::rename(&recovery_temp, &prev_chunks).ok();
Reviewer: IMHO this should be moved under [the `restoration.lock()` scope].
Author: (this is done now)
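For readers wondering what "moved under" buys here: in the final shape of the diff, the restoration setup sits in an explicit block so that the `restoration` lock is released before previous chunks are re-imported, since feeding a chunk takes the same lock (see the `@@ -503` hunk below). A minimal sketch of that scoping, using a std mutex where the PR uses parking_lot:

```rust
use std::sync::Mutex;

// Minimal sketch: scoping the guard releases the lock early. Holding
// the guard across the re-import would deadlock, because these mutexes
// are not reentrant and the re-import locks `restoration` again.
fn init(restoration: &Mutex<Option<String>>) {
    {
        let mut res = restoration.lock().unwrap();
        *res = Some("new restoration".into());
    } // guard dropped here, lock released

    feed_previous_chunks(restoration); // locks `restoration` internally
}

fn feed_previous_chunks(restoration: &Mutex<Option<String>>) {
    let _res = restoration.lock().unwrap();
    // ... feed each saved chunk into the restoration ...
}
```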
 
-		// make new restoration.
-		let writer = match recover {
-			true => Some(LooseWriter::new(self.temp_recovery_dir())?),
-			false => None
-		};
+		{
+			let mut res = self.restoration.lock();
 
-		let params = RestorationParams {
-			manifest: manifest,
-			pruning: self.pruning,
-			db: self.restoration_db_handler.open(&self.restoration_db())?,
-			writer: writer,
-			genesis: &self.genesis_block,
-			guard: Guard::new(rest_dir),
-			engine: &*self.engine,
-		};
+			self.state_chunks.store(0, Ordering::SeqCst);
+			self.block_chunks.store(0, Ordering::SeqCst);
 
-		let state_chunks = params.manifest.state_hashes.len();
-		let block_chunks = params.manifest.block_hashes.len();
+			// tear down existing restoration.
+			*res = None;
 
-		*res = Some(Restoration::new(params)?);
+			// delete and restore the restoration dir.
+			if let Err(e) = fs::remove_dir_all(&rest_dir) {
+				match e.kind() {
+					ErrorKind::NotFound => {},
+					_ => return Err(e.into()),
+				}
+			}
 
-		*self.status.lock() = RestorationStatus::Ongoing {
-			state_chunks: state_chunks as u32,
-			block_chunks: block_chunks as u32,
-			state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
-			block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
-		};
+			fs::create_dir_all(&rest_dir)?;
 
+			// make new restoration.
+			let writer = match recover {
+				true => Some(LooseWriter::new(self.temp_recovery_dir())?),
Reviewer: nit: …
+				false => None
+			};
+
+			let params = RestorationParams {
+				manifest: manifest.clone(),
+				pruning: self.pruning,
+				db: self.restoration_db_handler.open(&self.restoration_db())?,
Reviewer: nit: …
+				writer: writer,
+				genesis: &self.genesis_block,
+				guard: Guard::new(rest_db),
+				engine: &*self.engine,
+			};
+
+			let state_chunks = manifest.state_hashes.len();
+			let block_chunks = manifest.block_hashes.len();
+
+			*res = Some(Restoration::new(params)?);
+
+			*self.status.lock() = RestorationStatus::Ongoing {
+				state_chunks: state_chunks as u32,
+				block_chunks: block_chunks as u32,
+				state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
+				block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
+			};
+		}
 
		self.restoring_snapshot.store(true, Ordering::SeqCst);
+
+		// Import previous chunks, continue if it fails
+		self.import_prev_chunks(manifest).ok();
Reviewer: What will happen if we start to sync a different manifest?
Author: It just won't import any of the available chunks, and the directory will be deleted anyway.
Reviewer: My question is more like: "Isn't that an issue?" How do we know if it's the right manifest to sync with?
Author: Mmmh, the …
Reviewer: I was thinking about a case where you alternate between two manifests, say a manifest at … I guess that's an edge case, but I just want to make sure that it will indeed work anyway in such a case (and we won't stall the sync or something).
Author: So I think that currently, once you've got a snapshot manifest and have started downloading chunks, you won't go back to trying to find a manifest before finishing the restoration.
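The thread above hinges on chunks being identified by content hash: a leftover chunk is fed only if its keccak digest appears in the current manifest, so chunks from a different manifest are skipped and later deleted along with `prev_chunks`. A standalone sketch of that check, assuming the `keccak-hash` and `ethereum-types` crates rather than the PR's internal `hash::keccak`:

```rust
use ethereum_types::H256;
use keccak_hash::keccak;

/// Returns Some(is_state) when `chunk` belongs to the manifest,
/// None when it is a stale chunk from some other manifest.
fn classify(chunk: &[u8], state_hashes: &[H256], block_hashes: &[H256]) -> Option<bool> {
    let hash = keccak(chunk);
    if block_hashes.contains(&hash) {
        Some(false) // a block chunk of this manifest
    } else if state_hashes.contains(&hash) {
        Some(true)  // a state chunk of this manifest
    } else {
        None        // not part of this manifest: skip it
    }
}
```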
+
+		self.ready.store(true, Ordering::SeqCst);
+
		Ok(())
	}
+
+	/// Import the previous chunks into the current restoration
+	fn import_prev_chunks(&self, manifest: ManifestData) -> Result<(), Error> {
Reviewer: Isn't there a better way to figure out whether the chunks in the directory are really meant for this manifest?
Author: We could check for the name first, but hashing the content of the chunks is really fast, so I don't think it's a bottleneck here (and it shouldn't happen that often that you stop your warp sync to restart syncing another snapshot).
+		let prev_chunks = self.prev_chunks_dir();
+
+		// Restore previous snapshot chunks
+		let files = fs::read_dir(prev_chunks.as_path())?;
+		let mut num_temp_chunks = 0;
+
+		for prev_chunk_file in files {
+			// Import the chunk, don't fail and continue if one fails
+			self.import_prev_chunk(&manifest, prev_chunk_file).ok();
Reviewer: Maybe just log a warning here and not increment `num_temp_chunks`.
+			num_temp_chunks += 1;
+		}
+
+		trace!(target:"snapshot", "Imported {} previous chunks", num_temp_chunks);
Reviewer: Judging by the way it is computed, I think …
+
+		// Remove the prev temp directory
+		fs::remove_dir_all(&prev_chunks)?;
+
+		Ok(())
+	}
+
+	/// Import a previous chunk at the given path
+	fn import_prev_chunk(&self, manifest: &ManifestData, file: io::Result<fs::DirEntry>) -> Result<(), Error> {
+		let file = file?;
+		let path = file.path();
+
+		let mut file = File::open(path.clone())?;
+		let mut buffer = Vec::new();
+		file.read_to_end(&mut buffer)?;
+
+		let hash = keccak(&buffer);
+
+		let (is_valid, is_state) = if manifest.block_hashes.contains(&hash) {
+			(true, false)
+		} else if manifest.state_hashes.contains(&hash) {
+			(true, true)
+		} else {
+			(false, false)
Reviewer: Perhaps early exit here instead of having this …
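The suggested early exit might look something like this sketch, with hypothetical stand-ins for `ManifestData` and `feed_chunk` (not code from the PR):

```rust
use ethereum_types::H256;

// Hypothetical stand-in for the manifest; only the hash lists matter here.
struct Manifest {
    state_hashes: Vec<H256>,
    block_hashes: Vec<H256>,
}

// Classify the chunk and return immediately when it belongs to neither
// list, instead of threading an `is_valid` flag through to a later check.
fn import_chunk(manifest: &Manifest, hash: H256, buffer: &[u8]) -> std::io::Result<()> {
    let is_state = if manifest.block_hashes.contains(&hash) {
        false
    } else if manifest.state_hashes.contains(&hash) {
        true
    } else {
        return Ok(()); // unknown chunk: nothing to feed
    };

    feed_chunk(hash, buffer, is_state) // assumed helper, mirrors the diff
}

fn feed_chunk(_hash: H256, _chunk: &[u8], _is_state: bool) -> std::io::Result<()> {
    Ok(())
}
```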
+		};
+
+		if !is_valid {
+			return Ok(());
+		}
+
+		self.feed_chunk(hash, &buffer, is_state)?;
+
+		trace!(target: "snapshot", "Fed chunk {:?}", hash);
+
+		Ok(())
+	}
@@ -492,6 +580,7 @@
 
		let _ = fs::remove_dir_all(self.restoration_dir());
		*self.status.lock() = RestorationStatus::Inactive;
+		self.ready.store(false, Ordering::SeqCst);
Reviewer: Is that fine? Why are we becoming not ready after the snapshot is restored? Maybe the name is not good enough and should encapsulate more regarding "readiness".
Author: Yeah, so …
Reviewer: I see. Is it possible to encapsulate that flow in the names somehow? Currently it's a little bit misleading; maybe call it …
Author: Done!
 
		Ok(())
	}
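As the thread above settles, `ready` means "a restoration is initialized and can accept chunks", not "the snapshot is restored", which is why it is cleared again on finalization. An illustrative sketch of that lifecycle (the flag was later renamed, per the thread):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Sketch of the flag's lifecycle: set false while tearing down and
// setting up, true once chunks may be fed, false again once the
// restoration is finalized or aborted.
struct Readiness(AtomicBool);

impl Readiness {
    fn init_restore_start(&self) { self.0.store(false, Ordering::SeqCst); } // setting up
    fn init_restore_done(&self)  { self.0.store(true, Ordering::SeqCst); }  // chunks may be fed
    fn finalize_or_abort(&self)  { self.0.store(false, Ordering::SeqCst); } // no more chunks
    fn can_feed(&self) -> bool   { self.0.load(Ordering::SeqCst) }
}
```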
@@ -503,7 +592,10 @@
		let mut restoration = self.restoration.lock();
 
		match self.status() {
-			RestorationStatus::Inactive | RestorationStatus::Failed => return Ok(()),
+			RestorationStatus::Inactive | RestorationStatus::Failed => {
+				trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive or failed", hash);
+				return Ok(());
+			},
			RestorationStatus::Ongoing { .. } => {
				let (res, db) = {
					let rest = match *restoration {
@@ -583,6 +675,32 @@
		self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok())
	}
 
+	fn ready(&self) -> bool {
+		self.ready.load(Ordering::SeqCst)
+	}
+
+	fn completed_chunks(&self) -> Option<Vec<H256>> {
+		let restoration = self.restoration.lock();
+
+		match *restoration {
+			Some(ref restoration) => {
+				let completed_chunks = restoration.manifest.block_hashes
+					.iter()
+					.filter(|h| !restoration.block_chunks_left.contains(h))
+					.chain(
+						restoration.manifest.state_hashes
+							.iter()
+							.filter(|h| !restoration.state_chunks_left.contains(h))
+					)
+					.map(|h| *h)
+					.collect();
+
+				Some(completed_chunks)
+			},
+			None => None,
+		}
+	}
+
	fn status(&self) -> RestorationStatus {
		let mut cur_status = self.status.lock();
		if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } = *cur_status {
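A hypothetical caller-side sketch of `completed_chunks()`: after a restart, the sync layer can subtract the already-restored chunks from the manifest and request only what is still missing. Names are illustrative, assuming the `ethereum-types` crate:

```rust
use ethereum_types::H256;
use std::collections::HashSet;

// Combine the manifest's chunk list with what the snapshot service
// reports as already completed, yielding only the chunks to fetch.
fn chunks_to_request(manifest_hashes: &[H256], completed: Option<Vec<H256>>) -> Vec<H256> {
    let done: HashSet<H256> = completed.unwrap_or_default().into_iter().collect();
    manifest_hashes
        .iter()
        .filter(|h| !done.contains(*h))
        .cloned()
        .collect()
}
```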
@@ -600,7 +718,9 @@
	}
 
	fn abort_restore(&self) {
+		trace!(target: "snapshot", "Aborting restore");
		self.restoring_snapshot.store(false, Ordering::SeqCst);
+		self.ready.store(false, Ordering::SeqCst);
		*self.restoration.lock() = None;
		*self.status.lock() = RestorationStatus::Inactive;
	}
Reviewer: Shouldn't that be moved under `restoration.lock()`?
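Sketched with std primitives, the reviewer's suggestion would take the `restoration` lock first and flip the flags while holding it, so that a concurrent chunk feed can never observe the flags cleared while a restoration object still exists. Illustrative only, not the PR's final code:

```rust
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

// Field names mirror the diff; the restoration payload is elided.
struct Service {
    restoring_snapshot: AtomicBool,
    ready: AtomicBool,
    restoration: Mutex<Option<()>>,
}

impl Service {
    fn abort_restore(&self) {
        // Lock first: flag updates and teardown become one atomic step
        // with respect to anyone else holding this lock.
        let mut restoration = self.restoration.lock().unwrap();
        self.restoring_snapshot.store(false, Ordering::SeqCst);
        self.ready.store(false, Ordering::SeqCst);
        *restoration = None;
    }
}
```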