diff --git a/bin/full-node/src/run/consensus_service.rs b/bin/full-node/src/run/consensus_service.rs index 7485db2245..b1307c1d3a 100644 --- a/bin/full-node/src/run/consensus_service.rs +++ b/bin/full-node/src/run/consensus_service.rs @@ -533,10 +533,6 @@ impl SyncBackground { | all::ResponseOutcome::NotFinalizedChain { .. } | all::ResponseOutcome::AllAlreadyInChain { .. } => { } - all::ResponseOutcome::WarpSyncError { .. } | - all::ResponseOutcome::WarpSyncFinished { .. } => { - unreachable!() - } } } }, @@ -802,7 +798,7 @@ impl SyncBackground { // Locally-authored blocks source. match (request_details, &self.authored_block) { ( - all::RequestDetail::BlocksRequest { + all::DesiredRequest::BlocksRequest { first_block_hash: None, first_block_height, .. @@ -810,7 +806,7 @@ impl SyncBackground { Some((authored_height, _, _, _)), ) if first_block_height == authored_height => true, ( - all::RequestDetail::BlocksRequest { + all::DesiredRequest::BlocksRequest { first_block_hash: Some(first_block_hash), first_block_height, .. @@ -834,7 +830,7 @@ impl SyncBackground { request_info.num_blocks_clamp(NonZeroU64::new(64).unwrap()); match request_info { - all::RequestDetail::BlocksRequest { .. } + all::DesiredRequest::BlocksRequest { .. } if source_id == self.block_author_sync_source => { tracing::debug!("queue-locally-authored-block-for-import"); @@ -847,7 +843,7 @@ impl SyncBackground { // Create a request that is immediately answered right below. let request_id = self.sync.add_request( source_id, - request_info, + request_info.into(), future::AbortHandle::new_pair().0, // Temporary dummy. ); @@ -863,7 +859,7 @@ impl SyncBackground { ); } - all::RequestDetail::BlocksRequest { + all::DesiredRequest::BlocksRequest { first_block_hash, first_block_height, ascending, @@ -905,15 +901,14 @@ impl SyncBackground { ); let (request, abort) = future::abortable(request); - let request_id = self - .sync - .add_request(source_id, request_info.clone(), abort); + let request_id = self.sync.add_request(source_id, request_info.into(), abort); self.block_requests_finished .push(request.map(move |r| (request_id, r)).boxed()); } - all::RequestDetail::GrandpaWarpSync { .. } - | all::RequestDetail::StorageGet { .. } => { + all::DesiredRequest::GrandpaWarpSync { .. } + | all::DesiredRequest::StorageGet { .. } + | all::DesiredRequest::RuntimeCallMerkleProof { .. } => { // Not used in "full" mode. unreachable!() } @@ -937,7 +932,9 @@ impl SyncBackground { self.sync = idle; break; } - all::ProcessOne::VerifyWarpSyncFragment(_) => unreachable!(), + all::ProcessOne::VerifyWarpSyncFragment(_) + | all::ProcessOne::WarpSyncError { .. } + | all::ProcessOne::WarpSyncFinished { .. } => unreachable!(), all::ProcessOne::VerifyBodyHeader(verify) => { let hash_to_verify = verify.hash(); let height_to_verify = verify.height(); diff --git a/bin/light-base/src/sync_service/standalone.rs b/bin/light-base/src/sync_service/standalone.rs index b29b476d13..38ef5c11ea 100644 --- a/bin/light-base/src/sync_service/standalone.rs +++ b/bin/light-base/src/sync_service/standalone.rs @@ -29,6 +29,7 @@ use smoldot::{ }; use std::{ collections::{HashMap, HashSet}, + iter, marker::PhantomData, num::{NonZeroU32, NonZeroU64}, sync::Arc, @@ -81,6 +82,7 @@ pub(super) async fn start_standalone_chain( pending_block_requests: stream::FuturesUnordered::new(), pending_grandpa_requests: stream::FuturesUnordered::new(), pending_storage_requests: stream::FuturesUnordered::new(), + pending_call_proof_requests: stream::FuturesUnordered::new(), warp_sync_taking_long_time_warning: future::Either::Left(TPlat::sleep( Duration::from_secs(15), )) @@ -277,8 +279,7 @@ pub(super) async fn start_standalone_chain( (request_id, result) = task.pending_storage_requests.select_next_some() => { // A storage request has been finished. - // `result` is an error if the block request got cancelled by the sync state - // machine. + // `result` is an error if the request got cancelled by the sync state machine. if let Ok(result) = result { // Inject the result of the request into the sync state machine. task.sync.storage_get_response( @@ -293,6 +294,26 @@ pub(super) async fn start_standalone_chain( } }, + (request_id, result) = task.pending_call_proof_requests.select_next_some() => { + // A call proof request has been finished. + // `result` is an error if the request got cancelled by the sync state machine. + if let Ok(result) = result { + // Inject the result of the request into the sync state machine. + task.sync.call_proof_response( + request_id, + match result { + Ok(ref r) => Ok(r.decode().into_iter()), + Err(err) => Err(err), + } + ).1 + + } else { + // The sync state machine has emitted a `Action::Cancel` earlier, and is + // thus no longer interested in the response. + continue; + } + }, + () = &mut task.warp_sync_taking_long_time_warning => { log::warn!( target: &task.log_target, @@ -325,42 +346,6 @@ pub(super) async fn start_standalone_chain( | all::ResponseOutcome::Queued | all::ResponseOutcome::NotFinalizedChain { .. } | all::ResponseOutcome::AllAlreadyInChain { .. } => {} - all::ResponseOutcome::WarpSyncError { error } => { - log::warn!( - target: &task.log_target, - "Error during GrandPa warp syncing: {}", - error - ); - } - all::ResponseOutcome::WarpSyncFinished { - finalized_block_runtime, - finalized_storage_code, - finalized_storage_heap_pages, - } => { - let finalized_header = task.sync.finalized_block_header(); - log::info!( - target: &task.log_target, - "GrandPa warp sync finished to #{} ({})", - finalized_header.number, - HashDisplay(&finalized_header.hash(task.sync.block_number_bytes())) - ); - - task.warp_sync_taking_long_time_warning = - future::Either::Right(future::pending()).fuse(); - - debug_assert!(task.known_finalized_runtime.is_none()); - task.known_finalized_runtime = Some(FinalizedBlockRuntime { - virtual_machine: finalized_block_runtime, - storage_code: finalized_storage_code, - storage_heap_pages: finalized_storage_heap_pages, - }); - - task.network_up_to_date_finalized = false; - task.network_up_to_date_best = false; - // Since there is a gap in the blocks, all active notifications to all blocks - // must be cleared. - task.all_notifications.clear(); - } } } } @@ -446,6 +431,17 @@ struct Task { >, >, + /// List of call proof requests currently in progress. + pending_call_proof_requests: stream::FuturesUnordered< + future::BoxFuture< + 'static, + ( + all::RequestId, + Result, future::Aborted>, + ), + >, + >, + platform: PhantomData TPlat>, } @@ -475,7 +471,7 @@ impl Task { request_detail.num_blocks_clamp(NonZeroU64::new(64).unwrap()); match request_detail { - all::RequestDetail::BlocksRequest { + all::DesiredRequest::BlocksRequest { first_block_hash, first_block_height, ascending, @@ -514,13 +510,15 @@ impl Task { ); let (block_request, abort) = future::abortable(block_request); - let request_id = self.sync.add_request(source_id, request_detail, abort); + let request_id = self + .sync + .add_request(source_id, request_detail.into(), abort); self.pending_block_requests .push(async move { (request_id, block_request.await) }.boxed()); } - all::RequestDetail::GrandpaWarpSync { + all::DesiredRequest::GrandpaWarpSync { sync_start_block_hash, } => { let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue @@ -537,13 +535,15 @@ impl Task { ); let (grandpa_request, abort) = future::abortable(grandpa_request); - let request_id = self.sync.add_request(source_id, request_detail, abort); + let request_id = self + .sync + .add_request(source_id, request_detail.into(), abort); self.pending_grandpa_requests .push(async move { (request_id, grandpa_request.await) }.boxed()); } - all::RequestDetail::StorageGet { + all::DesiredRequest::StorageGet { block_hash, state_trie_root, ref keys, @@ -583,11 +583,52 @@ impl Task { }; let (storage_request, abort) = future::abortable(storage_request); - let request_id = self.sync.add_request(source_id, request_detail, abort); + let request_id = self + .sync + .add_request(source_id, request_detail.into(), abort); self.pending_storage_requests .push(async move { (request_id, storage_request.await) }.boxed()); } + + all::DesiredRequest::RuntimeCallMerkleProof { + block_hash, + ref function_name, + ref parameter_vectored, + } => { + let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue + let network_service = self.network_service.clone(); + let network_chain_index = self.network_chain_index; + // TODO: all this copying is done because of lifetime requirements in NetworkService::call_proof_request; maybe check if it can be avoided + let parameter_vectored = parameter_vectored.clone(); + let function_name = function_name.clone(); + + let call_proof_request = async move { + let rq = network_service.call_proof_request( + network_chain_index, + peer_id, + network::protocol::CallProofRequestConfig { + block_hash, + method: &function_name, + parameter_vectored: iter::once(parameter_vectored), + }, + Duration::from_secs(16), + ); + + match rq.await { + Ok(p) => Ok(p), + Err(_) => Err(()), + } + }; + + let (call_proof_request, abort) = future::abortable(call_proof_request); + let request_id = self + .sync + .add_request(source_id, request_detail.into(), abort); + + self.pending_call_proof_requests + .push(async move { (request_id, call_proof_request.await) }.boxed()); + } } true @@ -607,6 +648,51 @@ impl Task { return (self, false); } + all::ProcessOne::WarpSyncError { sync, error } => { + self.sync = sync; + log::warn!( + target: &self.log_target, + "Error during GrandPa warp syncing: {}", + error + ); + return (self, true); + } + + all::ProcessOne::WarpSyncFinished { + sync, + finalized_block_runtime, + finalized_storage_code, + finalized_storage_heap_pages, + } => { + self.sync = sync; + + let finalized_header = self.sync.finalized_block_header(); + log::info!( + target: &self.log_target, + "GrandPa warp sync finished to #{} ({})", + finalized_header.number, + HashDisplay(&finalized_header.hash(self.sync.block_number_bytes())) + ); + + self.warp_sync_taking_long_time_warning = + future::Either::Right(future::pending()).fuse(); + + debug_assert!(self.known_finalized_runtime.is_none()); + self.known_finalized_runtime = Some(FinalizedBlockRuntime { + virtual_machine: finalized_block_runtime, + storage_code: finalized_storage_code, + storage_heap_pages: finalized_storage_heap_pages, + }); + + self.network_up_to_date_finalized = false; + self.network_up_to_date_best = false; + // Since there is a gap in the blocks, all active notifications to all blocks + // must be cleared. + self.all_notifications.clear(); + + return (self, true); + } + all::ProcessOne::VerifyWarpSyncFragment(verify) => { // Grandpa warp sync fragment to verify. let sender_peer_id = verify.proof_sender().1 .0.clone(); // TODO: unnecessary cloning most of the time diff --git a/bin/wasm-node/CHANGELOG.md b/bin/wasm-node/CHANGELOG.md index 30a0387ae6..49165bc87f 100644 --- a/bin/wasm-node/CHANGELOG.md +++ b/bin/wasm-node/CHANGELOG.md @@ -2,6 +2,11 @@ ## Unreleased +### Changed + +- The GRANDPA warp sync algorithm now downloads Merkle proofs of all the necessary storage items at once, rather than one by one sequentially. This removes approximately 11 networking round-trips and thus significantly reduces the time the warp syncing takes. ([#2578](https://github.com/paritytech/smoldot/pull/2578)) +- The GRANDPA warp sync implementation has been considerably refactored. It is possible that unintended changes in behaviour have accidentally been introduced. ([#2578](https://github.com/paritytech/smoldot/pull/2578)) + ## 0.6.27 - 2022-07-29 ### Changed diff --git a/src/sync/all.rs b/src/sync/all.rs index 714371807d..ceded148b8 100644 --- a/src/sync/all.rs +++ b/src/sync/all.rs @@ -32,15 +32,15 @@ use crate::{ chain::{blocks_tree, chain_information}, - executor::{host, storage_diff, vm::ExecHint}, + executor::{host, storage_diff}, header, sync::{all_forks, optimistic, warp_sync}, verify, }; -use alloc::{vec, vec::Vec}; +use alloc::{borrow::Cow, vec, vec::Vec}; use core::{ - cmp, iter, mem, + cmp, iter, marker, mem, num::{NonZeroU32, NonZeroU64}, ops, time::Duration, @@ -151,6 +151,7 @@ impl AllSync { start_chain_information: config.chain_information, block_number_bytes: config.block_number_bytes, sources_capacity: config.sources_capacity, + requests_capacity: config.sources_capacity, // TODO: ?! add as config? }) { Ok(inner) => AllSyncInner::GrandpaWarpSync { inner }, Err((chain_information, warp_sync::WarpSyncInitError::NotGrandpa)) => { @@ -345,29 +346,7 @@ impl AllSync { // `inner` is temporarily replaced with `Poisoned`. A new value must be put back before // returning. match mem::replace(&mut self.inner, AllSyncInner::Poisoned) { - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::InProgressWarpSync::WaitingForSources(waiting), - } => { - let outer_source_id_entry = self.shared.sources.vacant_entry(); - let outer_source_id = SourceId(outer_source_id_entry.key()); - - let warp_sync_request = waiting.add_source(GrandpaWarpSyncSourceExtra { - outer_source_id, - user_data, - best_block_number, - best_block_hash, - }); - - let inner_source_id = warp_sync_request.current_source().0; - outer_source_id_entry.insert(SourceMapping::GrandpaWarpSync(inner_source_id)); - - self.inner = AllSyncInner::GrandpaWarpSync { - inner: warp_sync_request.into(), - }; - - outer_source_id - } - AllSyncInner::GrandpaWarpSync { inner: mut grandpa } => { + AllSyncInner::GrandpaWarpSync { inner: mut sync } => { let outer_source_id_entry = self.shared.sources.vacant_entry(); let outer_source_id = SourceId(outer_source_id_entry.key()); @@ -378,26 +357,11 @@ impl AllSync { best_block_hash, }; - let inner_source_id = match &mut grandpa { - warp_sync::InProgressWarpSync::WaitingForSources(_) => { - unreachable!() - } - warp_sync::InProgressWarpSync::Verifier(sync) => sync.add_source(source_extra), - warp_sync::InProgressWarpSync::WarpSyncRequest(sync) => { - sync.add_source(source_extra) - } - warp_sync::InProgressWarpSync::VirtualMachineParamsGet(sync) => { - sync.add_source(source_extra) - } - warp_sync::InProgressWarpSync::StorageGet(sync) => { - sync.add_source(source_extra) - } - warp_sync::InProgressWarpSync::NextKey(sync) => sync.add_source(source_extra), - }; + let inner_source_id = sync.add_source(source_extra); outer_source_id_entry.insert(SourceMapping::GrandpaWarpSync(inner_source_id)); - self.inner = AllSyncInner::GrandpaWarpSync { inner: grandpa }; + self.inner = AllSyncInner::GrandpaWarpSync { inner: sync }; outer_source_id } AllSyncInner::AllForks(mut all_forks) => { @@ -518,18 +482,37 @@ impl AllSync { (user_data.user_data, requests) } - (AllSyncInner::GrandpaWarpSync { .. }, SourceMapping::GrandpaWarpSync(source_id)) => { - let sync = match mem::replace(&mut self.inner, AllSyncInner::Poisoned) { - AllSyncInner::GrandpaWarpSync { inner: sync } => sync, - _ => unreachable!(), - }; + ( + AllSyncInner::GrandpaWarpSync { inner }, + SourceMapping::GrandpaWarpSync(source_id), + ) => { + let (user_data, requests) = inner.remove_source(source_id); + let requests = requests + .map(|(_inner_request_id, request_inner_user_data)| { + debug_assert!(self + .shared + .requests + .contains(request_inner_user_data.outer_request_id.0)); + let _removed = self + .shared + .requests + .remove(request_inner_user_data.outer_request_id.0); + debug_assert!(matches!( + _removed, + RequestMapping::WarpSync(_inner_request_id) + )); - let (user_data, grandpa_warp_sync) = sync.remove_source(source_id); - self.inner = AllSyncInner::GrandpaWarpSync { - inner: grandpa_warp_sync, - }; + ( + request_inner_user_data.outer_request_id, + request_inner_user_data.user_data, + ) + }) + .collect::>() + .into_iter(); - (user_data.user_data, Vec::new().into_iter()) // TODO: properly return requests + // TODO: also handle the "inline" requests + + (user_data.user_data, requests) } (AllSyncInner::Poisoned, _) => unreachable!(), @@ -796,7 +779,7 @@ impl AllSync { /// called in order for the request to actually be marked as started. pub fn desired_requests( &'_ self, - ) -> impl Iterator + '_ { + ) -> impl Iterator + '_ { match &self.inner { AllSyncInner::AllForks(sync) => { let iter = sync.desired_requests().map( @@ -809,7 +792,7 @@ impl AllSync { }, ); - either::Left(iter) + either::Left(either::Right(iter)) } AllSyncInner::Optimistic { inner } => { let iter = inner.desired_requests().map(move |rq_detail| { @@ -820,56 +803,45 @@ impl AllSync { ) }); - either::Right(either::Left(iter)) + either::Right(iter) } AllSyncInner::GrandpaWarpSync { inner } => { - // Grandpa warp sync only ever requires one request at a time. Determine which - // one it is, if any. - let desired_request = match inner { - warp_sync::InProgressWarpSync::WarpSyncRequest(rq) => Some(( - rq.current_source().1.outer_source_id, - &rq.current_source().1.user_data, - RequestDetail::GrandpaWarpSync { - sync_start_block_hash: rq.start_block_hash(), - }, - )), - warp_sync::InProgressWarpSync::StorageGet(get) => Some(( - get.warp_sync_source().1.outer_source_id, - &get.warp_sync_source().1.user_data, - RequestDetail::StorageGet { - block_hash: get.warp_sync_header().hash(inner.block_number_bytes()), - state_trie_root: *get.warp_sync_header().state_root, - keys: vec![get.key_as_vec()], - }, - )), - warp_sync::InProgressWarpSync::VirtualMachineParamsGet(rq) => Some(( - rq.warp_sync_source().1.outer_source_id, - &rq.warp_sync_source().1.user_data, - RequestDetail::StorageGet { - block_hash: rq.warp_sync_header().hash(inner.block_number_bytes()), - state_trie_root: *rq.warp_sync_header().state_root, - keys: vec![b":code".to_vec(), b":heappages".to_vec()], - }, - )), - _ => None, - }; + let iter = inner + .desired_requests() + .map(move |(_, src_user_data, rq_detail)| { + let detail = match rq_detail { + warp_sync::DesiredRequest::WarpSyncRequest { block_hash } => { + DesiredRequest::GrandpaWarpSync { + sync_start_block_hash: block_hash, + } + } + warp_sync::DesiredRequest::RuntimeParametersGet { + block_hash, + state_trie_root, + } => DesiredRequest::StorageGet { + block_hash, + state_trie_root, + keys: vec![b":code".to_vec(), b":heappages".to_vec()], + }, + warp_sync::DesiredRequest::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + } => DesiredRequest::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + }, + }; - let iter = if let Some(desired_request) = desired_request { - if self.shared.requests.iter().any(|(_, rq)| match rq { - RequestMapping::Inline(src_id, ud, _) => { - (src_id, ud) == (&desired_request.0, &desired_request.2) - } - _ => false, - }) { - either::Left(iter::empty()) - } else { - either::Right(iter::once(desired_request)) - } - } else { - either::Left(iter::empty()) - }; + ( + src_user_data.outer_source_id, + &src_user_data.user_data, + detail, + ) + }); - either::Right(either::Right(iter)) + either::Left(either::Left(iter)) } AllSyncInner::Poisoned => unreachable!(), } @@ -958,6 +930,92 @@ impl AllSync { request_mapping_entry.insert(RequestMapping::Optimistic(inner_request_id)); return outer_request_id; } + ( + AllSyncInner::GrandpaWarpSync { inner }, + RequestDetail::GrandpaWarpSync { + sync_start_block_hash, + }, + ) => { + let inner_source_id = match self.shared.sources.get(source_id.0).unwrap() { + SourceMapping::GrandpaWarpSync(inner_source_id) => *inner_source_id, + _ => unreachable!(), + }; + + let request_mapping_entry = self.shared.requests.vacant_entry(); + let outer_request_id = RequestId(request_mapping_entry.key()); + + let inner_request_id = inner.add_request( + inner_source_id, + GrandpaWarpSyncRequestExtra { + outer_request_id, + user_data, + }, + warp_sync::RequestDetail::WarpSyncRequest { + block_hash: *sync_start_block_hash, + }, + ); + + request_mapping_entry.insert(RequestMapping::WarpSync(inner_request_id)); + return outer_request_id; + } + ( + AllSyncInner::GrandpaWarpSync { inner }, + RequestDetail::StorageGet { block_hash, keys }, + ) if keys == &[&b":code"[..], &b":heappages"[..]] => { + let inner_source_id = match self.shared.sources.get(source_id.0).unwrap() { + SourceMapping::GrandpaWarpSync(inner_source_id) => *inner_source_id, + _ => unreachable!(), + }; + + let request_mapping_entry = self.shared.requests.vacant_entry(); + let outer_request_id = RequestId(request_mapping_entry.key()); + + let inner_request_id = inner.add_request( + inner_source_id, + GrandpaWarpSyncRequestExtra { + outer_request_id, + user_data, + }, + warp_sync::RequestDetail::RuntimeParametersGet { + block_hash: *block_hash, + }, + ); + + request_mapping_entry.insert(RequestMapping::WarpSync(inner_request_id)); + return outer_request_id; + } + ( + AllSyncInner::GrandpaWarpSync { inner }, + RequestDetail::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + }, + ) => { + let inner_source_id = match self.shared.sources.get(source_id.0).unwrap() { + SourceMapping::GrandpaWarpSync(inner_source_id) => *inner_source_id, + _ => unreachable!(), + }; + + let request_mapping_entry = self.shared.requests.vacant_entry(); + let outer_request_id = RequestId(request_mapping_entry.key()); + + let inner_request_id = inner.add_request( + inner_source_id, + GrandpaWarpSyncRequestExtra { + outer_request_id, + user_data, + }, + warp_sync::RequestDetail::RuntimeCallMerkleProof { + block_hash: *block_hash, + function_name: function_name.clone(), // TODO: don't clone + parameter_vectored: parameter_vectored.clone(), // TODO: don't clone + }, + ); + + request_mapping_entry.insert(RequestMapping::WarpSync(inner_request_id)); + return outer_request_id; + } (AllSyncInner::AllForks { .. }, _) => {} (AllSyncInner::Optimistic { .. }, _) => {} (AllSyncInner::GrandpaWarpSync { .. }, _) => {} @@ -1018,10 +1076,43 @@ impl AllSync { /// [`AllSync`] is yielded back at the end of this process. pub fn process_one(mut self) -> ProcessOne { match self.inner { - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::InProgressWarpSync::Verifier(_), - } => ProcessOne::VerifyWarpSyncFragment(WarpSyncFragmentVerify { inner: self }), - AllSyncInner::GrandpaWarpSync { .. } => ProcessOne::AllSync(self), + AllSyncInner::GrandpaWarpSync { inner } => { + match inner.process_one() { + warp_sync::ProcessOne::Idle(inner) => { + self.inner = AllSyncInner::GrandpaWarpSync { inner }; + ProcessOne::AllSync(self) + } + warp_sync::ProcessOne::VerifyWarpSyncFragment(inner) => { + ProcessOne::VerifyWarpSyncFragment(WarpSyncFragmentVerify { + inner, + shared: self.shared, + marker: marker::PhantomData, + }) + } + warp_sync::ProcessOne::BuildChainInformation(inner) => match inner.build().0 { + // TODO: errors not reported to upper layer + warp_sync::WarpSync::InProgress(inner) => { + self.inner = AllSyncInner::GrandpaWarpSync { inner }; + ProcessOne::AllSync(self) + } + warp_sync::WarpSync::Finished(success) => { + let ( + new_inner, + finalized_block_runtime, + finalized_storage_code, + finalized_storage_heap_pages, + ) = self.shared.transition_grandpa_warp_sync_all_forks(success); + self.inner = AllSyncInner::AllForks(new_inner); + ProcessOne::WarpSyncFinished { + sync: self, + finalized_block_runtime, + finalized_storage_code, + finalized_storage_heap_pages, + } + } + }, + } + } AllSyncInner::AllForks(sync) => match sync.process_one() { all_forks::ProcessOne::AllSync { sync } => { self.inner = AllSyncInner::AllForks(sync); @@ -1341,34 +1432,28 @@ impl AllSync { ) -> (TRq, ResponseOutcome) { debug_assert!(self.shared.requests.contains(request_id.0)); let request = self.shared.requests.remove(request_id.0); - let user_data = match request { - RequestMapping::Inline(_, _, user_data) => user_data, - _ => panic!(), - }; - let outcome = match mem::replace(&mut self.inner, AllSyncInner::Poisoned) { - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::InProgressWarpSync::WarpSyncRequest(grandpa), - } => { - let updated_grandpa = if let Some((fragments, is_finished)) = response { - grandpa.handle_response_ok(fragments, is_finished) + match (&mut self.inner, request) { + ( + AllSyncInner::GrandpaWarpSync { inner: grandpa }, + RequestMapping::WarpSync(request_id), + ) => { + let user_data = if let Some((fragments, is_finished)) = response { + grandpa.warp_sync_request_success(request_id, fragments, is_finished) } else { - grandpa.handle_response_err() - }; - self.inner = AllSyncInner::GrandpaWarpSync { - inner: updated_grandpa, + grandpa.fail_request(request_id) }; - ResponseOutcome::Queued + + (user_data.user_data, ResponseOutcome::Queued) } // Only the GrandPa warp syncing ever starts GrandPa warp sync requests. - other => { - self.inner = other; - ResponseOutcome::Queued // TODO: no + (_, RequestMapping::Inline(_, _, user_data)) => { + (user_data, ResponseOutcome::Queued) // TODO: no, not queued } - }; - (user_data, outcome) + _ => todo!(), // TODO: handle other variants + } } /// Inject a response to a previously-emitted storage proof request. @@ -1388,20 +1473,16 @@ impl AllSync { ) -> (TRq, ResponseOutcome) { debug_assert!(self.shared.requests.contains(request_id.0)); let request = self.shared.requests.remove(request_id.0); - let user_data = match request { - RequestMapping::Inline(_, _, user_data) => user_data, - _ => panic!(), - }; - let outcome = match ( + match ( mem::replace(&mut self.inner, AllSyncInner::Poisoned), response, + request, ) { ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::InProgressWarpSync::VirtualMachineParamsGet(sync), - }, + AllSyncInner::GrandpaWarpSync { inner: mut sync }, Ok(mut response), + RequestMapping::WarpSync(request_id), ) => { // In this state, we expect the response to be one value for `:code` and one for // `:heappages`. As documented, we panic if the number of items isn't 2. @@ -1409,113 +1490,86 @@ impl AllSync { let heap_pages = response.next().unwrap(); assert!(response.next().is_none()); - // We use an `ExecHint` that assumes that the runtime will continue being used - // after the end of the warp syncing. This might be the case, since we provide - // the runtime to the API user. The API user might then immediately throw away - // this runtime, but we don't care enough about this possibility to optimize - // this. - // TODO: make `allow_unresolved_imports` configurable - let outcome = sync.set_virtual_machine_params( - code, - heap_pages, - ExecHint::CompileAheadOfTime, - false, - ); + let user_data = sync.runtime_parameters_get_success(request_id, code, heap_pages); - match outcome { - (warp_sync::WarpSync::InProgress(inner), None) => { - self.inner = AllSyncInner::GrandpaWarpSync { inner }; - ResponseOutcome::Queued - } - (warp_sync::WarpSync::InProgress(inner), Some(error)) => { - self.inner = AllSyncInner::GrandpaWarpSync { inner }; - ResponseOutcome::WarpSyncError { error } - } - (warp_sync::WarpSync::Finished(success), None) => { - let ( - all_forks, - finalized_block_runtime, - finalized_storage_code, - finalized_storage_heap_pages, - ) = self.shared.transition_grandpa_warp_sync_all_forks(success); - self.inner = AllSyncInner::AllForks(all_forks); - ResponseOutcome::WarpSyncFinished { - finalized_block_runtime, - finalized_storage_code, - finalized_storage_heap_pages, - } - } - (warp_sync::WarpSync::Finished(_), Some(_)) => unreachable!(), - } + self.inner = AllSyncInner::GrandpaWarpSync { inner: sync }; + (user_data.user_data, ResponseOutcome::Queued) } ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::InProgressWarpSync::StorageGet(sync), - }, - Ok(mut response), + AllSyncInner::GrandpaWarpSync { inner: mut sync }, + Err(_), + RequestMapping::WarpSync(request_id), ) => { - // In this state, we expect the response to be one value. As documented, we panic - // if the number of items isn't 1. - let value = response.next().unwrap(); - assert!(response.next().is_none()); - - let outcome = sync.inject_value(value.map(iter::once)); - match outcome { - (warp_sync::WarpSync::InProgress(inner), None) => { - self.inner = AllSyncInner::GrandpaWarpSync { inner }; - ResponseOutcome::Queued - } - (warp_sync::WarpSync::InProgress(inner), Some(error)) => { - self.inner = AllSyncInner::GrandpaWarpSync { inner }; - ResponseOutcome::WarpSyncError { error } - } - (warp_sync::WarpSync::Finished(success), None) => { - let ( - all_forks, - finalized_block_runtime, - finalized_storage_code, - finalized_storage_heap_pages, - ) = self.shared.transition_grandpa_warp_sync_all_forks(success); - self.inner = AllSyncInner::AllForks(all_forks); - ResponseOutcome::WarpSyncFinished { - finalized_block_runtime, - finalized_storage_code, - finalized_storage_heap_pages, - } - } - (warp_sync::WarpSync::Finished(_), Some(_)) => unreachable!(), - } + let user_data = sync.fail_request(request_id).user_data; + // TODO: notify user of the problem + self.inner = AllSyncInner::GrandpaWarpSync { inner: sync }; + (user_data, ResponseOutcome::Queued) } + // Only the GrandPa warp syncing ever starts GrandPa warp sync requests. + (other, _, RequestMapping::Inline(_, _, user_data)) => { + self.inner = other; + (user_data, ResponseOutcome::Queued) // TODO: no + } + (_, _, _) => { + // Type of request doesn't correspond to a storage get. + panic!() + } + } + } + + /// Inject a response to a previously-emitted call proof request. + /// + /// On success, must contain the encoded Merkle proof. See the + /// [`trie`](crate::trie::proof_verify) module for a description of the format of Merkle + /// proofs. + /// + /// # Panic + /// + /// Panics if the [`RequestId`] doesn't correspond to any request, or corresponds to a request + /// of a different type. + /// + pub fn call_proof_response( + &mut self, + request_id: RequestId, + response: Result>, ()>, + ) -> (TRq, ResponseOutcome) { + debug_assert!(self.shared.requests.contains(request_id.0)); + let request = self.shared.requests.remove(request_id.0); + + match ( + mem::replace(&mut self.inner, AllSyncInner::Poisoned), + response, + request, + ) { ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::InProgressWarpSync::VirtualMachineParamsGet(sync), - }, - Err(_), + AllSyncInner::GrandpaWarpSync { inner: mut sync }, + Ok(response), + RequestMapping::WarpSync(request_id), ) => { - let inner = sync.inject_error(); - // TODO: notify user of the problem - self.inner = AllSyncInner::GrandpaWarpSync { inner }; - ResponseOutcome::Queued + let user_data = sync.runtime_call_merkle_proof_success(request_id, response); + self.inner = AllSyncInner::GrandpaWarpSync { inner: sync }; + (user_data.user_data, ResponseOutcome::Queued) } ( - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::InProgressWarpSync::StorageGet(sync), - }, + AllSyncInner::GrandpaWarpSync { inner: mut sync }, Err(_), + RequestMapping::WarpSync(request_id), ) => { - let inner = sync.inject_error(); + let user_data = sync.fail_request(request_id); // TODO: notify user of the problem - self.inner = AllSyncInner::GrandpaWarpSync { inner }; - ResponseOutcome::Queued + self.inner = AllSyncInner::GrandpaWarpSync { inner: sync }; + (user_data.user_data, ResponseOutcome::Queued) } - // Only the GrandPa warp syncing ever starts GrandPa warp sync requests. - (other, _) => { + // Only the GrandPa warp syncing ever starts call proof requests. + (other, _, RequestMapping::Inline(_, _, user_data)) => { self.inner = other; - ResponseOutcome::Queued // TODO: no + (user_data, ResponseOutcome::Queued) // TODO: no } - }; - - (user_data, outcome) + (_, _, _) => { + // Type of request doesn't correspond to a call proof request. + panic!() + } + } } } @@ -1585,7 +1639,7 @@ impl ops::IndexMut for AllSync { /// See [`AllSync::desired_requests`]. #[derive(Debug, Clone, PartialEq, Eq)] #[must_use] -pub enum RequestDetail { +pub enum DesiredRequest { /// Requesting blocks from the source is requested. BlocksRequest { /// Height of the first block to request. @@ -1628,6 +1682,87 @@ pub enum RequestDetail { /// Keys whose values is requested. keys: Vec>, }, + + /// Sending a call proof query is requested. + RuntimeCallMerkleProof { + /// Hash of the block whose call is made against. + block_hash: [u8; 32], + /// Name of the function to be called. + function_name: Cow<'static, str>, + /// Concatenated SCALE-encoded parameters to provide to the call. + parameter_vectored: Cow<'static, [u8]>, + }, +} + +impl DesiredRequest { + /// Caps the number of blocks to request to `max`. + pub fn num_blocks_clamp(&mut self, max: NonZeroU64) { + if let DesiredRequest::BlocksRequest { num_blocks, .. } = self { + *num_blocks = NonZeroU64::new(cmp::min(num_blocks.get(), max.get())).unwrap(); + } + } + + /// Caps the number of blocks to request to `max`. + pub fn with_num_blocks_clamp(mut self, max: NonZeroU64) -> Self { + self.num_blocks_clamp(max); + self + } +} + +/// See [`AllSync::desired_requests`]. +#[derive(Debug, Clone, PartialEq, Eq)] +#[must_use] +pub enum RequestDetail { + /// Requesting blocks from the source is requested. + BlocksRequest { + /// Height of the first block to request. + first_block_height: u64, + /// Hash of the first block to request. `None` if not known. + first_block_hash: Option<[u8; 32]>, + /// `True` if the `first_block_hash` is the response should contain blocks in an + /// increasing number, starting from `first_block_hash` with the lowest number. If `false`, + /// the blocks should be in decreasing number, with `first_block_hash` as the highest + /// number. + ascending: bool, + /// Number of blocks the request should return. + /// + /// Note that this is only an indication, and the source is free to give fewer blocks + /// than requested. + /// + /// This might be equal to `u64::max_value()` in case no upper bound is required. The API + /// user is responsible for clamping this value to a reasonable limit. + num_blocks: NonZeroU64, + /// `True` if headers should be included in the response. + request_headers: bool, + /// `True` if bodies should be included in the response. + request_bodies: bool, + /// `True` if the justification should be included in the response, if any. + request_justification: bool, + }, + + /// Sending a Grandpa warp sync request is requested. + GrandpaWarpSync { + /// Hash of the known finalized block. Starting point of the request. + sync_start_block_hash: [u8; 32], + }, + + /// Sending a storage query is requested. + StorageGet { + /// Hash of the block whose storage is requested. + block_hash: [u8; 32], + /// Keys whose values is requested. + keys: Vec>, + }, + + /// Sending a call proof query is requested. + RuntimeCallMerkleProof { + /// Hash of the block whose call is made against. + block_hash: [u8; 32], + /// Name of the function to be called. + function_name: Cow<'static, str>, + /// Concatenated SCALE-encoded parameters to provide to the call. + parameter_vectored: Cow<'static, [u8]>, + }, } impl RequestDetail { @@ -1645,6 +1780,47 @@ impl RequestDetail { } } +impl From for RequestDetail { + fn from(rq: DesiredRequest) -> RequestDetail { + match rq { + DesiredRequest::BlocksRequest { + first_block_height, + first_block_hash, + ascending, + num_blocks, + request_headers, + request_bodies, + request_justification, + } => RequestDetail::BlocksRequest { + first_block_height, + first_block_hash, + ascending, + num_blocks, + request_headers, + request_bodies, + request_justification, + }, + DesiredRequest::GrandpaWarpSync { + sync_start_block_hash, + } => RequestDetail::GrandpaWarpSync { + sync_start_block_hash, + }, + DesiredRequest::StorageGet { + block_hash, keys, .. + } => RequestDetail::StorageGet { block_hash, keys }, + DesiredRequest::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + } => RequestDetail::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + }, + } + } +} + pub struct BlockRequestSuccessBlock { pub scale_encoded_header: Vec, pub scale_encoded_justifications: Vec<([u8; 4], Vec)>, @@ -1743,35 +1919,18 @@ pub enum ProcessOne { /// No block ready to be processed. AllSync(AllSync), - /// Ready to start verifying a header. - VerifyHeader(HeaderVerify), - - /// Ready to start verifying a proof of finality. - VerifyFinalityProof(FinalityProofVerify), - - /// Ready to start verifying a header and a body. - VerifyBodyHeader(HeaderBodyVerify), - - /// Ready to start verifying a warp sync fragment. - VerifyWarpSyncFragment(WarpSyncFragmentVerify), -} - -/// Outcome of injecting a response in the [`AllSync`]. -pub enum ResponseOutcome { - /// Request was no longer interesting for the state machine. - Outdated, - - /// Content of the response has been queued and will be processed later. - Queued, - /// Content of the response is erroneous in the context of warp syncing. WarpSyncError { + sync: AllSync, + /// Error that happened. error: warp_sync::Error, }, /// Response has made it possible to finish warp syncing. WarpSyncFinished { + sync: AllSync, + /// Runtime of the newly finalized block. /// /// > **Note**: Use methods such as [`AllSync::finalized_block_header`] to know which @@ -1785,6 +1944,27 @@ pub enum ResponseOutcome { finalized_storage_heap_pages: Option>, }, + /// Ready to start verifying a header. + VerifyHeader(HeaderVerify), + + /// Ready to start verifying a proof of finality. + VerifyFinalityProof(FinalityProofVerify), + + /// Ready to start verifying a header and a body. + VerifyBodyHeader(HeaderBodyVerify), + + /// Ready to start verifying a warp sync fragment. + VerifyWarpSyncFragment(WarpSyncFragmentVerify), +} + +/// Outcome of injecting a response in the [`AllSync`]. +pub enum ResponseOutcome { + /// Request was no longer interesting for the state machine. + Outdated, + + /// Content of the response has been queued and will be processed later. + Queued, + /// Source has given blocks that aren't part of the finalized chain. /// /// This doesn't necessarily mean that the source is malicious or uses a different chain. It @@ -2076,43 +2256,40 @@ pub enum FinalityProofVerifyOutcome { } pub struct WarpSyncFragmentVerify { - inner: AllSync, + inner: warp_sync::VerifyWarpSyncFragment< + GrandpaWarpSyncSourceExtra, + GrandpaWarpSyncRequestExtra, + >, + shared: Shared, + marker: marker::PhantomData>, } impl WarpSyncFragmentVerify { /// Returns the identifier and user data of the source that has sent the fragment to be /// verified. pub fn proof_sender(&self) -> (SourceId, &TSrc) { - let sender = match &self.inner.inner { - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::InProgressWarpSync::Verifier(verifier), - } => verifier.proof_sender(), - _ => unreachable!(), - }; - - (sender.1.outer_source_id, &sender.1.user_data) + let (_, ud) = self.inner.proof_sender(); + (ud.outer_source_id, &ud.user_data) } /// Perform the verification. pub fn perform( - mut self, + self, ) -> ( AllSync, Result<(), warp_sync::FragmentError>, ) { - let (next_grandpa_warp_sync, error) = - match mem::replace(&mut self.inner.inner, AllSyncInner::Poisoned) { - AllSyncInner::GrandpaWarpSync { - inner: warp_sync::InProgressWarpSync::Verifier(verifier), - } => verifier.next(), - _ => unreachable!(), - }; - - self.inner.inner = AllSyncInner::GrandpaWarpSync { - inner: next_grandpa_warp_sync, - }; + let (next_grandpa_warp_sync, error) = self.inner.verify(); - (self.inner, error) + ( + AllSync { + inner: AllSyncInner::GrandpaWarpSync { + inner: next_grandpa_warp_sync, + }, + shared: self.shared, + }, + error.map_or(Ok(()), Result::Err), + ) } } @@ -2365,7 +2542,10 @@ impl StorageNextKey { enum AllSyncInner { GrandpaWarpSync { - inner: warp_sync::InProgressWarpSync>, + inner: warp_sync::InProgressWarpSync< + GrandpaWarpSyncSourceExtra, + GrandpaWarpSyncRequestExtra, + >, }, Optimistic { inner: optimistic::OptimisticSync< @@ -2409,6 +2589,11 @@ struct GrandpaWarpSyncSourceExtra { best_block_hash: [u8; 32], } +struct GrandpaWarpSyncRequestExtra { + outer_request_id: RequestId, + user_data: TRq, +} + struct Shared { sources: slab::Slab, requests: slab::Slab>, @@ -2435,7 +2620,10 @@ impl Shared { /// strategy. fn transition_grandpa_warp_sync_all_forks( &mut self, - grandpa: warp_sync::Success>, + grandpa: warp_sync::Success< + GrandpaWarpSyncSourceExtra, + GrandpaWarpSyncRequestExtra, + >, ) -> ( all_forks::AllForksSync, AllForksRequestExtra, AllForksSourceExtra>, host::HostVmPrototype, @@ -2459,6 +2647,54 @@ impl Shared { .iter() .all(|(_, s)| matches!(s, SourceMapping::GrandpaWarpSync(_)))); + for ( + source_id, + _, + GrandpaWarpSyncRequestExtra { + outer_request_id, + user_data, + }, + detail, + ) in grandpa.in_progress_requests + { + // TODO: DRY + let detail = match detail { + warp_sync::RequestDetail::WarpSyncRequest { block_hash } => { + RequestDetail::GrandpaWarpSync { + sync_start_block_hash: block_hash, + } + } + warp_sync::RequestDetail::RuntimeParametersGet { block_hash } => { + RequestDetail::StorageGet { + block_hash, + keys: vec![b":code".to_vec(), b":heappages".to_vec()], + } + } + warp_sync::RequestDetail::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + } => RequestDetail::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + }, + }; + + // TODO: O(n2) + let (source_id, _) = self + .sources + .iter() + .find(|(_, s)| match s { + SourceMapping::GrandpaWarpSync(s) if *s == source_id => true, + _ => false, + }) + .unwrap(); + + self.requests[outer_request_id.0] = + RequestMapping::Inline(SourceId(source_id), detail, user_data); + } + for source in grandpa.sources { let source_user_data = AllForksSourceExtra { user_data: source.user_data, @@ -2485,6 +2721,10 @@ impl Shared { .sources .iter() .all(|(_, s)| matches!(s, SourceMapping::AllForks(_)))); + debug_assert!(self + .requests + .iter() + .all(|(_, s)| matches!(s, RequestMapping::AllForks(..) | RequestMapping::Inline(..)))); ( all_forks, @@ -2500,6 +2740,7 @@ enum RequestMapping { Inline(SourceId, RequestDetail, TRq), AllForks(all_forks::RequestId), Optimistic(optimistic::RequestId), + WarpSync(warp_sync::RequestId), } #[derive(Debug, Clone, PartialEq, Eq)] @@ -2512,8 +2753,8 @@ enum SourceMapping { fn all_forks_request_convert( rq_params: all_forks::RequestParams, full_node: bool, -) -> RequestDetail { - RequestDetail::BlocksRequest { +) -> DesiredRequest { + DesiredRequest::BlocksRequest { ascending: false, // Hardcoded based on the logic of the all-forks syncing. first_block_hash: Some(rq_params.first_block_hash), first_block_height: rq_params.first_block_height, @@ -2527,8 +2768,8 @@ fn all_forks_request_convert( fn optimistic_request_convert( rq_params: optimistic::RequestDetail, full_node: bool, -) -> RequestDetail { - RequestDetail::BlocksRequest { +) -> DesiredRequest { + DesiredRequest::BlocksRequest { ascending: true, // Hardcoded based on the logic of the optimistic syncing. first_block_hash: None, first_block_height: rq_params.block_height.get(), diff --git a/src/sync/warp_sync.rs b/src/sync/warp_sync.rs index 21eead0c58..2e56e3eb53 100644 --- a/src/sync/warp_sync.rs +++ b/src/sync/warp_sync.rs @@ -48,15 +48,17 @@ //! of type `TSrc` associated to it. The content of this "user data" is at the discretion of the //! API user. //! -//! The [`InProgressWarpSync`] enum must be examined in order to determine how to make the warp -//! syncing process. +//! Similarly, at any given moment, this state machine holds a list of requests that concern these +//! sources. Use [`InProgressWarpSync::desired_requests`] to determine which requests will be +//! useful to the progress of the warp syncing. +//! +//! Use [`InProgressWarpSync::process_one`] in order to run verifications of the payloads that have +//! previously been downloaded. //! -//! At the end of the process, a [`Success`] is returned and can be used to kick-off another -//! syncing phase. use crate::{ chain::chain_information::{ - self, babe_fetch_epoch, BabeEpochInformation, ChainInformation, ChainInformationConsensus, + self, babe_fetch_epoch, ChainInformation, ChainInformationConsensus, ChainInformationConsensusRef, ChainInformationFinality, ChainInformationFinalityRef, ValidChainInformation, ValidChainInformationRef, }, @@ -66,11 +68,12 @@ use crate::{ vm::ExecHint, }, finality::grandpa::warp_sync, - header::{self, Header, HeaderRef}, + header::{self, Header}, + trie::proof_verify, }; -use alloc::vec::Vec; -use core::ops; +use alloc::{borrow::Cow, vec::Vec}; +use core::{iter, mem, ops}; pub use warp_sync::{Error as FragmentError, WarpSyncFragment}; @@ -90,6 +93,11 @@ pub enum Error { InvalidChain(chain_information::ValidityError), /// Chain uses an unrecognized consensus mechanism. UnknownConsensus, + /// Failed to verify call proof. + InvalidCallProof(proof_verify::Error), + /// Warp sync requires fetching the key that follows another one. This isn't implemented in + /// smoldot. + NextKeyUnimplemented, } /// The configuration for [`warp_sync()`]. @@ -103,14 +111,17 @@ pub struct Config { /// The initial capacity of the list of sources. pub sources_capacity: usize, + + /// The initial capacity of the list of requests. + pub requests_capacity: usize, } /// Initializes the warp sync state machine. /// /// On error, returns the [`ValidChainInformation`] that was provided in the configuration. -pub fn warp_sync( +pub fn warp_sync( config: Config, -) -> Result, (ValidChainInformation, WarpSyncInitError)> { +) -> Result, (ValidChainInformation, WarpSyncInitError)> { match config.start_chain_information.as_ref().finality { ChainInformationFinalityRef::Grandpa { .. } => {} _ => { @@ -121,14 +132,15 @@ pub fn warp_sync( } } - Ok(InProgressWarpSync::WaitingForSources(WaitingForSources { - state: PreVerificationState { - start_chain_information: config.start_chain_information, - block_number_bytes: config.block_number_bytes, - }, + Ok(InProgressWarpSync { + start_chain_information: config.start_chain_information, + block_number_bytes: config.block_number_bytes, sources: slab::Slab::with_capacity(config.sources_capacity), - previous_verifier_values: None, - })) + in_progress_requests: slab::Slab::with_capacity(config.requests_capacity), + phase: Phase::DownloadFragments { + previous_verifier_values: None, + }, + }) } /// Error potentially returned by [`warp_sync()`]. @@ -145,7 +157,7 @@ pub enum WarpSyncInitError { pub struct SourceId(usize); /// The result of a successful warp sync. -pub struct Success { +pub struct Success { /// The synced chain information. pub chain_information: ValidChainInformation, @@ -161,968 +173,1029 @@ pub struct Success { /// The list of sources that were added to the state machine. pub sources: Vec, + + /// The list of requests that were added to the state machine. + pub in_progress_requests: Vec<(SourceId, RequestId, TRq, RequestDetail)>, } /// The warp sync state machine. #[derive(derive_more::From)] -pub enum WarpSync { +pub enum WarpSync { /// Warp syncing is over. - Finished(Success), + Finished(Success), /// Warp syncing is in progress, - InProgress(InProgressWarpSync), + InProgress(InProgressWarpSync), } -#[derive(derive_more::From)] -pub enum InProgressWarpSync { - /// Loading a storage value is required in order to continue. - #[from] - StorageGet(StorageGet), - /// Fetching the key that follows a given one is required in order to continue. - #[from] - NextKey(NextKey), - /// Verifying the warp sync response is required to continue. - #[from] - Verifier(Verifier), - /// Requesting GrandPa warp sync data from a source is required to continue. - #[from] - WarpSyncRequest(GrandpaWarpSyncRequest), - /// Fetching the parameters for the virtual machine is required to continue. - #[from] - VirtualMachineParamsGet(VirtualMachineParamsGet), - /// Adding more sources of warp sync data to is required to continue. - #[from] - WaitingForSources(WaitingForSources), +impl ops::Index for InProgressWarpSync { + type Output = TSrc; + + #[track_caller] + fn index(&self, source_id: SourceId) -> &TSrc { + debug_assert!(self.sources.contains(source_id.0)); + &self.sources[source_id.0].user_data + } } -impl WarpSync { - fn from_babe_fetch_epoch_query( - mut query: babe_fetch_epoch::Query, - mut fetched_current_epoch: Option, - mut state: PostVerificationState, - post_download: PostRuntimeDownloadState, - ) -> (Self, Option) { - loop { - match (query, fetched_current_epoch) { - ( - babe_fetch_epoch::Query::Finished { - result: Ok(next_epoch), - virtual_machine, - }, - Some(current_epoch), - ) => { - // The number of slots per epoch is never modified once the chain is running, - // and as such is copied from the original chain information. - let slots_per_epoch = match state.start_chain_information.as_ref().consensus { - ChainInformationConsensusRef::Babe { - slots_per_epoch, .. - } => slots_per_epoch, - _ => unreachable!(), - }; +impl ops::IndexMut for InProgressWarpSync { + #[track_caller] + fn index_mut(&mut self, source_id: SourceId) -> &mut TSrc { + debug_assert!(self.sources.contains(source_id.0)); + &mut self.sources[source_id.0].user_data + } +} - // Build a `ChainInformation` using the parameters found in the runtime. - // It is possible, however, that the runtime produces parameters that aren't - // coherent. For example the runtime could give "current" and "next" Babe - // epochs that don't follow each other. - let chain_information = - match ValidChainInformation::try_from(ChainInformation { - finalized_block_header: state.header, - finality: state.chain_information_finality, - consensus: ChainInformationConsensus::Babe { - finalized_block_epoch_information: Some(current_epoch), - finalized_next_epoch_transition: next_epoch, - slots_per_epoch, - }, - }) { - Ok(ci) => ci, - Err(err) => { - return ( - Self::InProgress( - InProgressWarpSync::warp_sync_request_from_next_source( - state.sources, - PreVerificationState { - start_chain_information: state - .start_chain_information, - block_number_bytes: state.block_number_bytes, - }, - None, - ), - ), - Some(Error::InvalidChain(err)), - ) - } - }; +/// Warp syncing process now obtaining the chain information. +pub struct InProgressWarpSync { + phase: Phase, + start_chain_information: ValidChainInformation, + block_number_bytes: usize, + sources: slab::Slab>, + in_progress_requests: slab::Slab<(SourceId, TRq, RequestDetail)>, +} - return ( - Self::Finished(Success { - chain_information, - finalized_runtime: virtual_machine, - finalized_storage_code: post_download.finalized_storage_code, - finalized_storage_heap_pages: post_download - .finalized_storage_heap_pages, - sources: state - .sources - .drain() - .map(|source| source.user_data) - .collect(), - }), - None, - ); - } - ( - babe_fetch_epoch::Query::Finished { - result: Ok(current_epoch), - virtual_machine, - }, - None, - ) => { - fetched_current_epoch = Some(current_epoch); - query = babe_fetch_epoch::babe_fetch_epoch(babe_fetch_epoch::Config { - runtime: virtual_machine, - epoch_to_fetch: babe_fetch_epoch::BabeEpochToFetch::NextEpoch, - }); - } - ( - babe_fetch_epoch::Query::Finished { - result: Err(error), - virtual_machine: _, - }, - _, - ) => { - return ( - Self::InProgress(InProgressWarpSync::warp_sync_request_from_next_source( - state.sources, - PreVerificationState { - start_chain_information: state.start_chain_information, - block_number_bytes: state.block_number_bytes, - }, - None, - )), - Some(Error::BabeFetchEpoch(error)), - ) - } - (babe_fetch_epoch::Query::StorageGet(storage_get), fetched_current_epoch) => { - return ( - Self::InProgress(InProgressWarpSync::StorageGet(StorageGet { - inner: storage_get, - fetched_current_epoch, - state, - post_download, - })), - None, - ) - } - (babe_fetch_epoch::Query::StorageRoot(storage_root), e) => { - fetched_current_epoch = e; - query = storage_root.resume(&state.header.state_root); - } - (babe_fetch_epoch::Query::NextKey(next_key), fetched_current_epoch) => { - return ( - Self::InProgress(InProgressWarpSync::NextKey(NextKey { - inner: next_key, - fetched_current_epoch, - state, - post_download, - })), - None, - ) - } - } - } - } +enum Phase { + DownloadFragments { + previous_verifier_values: Option<(Header, ChainInformationFinality)>, + }, + PendingVerify { + previous_verifier_values: Option<(Header, ChainInformationFinality)>, + downloaded_source: SourceId, + final_set_of_fragments: bool, + /// Always `Some`. + verifier: Option, + }, + PostVerification { + header: Header, + chain_information_finality: ChainInformationFinality, + warp_sync_source_id: SourceId, + // TODO: use struct instead? + runtime: Option<(Option>, Option>)>, + babeapi_current_epoch_response: Option>>, + babeapi_next_epoch_response: Option>>, + }, } -impl InProgressWarpSync { +impl InProgressWarpSync { /// Returns the value that was initially passed in [`Config::block_number_bytes`]. pub fn block_number_bytes(&self) -> usize { - match self { - Self::StorageGet(storage_get) => storage_get.state.block_number_bytes, - Self::NextKey(next_key) => next_key.state.block_number_bytes, - Self::Verifier(verifier) => verifier.state.block_number_bytes, - Self::WarpSyncRequest(warp_sync_request) => warp_sync_request.state.block_number_bytes, - Self::VirtualMachineParamsGet(virtual_machine_params_get) => { - virtual_machine_params_get.state.block_number_bytes - } - Self::WaitingForSources(waiting_for_sources) => { - waiting_for_sources.state.block_number_bytes - } - } + self.block_number_bytes } /// Returns the chain information that is considered verified. pub fn as_chain_information(&self) -> ValidChainInformationRef { - match self { - Self::StorageGet(storage_get) => &storage_get.state.start_chain_information, - Self::NextKey(next_key) => &next_key.state.start_chain_information, - Self::Verifier(verifier) => &verifier.state.start_chain_information, - Self::WarpSyncRequest(warp_sync_request) => { - &warp_sync_request.state.start_chain_information - } - Self::VirtualMachineParamsGet(virtual_machine_params_get) => { - &virtual_machine_params_get.state.start_chain_information - } - Self::WaitingForSources(waiting_for_sources) => { - &waiting_for_sources.state.start_chain_information - } - } - .into() + // Note: after verifying a warp sync fragment, we are certain that the header targeted by + // this fragment is indeed part of the chain. However, this is not enough in order to + // produce a full chain information struct. Such struct can only be produced after the + // entire warp syncing has succeeded. If if it still in progress, all we can return is + // the starting point. + (&self.start_chain_information).into() } /// Returns a list of all known sources stored in the state machine. pub fn sources(&'_ self) -> impl Iterator + '_ { - let sources = match self { - Self::StorageGet(storage_get) => &storage_get.state.sources, - Self::NextKey(next_key) => &next_key.state.sources, - Self::Verifier(verifier) => &verifier.sources, - Self::WarpSyncRequest(warp_sync_request) => &warp_sync_request.sources, - Self::VirtualMachineParamsGet(virtual_machine_params_get) => { - &virtual_machine_params_get.state.sources - } - Self::WaitingForSources(waiting_for_sources) => &waiting_for_sources.sources, - }; - - sources.iter().map(|(id, _)| SourceId(id)) + self.sources.iter().map(|(id, _)| SourceId(id)) } - fn warp_sync_request_from_next_source( - mut sources: slab::Slab>, - state: PreVerificationState, - previous_verifier_values: Option<(Header, ChainInformationFinality)>, - ) -> Self { - // It is possible for a source to be banned because of, say, networking errors. - // If all sources are "banned", unban them in order to try make progress again with the - // hopes that this time it will work. - if sources.iter().all(|(_, s)| s.already_tried) { - for (_, s) in &mut sources { - s.already_tried = false; - } - } - - let next_id = sources - .iter() - .find(|(_, s)| !s.already_tried) - .map(|(id, _)| SourceId(id)); - - if let Some(next_id) = next_id { - Self::WarpSyncRequest(GrandpaWarpSyncRequest { - source_id: next_id, - sources, - state, - previous_verifier_values, - }) - } else { - debug_assert!(sources.is_empty()); - Self::WaitingForSources(WaitingForSources { - sources, - state, - previous_verifier_values, - }) - } + /// Add a source to the list of sources. + pub fn add_source(&mut self, user_data: TSrc) -> SourceId { + SourceId(self.sources.insert(Source { + user_data, + already_tried: false, + })) } - /// Remove a source from the list of sources. + /// Removes a source from the list of sources. In addition to the user data associated to this + /// source, also returns a list of requests that were in progress concerning this source. These + /// requests are now considered obsolete. /// /// # Panic /// - /// Panics if the source wasn't added to the list earlier. + /// Panics if the [`SourceId`] is invalid. /// - pub fn remove_source(self, to_remove: SourceId) -> (TSrc, InProgressWarpSync) { - match self { - Self::WaitingForSources(waiting_for_sources) => { - waiting_for_sources.remove_source(to_remove) - } - Self::WarpSyncRequest(warp_sync_request) => warp_sync_request.remove_source(to_remove), - Self::Verifier(verifier) => verifier.remove_source(to_remove), - Self::VirtualMachineParamsGet(mut virtual_machine_params_get) => { - let (removed, result) = virtual_machine_params_get.state.remove_source(to_remove); - match result { - StateRemoveSourceResult::RemovedOther(state) => { - virtual_machine_params_get.state = state; - ( - removed, - Self::VirtualMachineParamsGet(virtual_machine_params_get), - ) - } - StateRemoveSourceResult::RemovedCurrent(warp_sync) => (removed, warp_sync), - } - } - Self::StorageGet(mut storage_get) => { - let (removed, result) = storage_get.state.remove_source(to_remove); - match result { - StateRemoveSourceResult::RemovedOther(state) => { - storage_get.state = state; - (removed, Self::StorageGet(storage_get)) - } - StateRemoveSourceResult::RemovedCurrent(warp_sync) => (removed, warp_sync), + pub fn remove_source( + &'_ mut self, + to_remove: SourceId, + ) -> (TSrc, impl Iterator + '_) { + debug_assert!(self.sources.contains(to_remove.0)); + let removed = self.sources.remove(to_remove.0).user_data; + + if let Phase::PostVerification { + warp_sync_source_id, + header, + chain_information_finality, + .. + } = &self.phase + { + if to_remove == *warp_sync_source_id { + self.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), } } - Self::NextKey(mut next_key) => { - let (removed, result) = next_key.state.remove_source(to_remove); - match result { - StateRemoveSourceResult::RemovedOther(state) => { - next_key.state = state; - (removed, Self::NextKey(next_key)) - } - StateRemoveSourceResult::RemovedCurrent(warp_sync) => (removed, warp_sync), + } else if let Phase::PendingVerify { + previous_verifier_values, + downloaded_source, + .. + } = &mut self.phase + { + // We make sure to not leave invalid source IDs in the state of `self`. + // While it is a waste of bandwidth to completely remove a proof that has already + // been downloaded if the source disconnects, it is in practice not something that is + // supposed to happen. + if *downloaded_source == to_remove { + self.phase = Phase::DownloadFragments { + previous_verifier_values: previous_verifier_values.take(), } } } + + let obsolete_requests_indices = self + .in_progress_requests + .iter() + .filter_map(|(id, (src, _, _))| if *src == to_remove { Some(id) } else { None }) + .collect::>(); + let mut obsolete_requests = Vec::with_capacity(obsolete_requests_indices.len()); + for index in obsolete_requests_indices { + let (_, user_data, _) = self.in_progress_requests.remove(index); + obsolete_requests.push((RequestId(index), user_data)); + } + + (removed, obsolete_requests.into_iter()) } -} -impl ops::Index for InProgressWarpSync { - type Output = TSrc; + /// Returns a list of requests that should be started in order to drive the warp syncing + /// process to completion. + /// + /// Once a request that matches a desired request is added through + /// [`InProgressWarpSync::add_request`], it is no longer returned by this function. + pub fn desired_requests( + &'_ self, + ) -> impl Iterator + '_ { + let warp_sync_request = if let Phase::DownloadFragments { + previous_verifier_values, + } = &self.phase + { + // TODO: it feels like a hack to try again sources that have failed in the past + let all_sources_already_tried = self.sources.iter().all(|(_, s)| s.already_tried); + + let start_block_hash = match previous_verifier_values.as_ref() { + Some((header, _)) => header.hash(self.block_number_bytes), + None => self + .start_chain_information + .as_ref() + .finalized_block_header + .hash(self.block_number_bytes), + }; - #[track_caller] - fn index(&self, source_id: SourceId) -> &TSrc { - let sources = match self { - Self::StorageGet(storage_get) => &storage_get.state.sources, - Self::NextKey(next_key) => &next_key.state.sources, - Self::Verifier(verifier) => &verifier.sources, - Self::WarpSyncRequest(warp_sync_request) => &warp_sync_request.sources, - Self::VirtualMachineParamsGet(virtual_machine_params_get) => { - &virtual_machine_params_get.state.sources + if !self + .in_progress_requests + .iter() + .any(|(_, (_, _, rq))| match rq { + RequestDetail::WarpSyncRequest { block_hash } + if *block_hash == start_block_hash => + { + true + } + _ => false, + }) + { + either::Left(self.sources.iter().filter_map(move |(src_id, src)| { + // TODO: also filter by source finalized block? so that we don't request from sources below us + if all_sources_already_tried || !src.already_tried { + Some(( + SourceId(src_id), + &src.user_data, + DesiredRequest::WarpSyncRequest { + block_hash: start_block_hash, + }, + )) + } else { + None + } + })) + } else { + either::Right(iter::empty()) } - Self::WaitingForSources(waiting_for_sources) => &waiting_for_sources.sources, + } else { + either::Right(iter::empty()) }; - debug_assert!(sources.contains(source_id.0)); - &sources[source_id.0].user_data - } -} - -impl ops::IndexMut for InProgressWarpSync { - #[track_caller] - fn index_mut(&mut self, source_id: SourceId) -> &mut TSrc { - let sources = match self { - Self::StorageGet(storage_get) => &mut storage_get.state.sources, - Self::NextKey(next_key) => &mut next_key.state.sources, - Self::Verifier(verifier) => &mut verifier.sources, - Self::WarpSyncRequest(warp_sync_request) => &mut warp_sync_request.sources, - Self::VirtualMachineParamsGet(virtual_machine_params_get) => { - &mut virtual_machine_params_get.state.sources + let runtime_parameters_get = if let Phase::PostVerification { + header, + warp_sync_source_id, + .. + } = &self.phase + { + if !self.in_progress_requests.iter().any(|(_, rq)| { + rq.0 == *warp_sync_source_id + && match rq.2 { + RequestDetail::RuntimeParametersGet { block_hash: b } + if b == header.hash(self.block_number_bytes) => + { + true + } + _ => false, + } + }) { + Some(( + *warp_sync_source_id, + &self.sources[warp_sync_source_id.0].user_data, + DesiredRequest::RuntimeParametersGet { + block_hash: header.hash(self.block_number_bytes), + state_trie_root: header.state_root, + }, + )) + } else { + None } - Self::WaitingForSources(waiting_for_sources) => &mut waiting_for_sources.sources, + } else { + None }; - debug_assert!(sources.contains(source_id.0)); - &mut sources[source_id.0].user_data - } -} - -/// Loading a storage value is required in order to continue. -#[must_use] -pub struct StorageGet { - inner: babe_fetch_epoch::StorageGet, - fetched_current_epoch: Option, - state: PostVerificationState, - post_download: PostRuntimeDownloadState, -} - -impl StorageGet { - /// Returns the key whose value must be passed to [`StorageGet::inject_value`]. - pub fn key(&'_ self) -> impl Iterator + '_> + '_ { - self.inner.key() - } - - /// Returns the source that we received the warp sync data from. - pub fn warp_sync_source(&self) -> (SourceId, &TSrc) { - debug_assert!(self - .state - .sources - .contains(self.state.warp_sync_source_id.0)); - - ( - self.state.warp_sync_source_id, - &self.state.sources[self.state.warp_sync_source_id.0].user_data, - ) - } + let babe_current_epoch = if let Phase::PostVerification { + header, + warp_sync_source_id, + babeapi_current_epoch_response: None, + .. + } = &self.phase + { + if !self.in_progress_requests.iter().any(|(_, rq)| { + rq.0 == *warp_sync_source_id + && match rq.2 { + RequestDetail::RuntimeCallMerkleProof { + block_hash: b, + function_name: ref f, + parameter_vectored: ref p, + } if b == header.hash(self.block_number_bytes) + && f == "BabeApi_current_epoch" + && p.is_empty() => + { + true + } + _ => false, + } + }) { + Some(( + *warp_sync_source_id, + &self.sources[warp_sync_source_id.0].user_data, + DesiredRequest::RuntimeCallMerkleProof { + block_hash: header.hash(self.block_number_bytes), + function_name: "BabeApi_current_epoch".into(), + parameter_vectored: Cow::Borrowed(&[]), + }, + )) + } else { + None + } + } else { + None + }; - /// Returns the header that we're warp syncing up to. - pub fn warp_sync_header(&self) -> HeaderRef { - (&self.state.header).into() - } + let babe_next_epoch = if let Phase::PostVerification { + header, + warp_sync_source_id, + babeapi_next_epoch_response: None, + .. + } = &self.phase + { + if !self.in_progress_requests.iter().any(|(_, rq)| { + rq.0 == *warp_sync_source_id + && match rq.2 { + RequestDetail::RuntimeCallMerkleProof { + block_hash: b, + function_name: ref f, + parameter_vectored: ref p, + } if b == header.hash(self.block_number_bytes) + && f == "BabeApi_next_epoch" + && p.is_empty() => + { + true + } + _ => false, + } + }) { + Some(( + *warp_sync_source_id, + &self.sources[warp_sync_source_id.0].user_data, + DesiredRequest::RuntimeCallMerkleProof { + block_hash: header.hash(self.block_number_bytes), + function_name: "BabeApi_next_epoch".into(), + parameter_vectored: Cow::Borrowed(&[]), + }, + )) + } else { + None + } + } else { + None + }; - /// Add a source to the list of sources. - pub fn add_source(&mut self, user_data: TSrc) -> SourceId { - SourceId(self.state.sources.insert(Source { - user_data, - already_tried: false, - })) + warp_sync_request + .chain(runtime_parameters_get.into_iter()) + .chain(babe_current_epoch.into_iter()) + .chain(babe_next_epoch.into_iter()) } - /// Returns the key whose value must be passed to [`StorageGet::inject_value`]. + /// Inserts a new request in the data structure. /// - /// This method is a shortcut for calling `key` and concatenating the returned slices. - pub fn key_as_vec(&self) -> Vec { - self.inner.key_as_vec() - } - - /// Injects the corresponding storage value. - pub fn inject_value( - self, - value: Option>>, - ) -> (WarpSync, Option) { - WarpSync::from_babe_fetch_epoch_query( - self.inner.inject_value(value), - self.fetched_current_epoch, - self.state, - self.post_download, - ) - } - - /// Injects a failure to retrieve the storage value. - pub fn inject_error(self) -> InProgressWarpSync { - InProgressWarpSync::warp_sync_request_from_next_source( - self.state.sources, - PreVerificationState { - start_chain_information: self.state.start_chain_information, - block_number_bytes: self.state.block_number_bytes, - }, - None, - ) - } -} - -/// Fetching the key that follows a given one is required in order to continue. -#[must_use] -pub struct NextKey { - inner: babe_fetch_epoch::NextKey, - fetched_current_epoch: Option, - state: PostVerificationState, - post_download: PostRuntimeDownloadState, -} - -impl NextKey { - /// Returns the key whose next key must be passed back. - pub fn key(&'_ self) -> impl AsRef<[u8]> + '_ { - self.inner.key() - } - - /// Returns the source that we received the warp sync data from. - pub fn warp_sync_source(&self) -> (SourceId, &TSrc) { - debug_assert!(self - .state - .sources - .contains(self.state.warp_sync_source_id.0)); - ( - self.state.warp_sync_source_id, - &self.state.sources[self.state.warp_sync_source_id.0].user_data, - ) - } - - /// Returns the header that we're warp syncing up to. - pub fn warp_sync_header(&self) -> HeaderRef { - (&self.state.header).into() - } - - /// Add a source to the list of sources. - pub fn add_source(&mut self, user_data: TSrc) -> SourceId { - SourceId(self.state.sources.insert(Source { - user_data, - already_tried: false, - })) - } - - /// Injects the key. + /// > **Note**: The request doesn't necessarily have to match a request returned by + /// > [`InProgressWarpSync::desired_requests`]. /// /// # Panic /// - /// Panics if the key passed as parameter isn't strictly superior to the requested key. + /// Panics if the [`SourceId`] is out of range. /// - pub fn inject_key(self, key: Option>) -> (WarpSync, Option) { - WarpSync::from_babe_fetch_epoch_query( - self.inner.inject_key(key), - self.fetched_current_epoch, - self.state, - self.post_download, + pub fn add_request( + &mut self, + source_id: SourceId, + user_data: TRq, + detail: RequestDetail, + ) -> RequestId { + assert!(self.sources.contains(source_id.0)); + RequestId( + self.in_progress_requests + .insert((source_id, user_data, detail)), ) } -} - -/// Verifying the warp sync response is required to continue. -pub struct Verifier { - verifier: warp_sync::Verifier, - state: PreVerificationState, - warp_sync_source_id: SourceId, - sources: slab::Slab>, - final_set_of_fragments: bool, - previous_verifier_values: Option<(Header, ChainInformationFinality)>, -} -impl Verifier { - /// Add a source to the list of sources. - pub fn add_source(&mut self, user_data: TSrc) -> SourceId { - SourceId(self.sources.insert(Source { - user_data, - already_tried: false, - })) + /// Removes the given request from the state machine. Returns the user data that was associated + /// to it. + /// + /// # Panic + /// + /// Panics if the [`RequestId`] is invalid. + /// + pub fn fail_request(&mut self, id: RequestId) -> TRq { + match (self.in_progress_requests.remove(id.0), &mut self.phase) { + ((source_id, user_data, RequestDetail::WarpSyncRequest { .. }), _) => { + // TODO: check that block hash matches starting point? ^ + self.sources[source_id.0].already_tried = true; + user_data + } + ( + ( + source_id, + user_data, + RequestDetail::RuntimeCallMerkleProof { .. } + | RequestDetail::RuntimeParametersGet { .. }, + ), + Phase::PostVerification { + header, + chain_information_finality, + warp_sync_source_id, + .. + }, + ) if source_id == *warp_sync_source_id => { + self.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + user_data + } + ( + ( + _, + user_data, + RequestDetail::RuntimeCallMerkleProof { .. } + | RequestDetail::RuntimeParametersGet { .. }, + ), + _, + ) => user_data, + } } - /// Remove a source from the list of sources. + /// Injects a successful response and removes the given request from the state machine. Returns + /// the user data that was associated to it. /// /// # Panic /// - /// Panics if the source wasn't added to the list earlier. + /// Panics if the [`RequestId`] is invalid. + /// Panics if the [`RequestId`] doesn't correspond to a runtime parameters get request. /// - pub fn remove_source(mut self, to_remove: SourceId) -> (TSrc, InProgressWarpSync) { - debug_assert!(self.sources.contains(to_remove.0)); - let removed = self.sources.remove(to_remove.0).user_data; - - if to_remove == self.warp_sync_source_id { - let next_state = InProgressWarpSync::warp_sync_request_from_next_source( - self.sources, - self.state, - self.previous_verifier_values, - ); + pub fn runtime_parameters_get_success( + &mut self, + id: RequestId, + code: Option>, + heap_pages: Option>, + ) -> TRq { + let user_data = match (self.in_progress_requests.remove(id.0), &self.phase) { + ( + (_, user_data, RequestDetail::RuntimeParametersGet { block_hash }), + Phase::PostVerification { header, .. }, + ) if block_hash == header.hash(self.block_number_bytes) => user_data, + ((_, user_data, RequestDetail::RuntimeParametersGet { .. }), _) => return user_data, + ( + ( + _, + _, + RequestDetail::RuntimeCallMerkleProof { .. } + | RequestDetail::WarpSyncRequest { .. }, + ), + _, + ) => panic!(), + }; - (removed, next_state) + if let Phase::PostVerification { + runtime: ref mut runtime_store, + .. + } = self.phase + { + *runtime_store = Some(( + code.map(|c| c.as_ref().to_vec()), + heap_pages.map(|hp| hp.as_ref().to_vec()), + )); } else { - (removed, InProgressWarpSync::Verifier(self)) + // This is checked at the beginning of this function. + unreachable!() } - } - /// Returns the identifier and user data of the source that has sent the fragment that is to - /// be verified. - pub fn proof_sender(&self) -> (SourceId, &TSrc) { - ( - self.warp_sync_source_id, - &self.sources[self.warp_sync_source_id.0].user_data, - ) + user_data } - /// Verifies the next warp sync fragment in queue. - pub fn next(self) -> (InProgressWarpSync, Result<(), FragmentError>) { - match self.verifier.next() { - Ok(warp_sync::Next::NotFinished(next_verifier)) => ( - InProgressWarpSync::Verifier(Self { - verifier: next_verifier, - state: self.state, - sources: self.sources, - warp_sync_source_id: self.warp_sync_source_id, - final_set_of_fragments: self.final_set_of_fragments, - previous_verifier_values: self.previous_verifier_values, - }), - Ok(()), - ), - Ok(warp_sync::Next::EmptyProof) => ( - // TODO: should return success immediately; unfortunately the AllSync is quite complicated to update if we do this - InProgressWarpSync::VirtualMachineParamsGet(VirtualMachineParamsGet { - state: PostVerificationState { - header: self - .state - .start_chain_information - .as_ref() - .finalized_block_header - .into(), - chain_information_finality: self - .state - .start_chain_information - .as_ref() - .finality - .into(), - block_number_bytes: self.state.block_number_bytes, - start_chain_information: self.state.start_chain_information, - sources: self.sources, - warp_sync_source_id: self.warp_sync_source_id, + /// Injects a successful response and removes the given request from the state machine. Returns + /// the user data that was associated to it. + /// + /// # Panic + /// + /// Panics if the [`RequestId`] is invalid. + /// Panics if the [`RequestId`] doesn't correspond to a runtime Merkle call proof request. + /// + pub fn runtime_call_merkle_proof_success( + &mut self, + request_id: RequestId, + response: impl Iterator>, + ) -> TRq { + match ( + self.in_progress_requests.remove(request_id.0), + &mut self.phase, + ) { + ( + ( + _, + user_data, + RequestDetail::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, }, - }), - Ok(()), - ), - Ok(warp_sync::Next::Success { - scale_encoded_header, - chain_information_finality, - }) => { - // As the verification of the fragment has succeeded, we are sure that the header - // is valid and can decode it. - let header: Header = - header::decode(&scale_encoded_header, self.state.block_number_bytes) - .unwrap() - .into(); - - if self.final_set_of_fragments { - ( - InProgressWarpSync::VirtualMachineParamsGet(VirtualMachineParamsGet { - state: PostVerificationState { - header, - chain_information_finality, - start_chain_information: self.state.start_chain_information, - block_number_bytes: self.state.block_number_bytes, - sources: self.sources, - warp_sync_source_id: self.warp_sync_source_id, - }, - }), - Ok(()), - ) - } else { - ( - InProgressWarpSync::WarpSyncRequest(GrandpaWarpSyncRequest { - source_id: self.warp_sync_source_id, - sources: self.sources, - state: self.state, - previous_verifier_values: Some((header, chain_information_finality)), - }), - Ok(()), - ) - } + ), + Phase::PostVerification { + ref header, + ref mut babeapi_current_epoch_response, + .. + }, + ) if block_hash == header.hash(self.block_number_bytes) + && function_name == "BabeApi_current_epoch" + && parameter_vectored.is_empty() => + { + *babeapi_current_epoch_response = + Some(response.map(|e| e.as_ref().to_vec()).collect()); + user_data } - Err(error) => ( - InProgressWarpSync::warp_sync_request_from_next_source( - self.sources, - self.state, - self.previous_verifier_values, + ( + ( + _, + user_data, + RequestDetail::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + }, ), - Err(error), - ), + Phase::PostVerification { + ref header, + ref mut babeapi_next_epoch_response, + .. + }, + ) if block_hash == header.hash(self.block_number_bytes) + && function_name == "BabeApi_next_epoch" + && parameter_vectored.is_empty() => + { + *babeapi_next_epoch_response = + Some(response.map(|e| e.as_ref().to_vec()).collect()); + user_data + } + ((_, user_data, RequestDetail::RuntimeCallMerkleProof { .. }), _) => return user_data, + ( + (_, _, RequestDetail::RuntimeParametersGet { .. }) + | (_, _, RequestDetail::WarpSyncRequest { .. }), + _, + ) => panic!(), } } -} -struct PreVerificationState { - start_chain_information: ValidChainInformation, - block_number_bytes: usize, -} + /// Injects a successful response and removes the given request from the state machine. Returns + /// the user data that was associated to it. + /// + /// # Panic + /// + /// Panics if the [`RequestId`] is invalid. + /// Panics if the [`RequestId`] doesn't correspond to a warp sync request. + /// + pub fn warp_sync_request_success( + &mut self, + request_id: RequestId, + fragments: Vec, + final_set_of_fragments: bool, + ) -> TRq { + match ( + self.in_progress_requests.remove(request_id.0), + &mut self.phase, + ) { + ( + (rq_source_id, user_data, RequestDetail::WarpSyncRequest { block_hash }), + Phase::DownloadFragments { + previous_verifier_values, + }, + ) => { + let desired_block_hash = match previous_verifier_values.as_ref() { + Some((header, _)) => header.hash(self.block_number_bytes), + None => self + .start_chain_information + .as_ref() + .finalized_block_header + .hash(self.block_number_bytes), + }; + if desired_block_hash != block_hash { + return user_data; + } -struct PostVerificationState { - header: Header, - chain_information_finality: ChainInformationFinality, - start_chain_information: ValidChainInformation, - block_number_bytes: usize, - sources: slab::Slab>, - warp_sync_source_id: SourceId, -} + self.sources[rq_source_id.0].already_tried = true; -impl PostVerificationState { - fn remove_source(mut self, to_remove: SourceId) -> (TSrc, StateRemoveSourceResult) { - debug_assert!(self.sources.contains(to_remove.0)); - let removed = self.sources.remove(to_remove.0).user_data; - - if to_remove == self.warp_sync_source_id { - ( - removed, - StateRemoveSourceResult::RemovedCurrent( - InProgressWarpSync::warp_sync_request_from_next_source( - self.sources, - PreVerificationState { - start_chain_information: self.start_chain_information, - block_number_bytes: self.block_number_bytes, - }, - None, + let verifier = match &previous_verifier_values { + Some((_, chain_information_finality)) => warp_sync::Verifier::new( + chain_information_finality.into(), + self.block_number_bytes, + fragments, + final_set_of_fragments, ), - ), - ) - } else { - (removed, StateRemoveSourceResult::RemovedOther(self)) + None => warp_sync::Verifier::new( + self.start_chain_information.as_ref().finality, + self.block_number_bytes, + fragments, + final_set_of_fragments, + ), + }; + + self.phase = Phase::PendingVerify { + previous_verifier_values: previous_verifier_values.take(), + final_set_of_fragments, + downloaded_source: rq_source_id, + verifier: Some(verifier), + }; + + user_data + } + ((_, user_data, RequestDetail::WarpSyncRequest { .. }), _) => { + // Uninteresting download. We simply ignore the response. + user_data + } + ((_, _, _), _) => panic!(), } } -} -enum StateRemoveSourceResult { - RemovedCurrent(InProgressWarpSync), - RemovedOther(PostVerificationState), -} + /// Start processing one CPU operation. + /// + /// This function takes ownership of `self` and yields it back after the operation is finished. + pub fn process_one(self) -> ProcessOne { + if let Phase::PostVerification { + runtime: Some(_), + babeapi_current_epoch_response: Some(_), + babeapi_next_epoch_response: Some(_), + .. + } = &self.phase + { + return ProcessOne::BuildChainInformation(BuildChainInformation { inner: self }); + } + + if let Phase::PendingVerify { .. } = &self.phase { + return ProcessOne::VerifyWarpSyncFragment(VerifyWarpSyncFragment { inner: self }); + } -struct PostRuntimeDownloadState { - finalized_storage_code: Option>, - finalized_storage_heap_pages: Option>, + ProcessOne::Idle(self) + } } -/// Requesting GrandPa warp sync data from a source is required to continue. -pub struct GrandpaWarpSyncRequest { - source_id: SourceId, - sources: slab::Slab>, - state: PreVerificationState, - previous_verifier_values: Option<(Header, ChainInformationFinality)>, +#[derive(Debug, Copy, Clone)] +struct Source { + user_data: TSrc, + /// `true` if this source has been in a past `WarpSyncRequest`. `false` if the source is + /// currently in a `WarpSyncRequest`. + already_tried: bool, } -impl GrandpaWarpSyncRequest { - /// The source to make a GrandPa warp sync request to. - pub fn current_source(&self) -> (SourceId, &TSrc) { - debug_assert!(self.sources.contains(self.source_id.0)); - (self.source_id, &self.sources[self.source_id.0].user_data) - } +#[derive(Debug, Clone)] +pub enum DesiredRequest { + WarpSyncRequest { + block_hash: [u8; 32], + }, + RuntimeParametersGet { + block_hash: [u8; 32], + state_trie_root: [u8; 32], + }, + RuntimeCallMerkleProof { + block_hash: [u8; 32], + function_name: Cow<'static, str>, + parameter_vectored: Cow<'static, [u8]>, + }, +} - /// The hash of the header to warp sync from. - pub fn start_block_hash(&self) -> [u8; 32] { - match self.previous_verifier_values.as_ref() { - Some((header, _)) => header.hash(self.state.block_number_bytes), - None => self - .state - .start_chain_information - .as_ref() - .finalized_block_header - .hash(self.state.block_number_bytes), - } - } +#[derive(Debug, Clone)] +pub enum RequestDetail { + WarpSyncRequest { + block_hash: [u8; 32], + }, + RuntimeParametersGet { + block_hash: [u8; 32], + }, + RuntimeCallMerkleProof { + block_hash: [u8; 32], + function_name: Cow<'static, str>, + parameter_vectored: Cow<'static, [u8]>, + }, +} - /// Add a source to the list of sources. - pub fn add_source(&mut self, user_data: TSrc) -> SourceId { - SourceId(self.sources.insert(Source { - user_data, - already_tried: false, - })) - } +/// Identifier for a request in the warp sync state machine. +#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] +pub struct RequestId(usize); - /// Remove a source from the list of sources. - /// - /// # Panic - /// - /// Panics if the source wasn't added to the list earlier. - /// - pub fn remove_source(mut self, to_remove: SourceId) -> (TSrc, InProgressWarpSync) { - debug_assert!(self.sources.contains(to_remove.0)); - let removed = self.sources.remove(to_remove.0).user_data; +pub enum ProcessOne { + Idle(InProgressWarpSync), + VerifyWarpSyncFragment(VerifyWarpSyncFragment), + BuildChainInformation(BuildChainInformation), +} - if to_remove == self.source_id { - let next_state = InProgressWarpSync::warp_sync_request_from_next_source( - self.sources, - self.state, - self.previous_verifier_values, - ); +pub struct VerifyWarpSyncFragment { + inner: InProgressWarpSync, +} - (removed, next_state) +impl VerifyWarpSyncFragment { + pub fn proof_sender(&self) -> (SourceId, &TSrc) { + if let Phase::PendingVerify { + downloaded_source, .. + } = &self.inner.phase + { + ( + *downloaded_source, + &self.inner.sources[downloaded_source.0].user_data, + ) } else { - (removed, InProgressWarpSync::WarpSyncRequest(self)) + unreachable!() } } - /// Submit a GrandPa warp sync successful response. - pub fn handle_response_ok( - mut self, - fragments: Vec, - final_set_of_fragments: bool, - ) -> InProgressWarpSync { - debug_assert!(self.sources.contains(self.source_id.0)); - self.sources[self.source_id.0].already_tried = true; - - let verifier = match &self.previous_verifier_values { - Some((_, chain_information_finality)) => warp_sync::Verifier::new( - chain_information_finality.into(), - self.state.block_number_bytes, - fragments, - final_set_of_fragments, - ), - None => warp_sync::Verifier::new( - self.state.start_chain_information.as_ref().finality, - self.state.block_number_bytes, - fragments, - final_set_of_fragments, - ), - }; - - InProgressWarpSync::Verifier(Verifier { - final_set_of_fragments, + // TODO: does this API make sense? + pub fn verify(mut self) -> (InProgressWarpSync, Option) { + if let Phase::PendingVerify { + previous_verifier_values, verifier, - state: self.state, - sources: self.sources, - warp_sync_source_id: self.source_id, - previous_verifier_values: self.previous_verifier_values, - }) - } - - /// Submit a GrandPa warp sync request failure. - pub fn handle_response_err(mut self) -> InProgressWarpSync { - debug_assert!(self.sources.contains(self.source_id.0)); - self.sources[self.source_id.0].already_tried = true; + final_set_of_fragments, + downloaded_source, + } = &mut self.inner.phase + { + match verifier.take().unwrap().next() { + Ok(warp_sync::Next::NotFinished(next_verifier)) => { + *verifier = Some(next_verifier); + } + Ok(warp_sync::Next::EmptyProof) => { + self.inner.phase = Phase::PostVerification { + babeapi_current_epoch_response: None, + babeapi_next_epoch_response: None, + runtime: None, + header: self + .inner + .start_chain_information + .as_ref() + .finalized_block_header + .into(), + chain_information_finality: self + .inner + .start_chain_information + .as_ref() + .finality + .into(), + warp_sync_source_id: *downloaded_source, + }; + } + Ok(warp_sync::Next::Success { + scale_encoded_header, + chain_information_finality, + }) => { + // As the verification of the fragment has succeeded, we are sure that the header + // is valid and can decode it. + let header: Header = + header::decode(&scale_encoded_header, self.inner.block_number_bytes) + .unwrap() + .into(); + + if *final_set_of_fragments { + self.inner.phase = Phase::PostVerification { + babeapi_current_epoch_response: None, + babeapi_next_epoch_response: None, + runtime: None, + header, + chain_information_finality, + warp_sync_source_id: *downloaded_source, + }; + } else { + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some((header, chain_information_finality)), + }; + } + } + Err(error) => { + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: previous_verifier_values.take(), + }; + return (self.inner, Some(error)); + } + } - InProgressWarpSync::warp_sync_request_from_next_source( - self.sources, - self.state, - self.previous_verifier_values, - ) + (self.inner, None) + } else { + unreachable!() + } } } -/// Fetching the parameters for the virtual machine is required to continue. -pub struct VirtualMachineParamsGet { - state: PostVerificationState, +pub struct BuildChainInformation { + inner: InProgressWarpSync, } -impl VirtualMachineParamsGet { - /// Returns the source that we received the warp sync data from. - pub fn warp_sync_source(&self) -> (SourceId, &TSrc) { - debug_assert!(self - .state - .sources - .contains(self.state.warp_sync_source_id.0)); - - ( - self.state.warp_sync_source_id, - &self.state.sources[self.state.warp_sync_source_id.0].user_data, - ) - } - - /// Returns the header that we're warp syncing up to. - pub fn warp_sync_header(&self) -> HeaderRef { - (&self.state.header).into() - } - - /// Add a source to the list of sources. - pub fn add_source(&mut self, user_data: TSrc) -> SourceId { - SourceId(self.state.sources.insert(Source { - user_data, - already_tried: false, - })) - } - - /// Injects a failure to retrieve the parameters. - pub fn inject_error(self) -> InProgressWarpSync { - InProgressWarpSync::warp_sync_request_from_next_source( - self.state.sources, - PreVerificationState { - start_chain_information: self.state.start_chain_information, - block_number_bytes: self.state.block_number_bytes, - }, - None, - ) - } - - /// Set the code and heap pages from storage using the keys `:code` and `:heappages` - /// respectively. Also allows setting an execution hint for the virtual machine. - pub fn set_virtual_machine_params( - self, - code: Option>, - heap_pages: Option>, - exec_hint: ExecHint, - allow_unresolved_imports: bool, - ) -> (WarpSync, Option) { - let code = match code { - Some(code) => code.as_ref().to_vec(), - None => { - return ( - WarpSync::InProgress(InProgressWarpSync::warp_sync_request_from_next_source( - self.state.sources, - PreVerificationState { - start_chain_information: self.state.start_chain_information, - block_number_bytes: self.state.block_number_bytes, - }, - None, - )), - Some(Error::MissingCode), - ) - } - }; +impl BuildChainInformation { + pub fn build(mut self) -> (WarpSync, Option) { + if let Phase::PostVerification { + header, + chain_information_finality, + runtime: runtime @ Some(_), + babeapi_current_epoch_response: babeapi_current_epoch_response @ Some(_), + babeapi_next_epoch_response: babeapi_next_epoch_response @ Some(_), + .. + } = &mut self.inner.phase + { + let (finalized_storage_code, finalized_storage_heap_pages) = runtime.take().unwrap(); + let babeapi_current_epoch_response = babeapi_current_epoch_response.take().unwrap(); + let babeapi_next_epoch_response = babeapi_next_epoch_response.take().unwrap(); + + let finalized_storage_code = match finalized_storage_code { + Some(code) => code, + None => { + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + return (WarpSync::InProgress(self.inner), Some(Error::MissingCode)); + } + }; - let decoded_heap_pages = - match executor::storage_heap_pages_to_value(heap_pages.as_ref().map(|p| p.as_ref())) { + let decoded_heap_pages = match executor::storage_heap_pages_to_value( + finalized_storage_heap_pages.as_ref().map(|p| p.as_ref()), + ) { Ok(hp) => hp, Err(err) => { + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; return ( - WarpSync::InProgress( - InProgressWarpSync::warp_sync_request_from_next_source( - self.state.sources, - PreVerificationState { - start_chain_information: self.state.start_chain_information, - block_number_bytes: self.state.block_number_bytes, - }, - None, - ), - ), + WarpSync::InProgress(self.inner), Some(Error::InvalidHeapPages(err)), - ) + ); } }; - let runtime = match HostVmPrototype::new(host::Config { - module: &code, - heap_pages: decoded_heap_pages, - exec_hint, - allow_unresolved_imports, - }) { - Ok(runtime) => runtime, - Err(error) => { - return ( - WarpSync::InProgress(InProgressWarpSync::warp_sync_request_from_next_source( - self.state.sources, - PreVerificationState { - start_chain_information: self.state.start_chain_information, - block_number_bytes: self.state.block_number_bytes, - }, - None, - )), - Some(Error::NewRuntime(error)), - ) - } - }; + let runtime = match HostVmPrototype::new(host::Config { + module: &finalized_storage_code, + heap_pages: decoded_heap_pages, + exec_hint: ExecHint::CompileAheadOfTime, // TODO: make configurable + allow_unresolved_imports: false, // TODO: make configurable + }) { + Ok(runtime) => runtime, + Err(error) => { + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + return ( + WarpSync::InProgress(self.inner), + Some(Error::NewRuntime(error)), + ); + } + }; - match self.state.start_chain_information.as_ref().consensus { - ChainInformationConsensusRef::Babe { .. } => { - let babe_current_epoch_query = - babe_fetch_epoch::babe_fetch_epoch(babe_fetch_epoch::Config { - runtime, - epoch_to_fetch: babe_fetch_epoch::BabeEpochToFetch::CurrentEpoch, - }); - - let (warp_sync, error) = WarpSync::from_babe_fetch_epoch_query( - babe_current_epoch_query, - None, - self.state, - PostRuntimeDownloadState { - finalized_storage_code: Some(code), - finalized_storage_heap_pages: heap_pages.map(|hp| hp.as_ref().to_vec()), - }, - ); + match self.inner.start_chain_information.as_ref().consensus { + ChainInformationConsensusRef::Babe { .. } => { + let mut babe_current_epoch_query = + babe_fetch_epoch::babe_fetch_epoch(babe_fetch_epoch::Config { + runtime, + epoch_to_fetch: babe_fetch_epoch::BabeEpochToFetch::CurrentEpoch, + }); + + let (current_epoch, runtime) = loop { + match babe_current_epoch_query { + babe_fetch_epoch::Query::StorageGet(get) => { + let value = match proof_verify::verify_proof(proof_verify::VerifyProofConfig { + requested_key: &get.key_as_vec(), // TODO: allocating vec + trie_root_hash: &header.state_root, + proof: babeapi_current_epoch_response.iter().map(|v| &v[..]), + }) { + Ok(v) => v, + Err(err) => { + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + return ( + WarpSync::InProgress(self.inner), + Some(Error::InvalidCallProof(err)), + ); + } + }; + + babe_current_epoch_query = get.inject_value(value.map(iter::once)); + }, + babe_fetch_epoch::Query::NextKey(_) => { + // TODO: implement + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + return ( + WarpSync::InProgress(self.inner), + Some(Error::NextKeyUnimplemented), + );} + babe_fetch_epoch::Query::StorageRoot(root) => { + babe_current_epoch_query = root.resume(&header.state_root); + }, + babe_fetch_epoch::Query::Finished { result: Ok(result), virtual_machine } => break (result, virtual_machine), + babe_fetch_epoch::Query::Finished { result: Err(err), .. } => { + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + return ( + WarpSync::InProgress(self.inner), + Some(Error::BabeFetchEpoch(err)), + ); + } + } + }; - (warp_sync, error) - } - ChainInformationConsensusRef::Aura { .. } | // TODO: https://github.com/paritytech/smoldot/issues/933 - ChainInformationConsensusRef::Unknown => { - ( - WarpSync::InProgress(InProgressWarpSync::warp_sync_request_from_next_source( - self.state.sources, - PreVerificationState { - start_chain_information: self.state.start_chain_information, - block_number_bytes: self.state.block_number_bytes, - }, - None, - )), - Some(Error::UnknownConsensus), - ) - } - } - } -} + let mut babe_next_epoch_query = + babe_fetch_epoch::babe_fetch_epoch(babe_fetch_epoch::Config { + runtime, + epoch_to_fetch: babe_fetch_epoch::BabeEpochToFetch::NextEpoch, + }); + + let (next_epoch, runtime) = loop { + match babe_next_epoch_query { + babe_fetch_epoch::Query::StorageGet(get) => { + let value = match proof_verify::verify_proof(proof_verify::VerifyProofConfig { + requested_key: &get.key_as_vec(), // TODO: allocating vec + trie_root_hash: &header.state_root, + proof: babeapi_next_epoch_response.iter().map(|v| &v[..]), + }) { + Ok(v) => v, + Err(err) => { + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + return ( + WarpSync::InProgress(self.inner), + Some(Error::InvalidCallProof(err)), + ); + } + }; + + babe_next_epoch_query = get.inject_value(value.map(iter::once)); + }, + babe_fetch_epoch::Query::NextKey(_) => { + // TODO: implement + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + return ( + WarpSync::InProgress(self.inner), + Some(Error::NextKeyUnimplemented), + ); + } + babe_fetch_epoch::Query::StorageRoot(root) => { + babe_next_epoch_query = root.resume(&header.state_root); + }, + babe_fetch_epoch::Query::Finished { result: Ok(result), virtual_machine } => break (result, virtual_machine), + babe_fetch_epoch::Query::Finished { result: Err(err), .. } => { + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + return ( + WarpSync::InProgress(self.inner), + Some(Error::BabeFetchEpoch(err)), + ); + } + } + }; -/// Adding more sources of warp sync data to is required to continue. -pub struct WaitingForSources { - /// List of sources. It is guaranteed that they all have `already_tried` equal to `true`. - sources: slab::Slab>, - state: PreVerificationState, - previous_verifier_values: Option<(Header, ChainInformationFinality)>, -} + // The number of slots per epoch is never modified once the chain is running, + // and as such is copied from the original chain information. + let slots_per_epoch = match self.inner.start_chain_information.as_ref().consensus { + ChainInformationConsensusRef::Babe { + slots_per_epoch, .. + } => slots_per_epoch, + _ => unreachable!(), + }; -impl WaitingForSources { - /// Add a source to the list of sources. - pub fn add_source(mut self, user_data: TSrc) -> GrandpaWarpSyncRequest { - let source_id = SourceId(self.sources.insert(Source { - user_data, - already_tried: false, - })); + // Build a `ChainInformation` using the parameters found in the runtime. + // It is possible, however, that the runtime produces parameters that aren't + // coherent. For example the runtime could give "current" and "next" Babe + // epochs that don't follow each other. + let chain_information = + match ValidChainInformation::try_from(ChainInformation { + finalized_block_header: header.clone(), + finality: chain_information_finality.clone(), + consensus: ChainInformationConsensus::Babe { + finalized_block_epoch_information: Some(current_epoch), + finalized_next_epoch_transition: next_epoch, + slots_per_epoch, + }, + }) { + Ok(ci) => ci, + Err(err) => { + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + return ( + WarpSync::InProgress(self.inner), + Some(Error::InvalidChain(err)), + ); + } + }; - GrandpaWarpSyncRequest { - source_id, - sources: self.sources, - state: self.state, - previous_verifier_values: self.previous_verifier_values, + return ( + WarpSync::Finished(Success { + chain_information, + finalized_runtime: runtime, + finalized_storage_code: Some(finalized_storage_code), + finalized_storage_heap_pages, + sources: self.inner + .sources + .drain() + .map(|source| source.user_data) + .collect(), + in_progress_requests: mem::take(&mut self.inner + .in_progress_requests) + .into_iter() + .map(|(id, (src_id, user_data, detail))| (src_id, RequestId(id), user_data, detail)) + .collect(), + }), + None, + ); + } + ChainInformationConsensusRef::Aura { .. } | // TODO: https://github.com/paritytech/smoldot/issues/933 + ChainInformationConsensusRef::Unknown => { + // TODO: detect this at warp sync initialization + self.inner.phase = Phase::DownloadFragments { + previous_verifier_values: Some(( + header.clone(), + chain_information_finality.clone(), + )), + }; + return ( + WarpSync::InProgress(self.inner), + Some(Error::UnknownConsensus), + ); + } + } + } else { + unreachable!() } } - - /// Remove a source from the list of sources. - /// - /// # Panic - /// - /// Panics if the source wasn't added to the list earlier. - /// - pub fn remove_source(mut self, to_remove: SourceId) -> (TSrc, InProgressWarpSync) { - debug_assert!(self.sources.contains(to_remove.0)); - let removed = self.sources.remove(to_remove.0).user_data; - (removed, InProgressWarpSync::WaitingForSources(self)) - } -} - -#[derive(Debug, Copy, Clone)] -struct Source { - user_data: TSrc, - /// `true` if this source has been in a past `WarpSyncRequest`. `false` if the source is - /// currently in a `WarpSyncRequest`. - already_tried: bool, }