Rewrite the warp syncing code to use call proofs (#2578)
* VirtualMachineParamsGet -> ChainInfoQuery

* Add call proofs requests to AllSync

* Start adding a requests system to warp_sync.rs

* warp_sync_source no longer public

* Start adding call proof requests

* Provide block_hash in warp_sync::RequestDetail

* Fix implementation in sync_service

* Send the call proofs

* Store the call proof responses

* Use the call proofs

* Remove StorageGet and NextKey variants from warp_sync

* Add RequestDetail::WarpSyncRequest

* Inline PostVerificationState

* Add a Phase enum to ChainInfoQuery

* Do the warp sync as part of ChainInfoQuery

* Remove GrandpaWarpSync and Verifier variants

* Remove ChainInfoQuery::current_source

* Remove source_id field, instead get source on the fly

* Remove WaitingForSources struct

* Merge InProgressWarpSync and ChainInfoQuery into one

* Remove warp_sync_header()

* Have a run() function that drives the CPU process

* Properly transition requests

* Update inject_error function

* as_chain_information is in fact correct

* Add warp_sync::Config::requests_capacity

* Update remove_source

* Store the raw downloaded runtime code and compile later

* Store the downloaded fragments and verify them in run()

* Small tweak to run()

* handle_response_ok no longer takes ownership

* Tweaks to downloaded_proof

* Tweak desired_requests so that rustfmt is happy

* Small tweaks to desired_requests

* Add a TRq user data for requests

* Remove a few functions

* Split RequestDetail from DesiredRequest, to remove state_trie_root field

* Use Cow for the function name and parameters

* Split PreVerification phase in two

* Immediately create a Verifier after download

* Extract body of run() to different sub-structs

* run() -> process_one() and tweaks

* BuildChainInformation::verify() -> build()

* Fix one todo in VerifyWarpSyncFragment::verify

* Restore verifying fragments one by one

* Only verify one fragment at a time in warp_sync

* CHANGELOG

* Remove all remaining todo!()s from warp_sync.rs

* Fix most warnings

* Properly check block hash on warp sync request success

* Try sources again if all sources are banned

* Tweak usage

* Properly remove obsolete requests when removing source

* Improve docs somewhat

* Move process_one to the bottom

* PR number

* Rustfmt

* Fix doclink

* Fix spellcheck

* Properly change phase if there are multiple sets of fragments to download

* No longer ignore responses from banned sources, to prevent infinite loop

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
tomaka and mergify[bot] authored Aug 5, 2022
1 parent 0c76256 commit 6b26ed6
Showing 5 changed files with 1,600 additions and 1,198 deletions.
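The core of the change in miniature: before this PR, warp sync fetched each storage item it needed (runtime code, heap pages, and so on) with its own storage request, one networking round-trip apiece; it now issues call-proof requests, where a single Merkle proof covers every storage item touched by the call. A self-contained sketch of the difference — `Peer`, `fetch_storage_item`, and `fetch_call_proof` are hypothetical stand-ins, not smoldot's actual API:

```rust
// Hypothetical sketch: `Peer` and its methods are illustrative stand-ins.

/// A Merkle proof covering every storage item touched by a runtime call.
struct Proof {
    trie_nodes: Vec<Vec<u8>>,
}

struct Peer;

impl Peer {
    /// Old approach: one networking round-trip per storage key.
    fn fetch_storage_item(&mut self, _key: &[u8]) -> Option<Vec<u8>> {
        Some(Vec::new()) // network request stubbed out
    }

    /// New approach: one round-trip for a proof of a whole runtime call.
    fn fetch_call_proof(&mut self, _method: &str, _parameter: &[u8]) -> Proof {
        Proof { trie_nodes: Vec::new() } // network request stubbed out
    }
}

fn main() {
    let mut peer = Peer;

    // Before: one sequential round-trip per item the warp sync needs.
    let keys: [&[u8]; 2] = [b":code", b":heappages"];
    let _values: Vec<_> = keys.iter().map(|key| peer.fetch_storage_item(key)).collect();

    // After: a single call-proof request covers all of these at once.
    let _proof = peer.fetch_call_proof("Core_version", &[]);
}
```

Since the proof is verified against the finalized block's state trie root, batching the items into one response does not weaken the trust guarantees of fetching them individually.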
27 changes: 12 additions & 15 deletions bin/full-node/src/run/consensus_service.rs
@@ -533,10 +533,6 @@ impl SyncBackground {
                 | all::ResponseOutcome::NotFinalizedChain { .. }
                 | all::ResponseOutcome::AllAlreadyInChain { .. } => {
                 }
-                all::ResponseOutcome::WarpSyncError { .. } |
-                all::ResponseOutcome::WarpSyncFinished { .. } => {
-                    unreachable!()
-                }
             }
         }
     },
@@ -802,15 +798,15 @@ impl SyncBackground {
             // Locally-authored blocks source.
             match (request_details, &self.authored_block) {
                 (
-                    all::RequestDetail::BlocksRequest {
+                    all::DesiredRequest::BlocksRequest {
                         first_block_hash: None,
                         first_block_height,
                         ..
                     },
                     Some((authored_height, _, _, _)),
                 ) if first_block_height == authored_height => true,
                 (
-                    all::RequestDetail::BlocksRequest {
+                    all::DesiredRequest::BlocksRequest {
                         first_block_hash: Some(first_block_hash),
                         first_block_height,
                         ..
@@ -834,7 +830,7 @@ impl SyncBackground {
         request_info.num_blocks_clamp(NonZeroU64::new(64).unwrap());

         match request_info {
-            all::RequestDetail::BlocksRequest { .. }
+            all::DesiredRequest::BlocksRequest { .. }
                 if source_id == self.block_author_sync_source =>
            {
                 tracing::debug!("queue-locally-authored-block-for-import");
@@ -847,7 +843,7 @@ impl SyncBackground {
                 // Create a request that is immediately answered right below.
                 let request_id = self.sync.add_request(
                     source_id,
-                    request_info,
+                    request_info.into(),
                     future::AbortHandle::new_pair().0, // Temporary dummy.
                 );

@@ -863,7 +859,7 @@ impl SyncBackground {
                 );
             }

-            all::RequestDetail::BlocksRequest {
+            all::DesiredRequest::BlocksRequest {
                 first_block_hash,
                 first_block_height,
                 ascending,
@@ -905,15 +901,14 @@ impl SyncBackground {
                 );

                 let (request, abort) = future::abortable(request);
-                let request_id = self
-                    .sync
-                    .add_request(source_id, request_info.clone(), abort);
+                let request_id = self.sync.add_request(source_id, request_info.into(), abort);

                 self.block_requests_finished
                     .push(request.map(move |r| (request_id, r)).boxed());
             }
-            all::RequestDetail::GrandpaWarpSync { .. }
-            | all::RequestDetail::StorageGet { .. } => {
+            all::DesiredRequest::GrandpaWarpSync { .. }
+            | all::DesiredRequest::StorageGet { .. }
+            | all::DesiredRequest::RuntimeCallMerkleProof { .. } => {
                 // Not used in "full" mode.
                 unreachable!()
             }
@@ -937,7 +932,9 @@ impl SyncBackground {
                     self.sync = idle;
                     break;
                 }
-                all::ProcessOne::VerifyWarpSyncFragment(_) => unreachable!(),
+                all::ProcessOne::VerifyWarpSyncFragment(_)
+                | all::ProcessOne::WarpSyncError { .. }
+                | all::ProcessOne::WarpSyncFinished { .. } => unreachable!(),
                 all::ProcessOne::VerifyBodyHeader(verify) => {
                     let hash_to_verify = verify.hash();
                     let height_to_verify = verify.height();
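The `RequestDetail` → `DesiredRequest` renames and the new `request_info.into()` calls above correspond to the commit "Split RequestDetail from DesiredRequest, to remove state_trie_root field": what the state machine would like to request (which may carry hints such as a state trie root) is now a different type from what it records for an in-flight request. A rough sketch of such a split, with simplified stand-in fields rather than smoldot's actual definitions:

```rust
// Simplified stand-ins, not smoldot's actual types.

/// What the sync state machine would like to have. It may carry hints that
/// help the networking layer but are not part of the request's identity.
enum DesiredRequest {
    StorageGet {
        block_hash: [u8; 32],
        state_trie_root: [u8; 32], // hint only
        keys: Vec<Vec<u8>>,
    },
}

/// What is recorded for an in-flight request; hint fields are dropped.
enum RequestDetail {
    StorageGet {
        block_hash: [u8; 32],
        keys: Vec<Vec<u8>>,
    },
}

impl From<DesiredRequest> for RequestDetail {
    fn from(desired: DesiredRequest) -> Self {
        match desired {
            DesiredRequest::StorageGet { block_hash, keys, .. } => {
                // `state_trie_root` is only a hint and is not recorded.
                RequestDetail::StorageGet { block_hash, keys }
            }
        }
    }
}

fn main() {
    let desired = DesiredRequest::StorageGet {
        block_hash: [0; 32],
        state_trie_root: [0; 32],
        keys: vec![b":code".to_vec()],
    };
    // Mirrors the `add_request(source_id, request_info.into(), abort)` calls above.
    let _detail: RequestDetail = desired.into();
}
```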
174 changes: 130 additions & 44 deletions bin/light-base/src/sync_service/standalone.rs
@@ -29,6 +29,7 @@ use smoldot::{
 };
 use std::{
     collections::{HashMap, HashSet},
+    iter,
     marker::PhantomData,
     num::{NonZeroU32, NonZeroU64},
     sync::Arc,
@@ -81,6 +82,7 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
         pending_block_requests: stream::FuturesUnordered::new(),
         pending_grandpa_requests: stream::FuturesUnordered::new(),
         pending_storage_requests: stream::FuturesUnordered::new(),
+        pending_call_proof_requests: stream::FuturesUnordered::new(),
         warp_sync_taking_long_time_warning: future::Either::Left(TPlat::sleep(
             Duration::from_secs(15),
         ))
@@ -277,8 +279,7 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(

             (request_id, result) = task.pending_storage_requests.select_next_some() => {
                 // A storage request has been finished.
-                // `result` is an error if the block request got cancelled by the sync state
-                // machine.
+                // `result` is an error if the request got cancelled by the sync state machine.
                 if let Ok(result) = result {
                     // Inject the result of the request into the sync state machine.
                     task.sync.storage_get_response(
@@ -293,6 +294,26 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
                 }
             },

+            (request_id, result) = task.pending_call_proof_requests.select_next_some() => {
+                // A call proof request has been finished.
+                // `result` is an error if the request got cancelled by the sync state machine.
+                if let Ok(result) = result {
+                    // Inject the result of the request into the sync state machine.
+                    task.sync.call_proof_response(
+                        request_id,
+                        match result {
+                            Ok(ref r) => Ok(r.decode().into_iter()),
+                            Err(err) => Err(err),
+                        }
+                    ).1
+
+                } else {
+                    // The sync state machine has emitted a `Action::Cancel` earlier, and is
+                    // thus no longer interested in the response.
+                    continue;
+                }
+            },
+
             () = &mut task.warp_sync_taking_long_time_warning => {
                 log::warn!(
                     target: &task.log_target,
@@ -325,42 +346,6 @@ pub(super) async fn start_standalone_chain<TPlat: Platform>(
                 | all::ResponseOutcome::Queued
                 | all::ResponseOutcome::NotFinalizedChain { .. }
                 | all::ResponseOutcome::AllAlreadyInChain { .. } => {}
-                all::ResponseOutcome::WarpSyncError { error } => {
-                    log::warn!(
-                        target: &task.log_target,
-                        "Error during GrandPa warp syncing: {}",
-                        error
-                    );
-                }
-                all::ResponseOutcome::WarpSyncFinished {
-                    finalized_block_runtime,
-                    finalized_storage_code,
-                    finalized_storage_heap_pages,
-                } => {
-                    let finalized_header = task.sync.finalized_block_header();
-                    log::info!(
-                        target: &task.log_target,
-                        "GrandPa warp sync finished to #{} ({})",
-                        finalized_header.number,
-                        HashDisplay(&finalized_header.hash(task.sync.block_number_bytes()))
-                    );
-
-                    task.warp_sync_taking_long_time_warning =
-                        future::Either::Right(future::pending()).fuse();
-
-                    debug_assert!(task.known_finalized_runtime.is_none());
-                    task.known_finalized_runtime = Some(FinalizedBlockRuntime {
-                        virtual_machine: finalized_block_runtime,
-                        storage_code: finalized_storage_code,
-                        storage_heap_pages: finalized_storage_heap_pages,
-                    });
-
-                    task.network_up_to_date_finalized = false;
-                    task.network_up_to_date_best = false;
-                    // Since there is a gap in the blocks, all active notifications to all blocks
-                    // must be cleared.
-                    task.all_notifications.clear();
-                }
             }
         }
     }
@@ -446,6 +431,17 @@ struct Task<TPlat: Platform> {
         >,
     >,

+    /// List of call proof requests currently in progress.
+    pending_call_proof_requests: stream::FuturesUnordered<
+        future::BoxFuture<
+            'static,
+            (
+                all::RequestId,
+                Result<Result<network::service::EncodedMerkleProof, ()>, future::Aborted>,
+            ),
+        >,
+    >,
+
     platform: PhantomData<fn() -> TPlat>,
 }
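The new `pending_call_proof_requests` field follows the same pattern as the existing block, GrandPa, and storage request lists: each network request is wrapped in `future::abortable` so the sync state machine can cancel it mid-flight, tagged with its `all::RequestId`, and pushed into a `FuturesUnordered` that the event loop drains with `select_next_some()`. A stripped-down sketch of that pattern, using a plain `Vec<u8>` payload in place of the real `EncodedMerkleProof` type:

```rust
use futures::{
    future::{self, FutureExt as _},
    stream::{FuturesUnordered, StreamExt as _},
};

type RequestId = u64;

fn main() {
    futures::executor::block_on(async {
        // Each finished future yields the request ID plus either the response
        // or `Aborted` if the request was cancelled.
        let mut pending: FuturesUnordered<
            future::BoxFuture<'static, (RequestId, Result<Vec<u8>, future::Aborted>)>,
        > = FuturesUnordered::new();

        // Issue a (stubbed) call proof request, kept abortable so that the
        // sync state machine can cancel it while it is in flight.
        let request = async { vec![0xAA_u8, 0xBB] };
        let (request, abort_handle) = future::abortable(request);
        let request_id: RequestId = 0;
        pending.push(async move { (request_id, request.await) }.boxed());

        // Calling `abort_handle.abort()` would make the future resolve to
        // `Err(Aborted)`; merely dropping the handle leaves it running.
        drop(abort_handle);

        while let Some((id, result)) = pending.next().await {
            match result {
                Ok(proof) => println!("request {id}: {} response bytes", proof.len()),
                Err(future::Aborted) => println!("request {id} was cancelled"),
            }
        }
    });
}
```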

@@ -475,7 +471,7 @@ impl<TPlat: Platform> Task<TPlat> {
         request_detail.num_blocks_clamp(NonZeroU64::new(64).unwrap());

         match request_detail {
-            all::RequestDetail::BlocksRequest {
+            all::DesiredRequest::BlocksRequest {
                 first_block_hash,
                 first_block_height,
                 ascending,
@@ -514,13 +510,15 @@ impl<TPlat: Platform> Task<TPlat> {
                 );

                 let (block_request, abort) = future::abortable(block_request);
-                let request_id = self.sync.add_request(source_id, request_detail, abort);
+                let request_id = self
+                    .sync
+                    .add_request(source_id, request_detail.into(), abort);

                 self.pending_block_requests
                     .push(async move { (request_id, block_request.await) }.boxed());
             }

-            all::RequestDetail::GrandpaWarpSync {
+            all::DesiredRequest::GrandpaWarpSync {
                 sync_start_block_hash,
             } => {
                 let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
@@ -537,13 +535,15 @@ impl<TPlat: Platform> Task<TPlat> {
                 );

                 let (grandpa_request, abort) = future::abortable(grandpa_request);
-                let request_id = self.sync.add_request(source_id, request_detail, abort);
+                let request_id = self
+                    .sync
+                    .add_request(source_id, request_detail.into(), abort);

                 self.pending_grandpa_requests
                     .push(async move { (request_id, grandpa_request.await) }.boxed());
             }

-            all::RequestDetail::StorageGet {
+            all::DesiredRequest::StorageGet {
                 block_hash,
                 state_trie_root,
                 ref keys,
@@ -583,11 +583,52 @@ impl<TPlat: Platform> Task<TPlat> {
                 };

                 let (storage_request, abort) = future::abortable(storage_request);
-                let request_id = self.sync.add_request(source_id, request_detail, abort);
+                let request_id = self
+                    .sync
+                    .add_request(source_id, request_detail.into(), abort);

                 self.pending_storage_requests
                     .push(async move { (request_id, storage_request.await) }.boxed());
             }

+            all::DesiredRequest::RuntimeCallMerkleProof {
+                block_hash,
+                ref function_name,
+                ref parameter_vectored,
+            } => {
+                let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue
+                let network_service = self.network_service.clone();
+                let network_chain_index = self.network_chain_index;
+                // TODO: all this copying is done because of lifetime requirements in NetworkService::call_proof_request; maybe check if it can be avoided
+                let parameter_vectored = parameter_vectored.clone();
+                let function_name = function_name.clone();
+
+                let call_proof_request = async move {
+                    let rq = network_service.call_proof_request(
+                        network_chain_index,
+                        peer_id,
+                        network::protocol::CallProofRequestConfig {
+                            block_hash,
+                            method: &function_name,
+                            parameter_vectored: iter::once(parameter_vectored),
+                        },
+                        Duration::from_secs(16),
+                    );
+
+                    match rq.await {
+                        Ok(p) => Ok(p),
+                        Err(_) => Err(()),
+                    }
+                };
+
+                let (call_proof_request, abort) = future::abortable(call_proof_request);
+                let request_id = self
+                    .sync
+                    .add_request(source_id, request_detail.into(), abort);
+
+                self.pending_call_proof_requests
+                    .push(async move { (request_id, call_proof_request.await) }.boxed());
+            }
         }

         true
@@ -607,6 +648,51 @@ impl<TPlat: Platform> Task<TPlat> {
                 return (self, false);
             }

+            all::ProcessOne::WarpSyncError { sync, error } => {
+                self.sync = sync;
+                log::warn!(
+                    target: &self.log_target,
+                    "Error during GrandPa warp syncing: {}",
+                    error
+                );
+                return (self, true);
+            }
+
+            all::ProcessOne::WarpSyncFinished {
+                sync,
+                finalized_block_runtime,
+                finalized_storage_code,
+                finalized_storage_heap_pages,
+            } => {
+                self.sync = sync;
+
+                let finalized_header = self.sync.finalized_block_header();
+                log::info!(
+                    target: &self.log_target,
+                    "GrandPa warp sync finished to #{} ({})",
+                    finalized_header.number,
+                    HashDisplay(&finalized_header.hash(self.sync.block_number_bytes()))
+                );
+
+                self.warp_sync_taking_long_time_warning =
+                    future::Either::Right(future::pending()).fuse();
+
+                debug_assert!(self.known_finalized_runtime.is_none());
+                self.known_finalized_runtime = Some(FinalizedBlockRuntime {
+                    virtual_machine: finalized_block_runtime,
+                    storage_code: finalized_storage_code,
+                    storage_heap_pages: finalized_storage_heap_pages,
+                });
+
+                self.network_up_to_date_finalized = false;
+                self.network_up_to_date_best = false;
+                // Since there is a gap in the blocks, all active notifications to all blocks
+                // must be cleared.
+                self.all_notifications.clear();
+
+                return (self, true);
+            }
+
             all::ProcessOne::VerifyWarpSyncFragment(verify) => {
                 // Grandpa warp sync fragment to verify.
                 let sender_peer_id = verify.proof_sender().1 .0.clone(); // TODO: unnecessary cloning most of the time
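The bulk of this file's diff moves warp sync outcomes out of the response path (`ResponseOutcome`) and into the CPU-driving loop: `process_one()` consumes the state machine and hands it back together with whatever happened, hence the `self.sync = sync;` assignments above. A condensed sketch of that ownership-passing control flow, with heavily simplified stand-in types rather than smoldot's real ones:

```rust
// Heavily simplified stand-ins for smoldot's state machine types.

enum ProcessOne {
    AllSync(SyncMachine), // nothing left to process; the machine is handed back
    WarpSyncError { sync: SyncMachine, error: String },
    WarpSyncFinished { sync: SyncMachine },
    // ...fragment/header/body verification variants omitted...
}

struct SyncMachine {
    pending: Vec<&'static str>, // placeholder for real CPU-bound work
}

impl SyncMachine {
    /// Performs one unit of CPU-bound work, consuming the machine and
    /// returning it inside the outcome.
    fn process_one(mut self) -> ProcessOne {
        match self.pending.pop() {
            None => ProcessOne::AllSync(self),
            Some("error") => ProcessOne::WarpSyncError {
                sync: self,
                error: "invalid justification".to_owned(),
            },
            Some(_) => ProcessOne::WarpSyncFinished { sync: self },
        }
    }
}

/// Drives the machine until no CPU-bound work remains, mirroring the
/// `self.sync = sync; return (self, true);` shape of the diff above.
fn drive(mut sync: SyncMachine) -> SyncMachine {
    loop {
        match sync.process_one() {
            ProcessOne::AllSync(s) => return s,
            ProcessOne::WarpSyncError { sync: s, error } => {
                eprintln!("warp sync error: {error}"); // logged; syncing carries on
                sync = s;
            }
            ProcessOne::WarpSyncFinished { sync: s } => {
                println!("warp sync finished");
                sync = s;
            }
        }
    }
}

fn main() {
    let _ = drive(SyncMachine { pending: vec!["finished"] });
}
```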
5 changes: 5 additions & 0 deletions bin/wasm-node/CHANGELOG.md
@@ -2,6 +2,11 @@

 ## Unreleased

+### Changed
+
+- The GRANDPA warp sync algorithm now downloads Merkle proofs of all the necessary storage items at once, rather than one by one sequentially. This removes approximately 11 networking round-trips and thus significantly reduces the time the warp syncing takes. ([#2578](https://github.com/paritytech/smoldot/pull/2578))
+- The GRANDPA warp sync implementation has been considerably refactored. It is possible that unintended changes in behaviour have accidentally been introduced. ([#2578](https://github.com/paritytech/smoldot/pull/2578))
+
 ## 0.6.27 - 2022-07-29

 ### Changed