This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Resumable warp-sync / Seed downloaded snapshots #8544

Merged: 59 commits, merged May 16, 2018
Changes shown below are from 25 of the 59 commits.

Commits (59)
1f735cf
Start dividing sync chain : first supplier method
ngotchac Apr 23, 2018
d5bcfa5
WIP - updated chain sync supplier
ngotchac Apr 23, 2018
8217b82
Finish refactoring the Chain Sync Supplier
ngotchac Apr 23, 2018
da0eb24
Create Chain Sync Requester
ngotchac Apr 23, 2018
017fd34
Add Propagator for Chain Sync
ngotchac Apr 23, 2018
96b9e0d
Add the Chain Sync Handler
ngotchac Apr 23, 2018
5a74ba9
Move tests from mod -> handler
ngotchac Apr 23, 2018
ef779c7
Move tests to propagator
ngotchac Apr 23, 2018
e240635
Refactor SyncRequester arguments
ngotchac Apr 23, 2018
f37c0ad
Refactoring peer fork header handler
ngotchac Apr 23, 2018
487e51d
Fix wrong highest block number in snapshot sync
ngotchac Apr 23, 2018
157cf32
Small refactor...
ngotchac Apr 23, 2018
68aeaee
Merge branch 'master' into ng-warp-update
ngotchac May 2, 2018
290adf7
Resume warp-sync downloaded chunks
ngotchac May 3, 2018
5bc5983
Add comments
ngotchac May 4, 2018
2c46379
Refactoring the previous chunks import
ngotchac May 4, 2018
87efaf5
Fix tests
ngotchac May 4, 2018
0d28cdd
Merge branch 'master' into ng-warp-update
ngotchac May 4, 2018
2ea7e6f
Merge branch 'ng-warp-update' into ng-warp-resume
ngotchac May 4, 2018
18af9cb
Address PR grumbles
ngotchac May 7, 2018
d81b784
Merge branch 'master' into ng-warp-update
ngotchac May 7, 2018
60950b6
Merge branch 'ng-warp-update' into ng-warp-resume
ngotchac May 7, 2018
fa2b5a9
Fix not seeding current snapshot
ngotchac May 4, 2018
4be5dd4
Address PR Grumbles
ngotchac May 7, 2018
f977338
Address PR grumble
ngotchac May 7, 2018
8ad988d
Retry failed CI job
ngotchac May 7, 2018
0f1e5b4
Update SnapshotService readiness check
ngotchac May 7, 2018
20145c8
Merge branch 'ng-warp-update' into ng-warp-resume
ngotchac May 7, 2018
14979ff
Fix tests
ngotchac May 7, 2018
a6cdd99
Merge branch 'ng-warp-update' into ng-warp-resume
ngotchac May 7, 2018
91f91a3
Fix tests
ngotchac May 7, 2018
39f67cc
Fix test
ngotchac May 7, 2018
ce2bce4
Early abort importing previous chunks
ngotchac May 7, 2018
79a46f9
PR Grumbles
ngotchac May 8, 2018
329a8f4
Merge branch 'master' into ng-warp-update
ngotchac May 9, 2018
6f171b9
Merge branch 'ng-warp-update' into ng-warp-resume
ngotchac May 9, 2018
72302ae
Update Gitlab CI config
ngotchac May 9, 2018
9735382
SyncState back to Waiting when Manifest peers disconnect
ngotchac May 9, 2018
a83c49c
Move fix
ngotchac May 9, 2018
50eadfb
Better fix
ngotchac May 9, 2018
f9500bc
Revert GitLab CI changes
ngotchac May 9, 2018
67980cc
Merge branch 'master' into ng-warp-resume
ngotchac May 9, 2018
ddf2f3d
Fix Warning
ngotchac May 9, 2018
75fd4b5
Refactor resuming snapshots
ngotchac May 9, 2018
4fc215c
Fix string construction
ngotchac May 9, 2018
21d1cee
Revert "Refactor resuming snapshots"
ngotchac May 9, 2018
dc5a035
Update informant log
ngotchac May 11, 2018
abf1647
Merge branch 'ng-warp-resume-refacto' into ng-warp-resume
ngotchac May 11, 2018
572fde0
Fix string construction
ngotchac May 9, 2018
b59e743
Refactor resuming snapshots
ngotchac May 9, 2018
6cef217
Fix informant
ngotchac May 11, 2018
9757a5c
PR Grumbles
ngotchac May 11, 2018
6b0d11f
Merge branch 'master' into ng-warp-resume
ngotchac May 11, 2018
721ae85
Update informant message : show chunks done
ngotchac May 11, 2018
e3947c9
PR Grumbles
ngotchac May 11, 2018
3afd81f
Fix
ngotchac May 11, 2018
ae3b919
Fix Warning
ngotchac May 11, 2018
259432e
Merge branch 'master' into ng-warp-resume
ngotchac May 16, 2018
3475321
PR Grumbles
ngotchac May 16, 2018
193 changes: 155 additions & 38 deletions ethcore/src/snapshot/service.rs
@@ -17,8 +17,8 @@
//! Snapshot network service implementation.

use std::collections::HashSet;
use std::io::ErrorKind;
use std::fs;
use std::io::{self, Read, ErrorKind};
use std::fs::{self, File};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -30,6 +30,7 @@ use blockchain::BlockChain;
use client::{Client, ChainInfo, ClientIoMessage};
use engines::EthEngine;
use error::Error;
use hash::keccak;
use ids::BlockId;

use io::IoChannel;
@@ -240,6 +241,7 @@ pub struct Service {
progress: super::Progress,
taking_snapshot: AtomicBool,
restoring_snapshot: AtomicBool,
restoration_ready: AtomicBool,
}

impl Service {
@@ -261,6 +263,7 @@ impl Service {
progress: Default::default(),
taking_snapshot: AtomicBool::new(false),
restoring_snapshot: AtomicBool::new(false),
restoration_ready: AtomicBool::new(false),
};

// create the root snapshot dir if it doesn't exist.
@@ -270,8 +273,8 @@ }
}
}

// delete the temporary restoration dir if it does exist.
if let Err(e) = fs::remove_dir_all(service.restoration_dir()) {
// delete the temporary restoration DB dir if it does exist.
if let Err(e) = fs::remove_dir_all(service.restoration_db()) {
if e.kind() != ErrorKind::NotFound {
return Err(e.into())
}
Expand Down Expand Up @@ -325,6 +328,13 @@ impl Service {
dir
}

// previous snapshot chunks path.
fn prev_chunks_dir(&self) -> PathBuf {
let mut dir = self.snapshot_root.clone();
dir.push("prev_chunks");
dir
}

// replace one the client's database with our own.
fn replace_client_db(&self) -> Result<(), Error> {
let our_db = self.restoration_db();
@@ -407,54 +417,129 @@ impl Service {
/// The recover flag indicates whether to recover the restored snapshot.
pub fn init_restore(&self, manifest: ManifestData, recover: bool) -> Result<(), Error> {
let rest_dir = self.restoration_dir();

let mut res = self.restoration.lock();

self.state_chunks.store(0, Ordering::SeqCst);
self.block_chunks.store(0, Ordering::SeqCst);

// tear down existing restoration.
*res = None;
let rest_db = self.restoration_db();
let recovery_temp = self.temp_recovery_dir();
let prev_chunks = self.prev_chunks_dir();

// delete and restore the restoration dir.
if let Err(e) = fs::remove_dir_all(&rest_dir) {
if let Err(e) = fs::remove_dir_all(&prev_chunks) {
match e.kind() {
ErrorKind::NotFound => {},
_ => return Err(e.into()),
}
}

fs::create_dir_all(&rest_dir)?;
{
self.restoration_ready.store(false, Ordering::SeqCst);
let mut res = self.restoration.lock();

// make new restoration.
let writer = match recover {
true => Some(LooseWriter::new(self.temp_recovery_dir())?),
false => None
};
// Move the previous recovery temp directory
// to `prev_chunks` to be able to restart restoring
// with previously downloaded blocks
// This step is optional, so don't fail on error
fs::rename(&recovery_temp, &prev_chunks).ok();

let params = RestorationParams {
manifest: manifest,
pruning: self.pruning,
db: self.restoration_db_handler.open(&self.restoration_db())?,
writer: writer,
genesis: &self.genesis_block,
guard: Guard::new(rest_dir),
engine: &*self.engine,
};
self.state_chunks.store(0, Ordering::SeqCst);
self.block_chunks.store(0, Ordering::SeqCst);

let state_chunks = params.manifest.state_hashes.len();
let block_chunks = params.manifest.block_hashes.len();
// tear down existing restoration.
*res = None;

*res = Some(Restoration::new(params)?);
// delete and restore the restoration dir.
if let Err(e) = fs::remove_dir_all(&rest_dir) {
match e.kind() {
ErrorKind::NotFound => {},
_ => return Err(e.into()),
}
}

*self.status.lock() = RestorationStatus::Ongoing {
state_chunks: state_chunks as u32,
block_chunks: block_chunks as u32,
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
};
fs::create_dir_all(&rest_dir)?;

// make new restoration.
let writer = match recover {
true => Some(LooseWriter::new(self.temp_recovery_dir())?),
Collaborator: nit: recovery_temp?

false => None
};

let params = RestorationParams {
manifest: manifest.clone(),
pruning: self.pruning,
db: self.restoration_db_handler.open(&self.restoration_db())?,
Collaborator: nit: &rest_db?

writer: writer,
genesis: &self.genesis_block,
guard: Guard::new(rest_db),
engine: &*self.engine,
};

let state_chunks = manifest.state_hashes.len();
let block_chunks = manifest.block_hashes.len();

*res = Some(Restoration::new(params)?);

*self.status.lock() = RestorationStatus::Ongoing {
state_chunks: state_chunks as u32,
block_chunks: block_chunks as u32,
state_chunks_done: self.state_chunks.load(Ordering::SeqCst) as u32,
block_chunks_done: self.block_chunks.load(Ordering::SeqCst) as u32,
};
}

self.restoring_snapshot.store(true, Ordering::SeqCst);

// Import previous chunks, continue if it fails
self.import_prev_chunks(manifest).ok();

Collaborator: What will happen if we start to sync a different manifest?

Author: It just won't import any of the available chunks, and the directory will be deleted anyway.

Collaborator: My question is more like: "Isn't that an issue?" How do we know it's the right manifest to sync with?

Author: Mmmh, the manifest is the one that just got downloaded, so it must be the right one. I'm not sure I get your question.

Collaborator: I was thinking about a case where you alternate between two manifests, say a manifest at X and one at X+10k (with the warp barrier set to X-5k):

  1. You start fetching X+10k.
  2. Midway through, the peer gets disconnected, but we save the already-downloaded chunks.
  3. You find a peer with X, start to sync, and discard all the chunks.
  4. The peer with X+10k gets back online and you need to sync from scratch.

I guess that's an edge case, but I just want to make sure it will indeed work in such a case (and we won't stall the sync or something).

Author: I think that currently, once you've got a snapshot manifest and started downloading chunks, you won't go back to looking for a manifest before the restoration finishes.

self.restoration_ready.store(true, Ordering::SeqCst);

Ok(())
}
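
Because the diff above interleaves removed and added lines, the key new step in init_restore is easy to miss: before tearing down the old restoration, the previous `temp` recovery directory is renamed to `prev_chunks` on a best-effort basis, so chunks written by an interrupted restoration survive the reset and can be re-imported. Below is a minimal, self-contained sketch of just that step, using only std; the function name and path arguments are illustrative, not the PR's API.

use std::fs;
use std::io::ErrorKind;
use std::path::Path;

/// Keep chunks downloaded by an interrupted restoration around for re-import.
fn preserve_previous_chunks(recovery_temp: &Path, prev_chunks: &Path) -> std::io::Result<()> {
    // Clear out whatever an older resume attempt left behind; `NotFound` is fine.
    if let Err(e) = fs::remove_dir_all(prev_chunks) {
        if e.kind() != ErrorKind::NotFound {
            return Err(e);
        }
    }
    // Best effort: if there is no previous temp dir, just start a fresh restoration.
    let _ = fs::rename(recovery_temp, prev_chunks);
    Ok(())
}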

/// Import the previous chunks into the current restoration
fn import_prev_chunks(&self, manifest: ManifestData) -> Result<(), Error> {
Collaborator: Isn't there a better way to figure out whether the chunks in the directory are really meant for this manifest?

Author: We could check the file name first, but hashing the content of the chunks is really fast, so I don't think it's a bottleneck here (and it shouldn't happen often that you stop your warp sync and restart syncing another snapshot).

let prev_chunks = self.prev_chunks_dir();

// Restore previous snapshot chunks
let files = fs::read_dir(prev_chunks.as_path())?;
let mut num_temp_chunks = 0;

for prev_chunk_file in files {
// Import the chunk, don't fail and continue if one fails
match self.import_prev_chunk(&manifest, prev_chunk_file) {
Ok(_) => num_temp_chunks += 1,
Err(e) => trace!(target: "snapshot", "Error importing chunk: {:?}", e),
}
}

trace!(target:"snapshot", "Imported {} previous chunks", num_temp_chunks);
Collaborator: Judging by the way it is computed, I think num_temp_chunks is the number of all chunks in the temp dir. But since it's used only for debugging, it's not that important.


// Remove the prev temp directory
fs::remove_dir_all(&prev_chunks)?;

Ok(())
}

/// Import a previous chunk at the given path
fn import_prev_chunk(&self, manifest: &ManifestData, file: io::Result<fs::DirEntry>) -> Result<(), Error> {
let file = file?;
let path = file.path();

let mut file = File::open(path.clone())?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)?;

let hash = keccak(&buffer);

let is_state = if manifest.block_hashes.contains(&hash) {
false
} else if manifest.state_hashes.contains(&hash) {
true
} else {
return Ok(());
Collaborator: nit: I believe something like Result<bool, _> is needed here to distinguish imported chunks from the others.

};

self.feed_chunk(hash, &buffer, is_state)?;

trace!(target: "snapshot", "Fed chunk {:?}", hash);

Ok(())
}
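
The review discussion above centres on how leftover chunks are matched to the new manifest: purely by the keccak hash of their contents, never by file name. Below is a small standalone sketch of that idea, not the PR's code; the enum, the function name, and the crate paths (keccak-hash, ethereum-types) are assumptions chosen to mirror this module, and the return value also reflects the reviewer's Result<bool, _> nit by reporting what kind of chunk was found.

use std::fs::File;
use std::io::{self, Read};
use std::path::Path;

use ethereum_types::H256;
use keccak_hash::keccak;

/// What a previously downloaded file turned out to be.
enum ChunkKind {
    State,
    Block,
    /// Not referenced by this manifest (e.g. left over from another snapshot).
    Unknown,
}

/// Hash the file's contents and look the hash up in the manifest's chunk lists.
fn classify_chunk(
    path: &Path,
    state_hashes: &[H256],
    block_hashes: &[H256],
) -> io::Result<(H256, ChunkKind)> {
    let mut bytes = Vec::new();
    File::open(path)?.read_to_end(&mut bytes)?;
    let hash = keccak(&bytes);

    let kind = if state_hashes.contains(&hash) {
        ChunkKind::State
    } else if block_hashes.contains(&hash) {
        ChunkKind::Block
    } else {
        ChunkKind::Unknown
    };
    Ok((hash, kind))
}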

Expand Down Expand Up @@ -492,6 +577,7 @@ impl Service {

let _ = fs::remove_dir_all(self.restoration_dir());
*self.status.lock() = RestorationStatus::Inactive;
self.restoration_ready.store(false, Ordering::SeqCst);

Ok(())
}
@@ -503,7 +589,10 @@ impl Service {
let mut restoration = self.restoration.lock();

match self.status() {
RestorationStatus::Inactive | RestorationStatus::Failed => return Ok(()),
RestorationStatus::Inactive | RestorationStatus::Failed => {
trace!(target: "snapshot", "Tried to restore chunk {:x} while inactive or failed", hash);
return Ok(());
},
RestorationStatus::Ongoing { .. } => {
let (res, db) = {
let rest = match *restoration {
@@ -583,6 +672,32 @@ impl SnapshotService for Service {
self.reader.read().as_ref().and_then(|r| r.chunk(hash).ok())
}

fn restoration_ready(&self) -> bool {
self.restoration_ready.load(Ordering::SeqCst)
}

fn completed_chunks(&self) -> Option<Vec<H256>> {
let restoration = self.restoration.lock();

match *restoration {
Some(ref restoration) => {
let completed_chunks = restoration.manifest.block_hashes
.iter()
.filter(|h| !restoration.block_chunks_left.contains(h))
.chain(
restoration.manifest.state_hashes
.iter()
.filter(|h| !restoration.state_chunks_left.contains(h))
)
.map(|h| *h)
.collect();

Some(completed_chunks)
},
None => None,
}
}

fn status(&self) -> RestorationStatus {
let mut cur_status = self.status.lock();
if let RestorationStatus::Ongoing { ref mut state_chunks_done, ref mut block_chunks_done, .. } = *cur_status {
@@ -600,7 +715,9 @@ impl SnapshotService for Service {
}

fn abort_restore(&self) {
trace!(target: "snapshot", "Aborting restore");
self.restoring_snapshot.store(false, Ordering::SeqCst);
self.restoration_ready.store(false, Ordering::SeqCst);
*self.restoration.lock() = None;
*self.status.lock() = RestorationStatus::Inactive;
}
8 changes: 6 additions & 2 deletions ethcore/src/snapshot/tests/service.rs
@@ -130,12 +130,16 @@ fn guards_delete_folders() {
service.init_restore(manifest.clone(), true).unwrap();
assert!(path.exists());

// The `db` folder should have been deleted,
// while the `temp` one kept
service.abort_restore();
assert!(!path.exists());
assert!(!path.join("db").exists());
assert!(path.join("temp").exists());

service.init_restore(manifest.clone(), true).unwrap();
assert!(path.exists());

drop(service);
assert!(!path.exists());
assert!(!path.join("db").exists());
assert!(path.join("temp").exists());
}
6 changes: 6 additions & 0 deletions ethcore/src/snapshot/traits.rs
@@ -30,6 +30,12 @@ pub trait SnapshotService : Sync + Send {
/// `None` indicates warp sync isn't supported by the consensus engine.
fn supported_versions(&self) -> Option<(u64, u64)>;

/// Returns whether the Snapshot Service restoration is ready
fn restoration_ready(&self) -> bool;

/// Returns a list of the completed chunks
fn completed_chunks(&self) -> Option<Vec<H256>>;

/// Get raw chunk for a given hash.
fn chunk(&self, hash: H256) -> Option<Bytes>;

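
The sync-side code that consumes the two new trait methods is not part of the hunks shown above. As a rough illustration only, a warp-sync driver could use restoration_ready() and completed_chunks() along the following lines to avoid re-requesting chunks that a previous, interrupted restoration already imported; the helper and its arguments are hypothetical, and the import path for the trait is an assumption.

use std::collections::HashSet;

use ethereum_types::H256;
// Assumed import path for the trait extended above.
use ethcore::snapshot::SnapshotService;

/// Return the manifest chunks that still need to be fetched from peers,
/// or None if the restoration isn't ready to report progress yet.
fn chunks_still_needed(
    service: &dyn SnapshotService,
    state_hashes: &[H256],
    block_hashes: &[H256],
) -> Option<Vec<H256>> {
    // Before init_restore finishes importing previous chunks,
    // completed_chunks() would under-report progress.
    if !service.restoration_ready() {
        return None;
    }

    let done: HashSet<H256> = service.completed_chunks()?.into_iter().collect();

    // Only the chunks the restoration is still missing need to be requested.
    Some(
        state_hashes
            .iter()
            .chain(block_hashes.iter())
            .filter(|h| !done.contains(*h))
            .cloned()
            .collect(),
    )
}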