diff --git a/lib/src/sync/all.rs b/lib/src/sync/all.rs index d43eccb3bd..66b268d8f6 100644 --- a/lib/src/sync/all.rs +++ b/lib/src/sync/all.rs @@ -1878,6 +1878,7 @@ pub enum DesiredRequest { impl DesiredRequest { /// Caps the number of blocks to request to `max`. + // TODO: consider removing due to the many types of requests pub fn num_blocks_clamp(&mut self, max: NonZeroU64) { if let DesiredRequest::BlocksRequest { num_blocks, .. } = self { *num_blocks = NonZeroU64::new(cmp::min(num_blocks.get(), max.get())).unwrap(); @@ -1885,6 +1886,7 @@ impl DesiredRequest { } /// Caps the number of blocks to request to `max`. + // TODO: consider removing due to the many types of requests pub fn with_num_blocks_clamp(mut self, max: NonZeroU64) -> Self { self.num_blocks_clamp(max); self diff --git a/light-base/src/sync_service/standalone.rs b/light-base/src/sync_service/standalone.rs index 2b327da3a4..3179c94673 100644 --- a/light-base/src/sync_service/standalone.rs +++ b/light-base/src/sync_service/standalone.rs @@ -29,7 +29,7 @@ use alloc::{ vec::Vec, }; use core::{ - iter, + cmp, iter, num::{NonZeroU32, NonZeroU64}, pin::Pin, time::Duration, @@ -133,15 +133,10 @@ pub(super) async fn start_standalone_chain( // Try to perform some CPU-heavy operations. // If any CPU-heavy verification was performed, then `queue_empty` will be `false`, in // which case we will loop again as soon as possible. + // TODO: integrate this within WakeUpReason, see https://github.com/smol-dot/smoldot/issues/1382 this is however complicated because process_one() moves out from sync, and that sync doesn't impl Sync, a refactor of AllSync might be necessary let queue_empty = { let mut queue_empty = true; - // Start a networking request (block requests, warp sync requests, etc.) that the - // syncing state machine would like to start. - if task.start_next_request() { - queue_empty = false; - } - // TODO: handle obsolete requests // The sync state machine can be in a few various states. At the time of writing: @@ -162,64 +157,16 @@ pub(super) async fn start_standalone_chain( queue_empty }; - // Processing the queue might have updated the best block of the syncing state machine. - if !task.network_up_to_date_best { - // The networking service needs to be kept up to date with what the local node - // considers as the best block. - // For some reason, first building the future then executing it solves a borrow - // checker error. - let fut = task.network_service.set_local_best_block( - network_chain_id, - task.sync.best_block_hash(), - task.sync.best_block_number(), - ); - fut.await; - - task.network_up_to_date_best = true; - } - - // Processing the queue might have updated the finalized block of the syncing state - // machine. - if !task.network_up_to_date_finalized { - // If the chain uses GrandPa, the networking has to be kept up-to-date with the - // state of finalization for other peers to send back relevant gossip messages. - // (code style) `grandpa_set_id` is extracted first in order to avoid borrowing - // checker issues. - let grandpa_set_id = - if let chain::chain_information::ChainInformationFinalityRef::Grandpa { - after_finalized_block_authorities_set_id, - .. 
- } = task.sync.as_chain_information().as_ref().finality - { - Some(after_finalized_block_authorities_set_id) - } else { - None - }; - - if let Some(set_id) = grandpa_set_id { - let commit_finalized_height = task.sync.finalized_block_header().number; - task.network_service - .set_local_grandpa_state( - network_chain_id, - network::service::GrandpaState { - set_id, - round_number: 1, // TODO: - commit_finalized_height, - }, - ) - .await; - } - - task.network_up_to_date_finalized = true; - } - // Now waiting for some event to happen: a network event, a request from the frontend // of the sync service, or a request being finished. enum WakeUpReason { + MustUpdateNetworkWithBestBlock, + MustUpdateNetworkWithFinalizedBlock, MustSubscribeNetworkEvents, NetworkEvent(network_service::Event), ForegroundMessage(ToBackground), ForegroundClosed, + StartRequest(all::SourceId, all::DesiredRequest), RequestFinished(all::RequestId, Result), WarpSyncTakingLongTimeWarning, MustLoopAgain, @@ -252,6 +199,20 @@ pub(super) async fn start_standalone_chain( let (request_id, result) = task.pending_requests.select_next_some().await; WakeUpReason::RequestFinished(request_id, result) }) + .or(async { + if !task.network_up_to_date_finalized { + WakeUpReason::MustUpdateNetworkWithFinalizedBlock + } else { + future::pending().await + } + }) + .or(async { + if !task.network_up_to_date_best { + WakeUpReason::MustUpdateNetworkWithBestBlock + } else { + future::pending().await + } + }) .or(async { (&mut task.warp_sync_taking_long_time_warning).await; task.warp_sync_taking_long_time_warning = @@ -259,6 +220,26 @@ pub(super) async fn start_standalone_chain( .fuse(); WakeUpReason::WarpSyncTakingLongTimeWarning }) + .or({ + // `desired_requests()` returns, in decreasing order of priority, the requests + // that should be started in order for the syncing to proceed. The fact that + // multiple requests are returned could be used to filter out undesired one. We + // use this filtering to enforce a maximum of one ongoing request per source. + let desired_request = task + .sync + .desired_requests() + .find(|(source_id, _, _)| { + task.sync.source_num_ongoing_requests(*source_id) == 0 + }) + .map(|(source_id, _, request_detail)| (source_id, request_detail)); + async move { + if let Some((source_id, request_detail)) = desired_request { + WakeUpReason::StartRequest(source_id, request_detail) + } else { + future::pending().await + } + } + }) .or(async { // If the list of CPU-heavy operations to perform is potentially non-empty, // then we wait for a future that is always instantly ready, in order to loop @@ -273,267 +254,430 @@ pub(super) async fn start_standalone_chain( .await }; - let response_outcome = match wake_up_reason { - WakeUpReason::NetworkEvent(network_event) => { - // Something happened on the networking. 
- task.inject_network_event(network_event); - continue; + match wake_up_reason { + WakeUpReason::NetworkEvent(network_service::Event::Connected { + peer_id, + role, + best_block_number, + best_block_hash, + }) => { + task.peers_source_id_map.insert( + peer_id.clone(), + task.sync + .add_source((peer_id, role), best_block_number, best_block_hash), + ); } - WakeUpReason::MustSubscribeNetworkEvents => { - debug_assert!(task.from_network_service.is_none()); - for (_, sync_source_id) in task.peers_source_id_map.drain() { - let (_, requests) = task.sync.remove_source(sync_source_id); - for (_, abort) in requests { - abort.abort(); - } + WakeUpReason::NetworkEvent(network_service::Event::Disconnected { peer_id }) => { + let sync_source_id = task.peers_source_id_map.remove(&peer_id).unwrap(); + let (_, requests) = task.sync.remove_source(sync_source_id); + + // The `Disconnect` network event indicates that the main notifications substream + // with that peer has been closed, not necessarily that the connection as a whole + // has been closed. As such, the in-progress network requests might continue if + // we don't abort them. + for (_, abort) in requests { + abort.abort(); } - task.from_network_service = Some(Box::pin( - // As documented, `subscribe().await` is expected to return quickly. - task.network_service.subscribe(task.network_chain_id).await, - )); - continue; } - WakeUpReason::ForegroundMessage(message) => { - // Received message from the front `SyncService`. - task.process_foreground_message(message); - continue; - } + WakeUpReason::NetworkEvent(network_service::Event::BlockAnnounce { + peer_id, + announce, + }) => { + let sync_source_id = *task.peers_source_id_map.get(&peer_id).unwrap(); + let decoded = announce.decode(); - WakeUpReason::ForegroundClosed => { - // The channel with the frontend sync service has been closed. - // Closing the sync background task as a result. - return; - } + match header::decode(decoded.scale_encoded_header, task.sync.block_number_bytes()) { + Ok(decoded_header) => { + log::debug!( + target: &task.log_target, + "Sync <= BlockAnnounce(sender={}, hash={}, is_best={}, parent_hash={})", + peer_id, + HashDisplay(&header::hash_from_scale_encoded_header(decoded.scale_encoded_header)), + decoded.is_best, + HashDisplay(decoded_header.parent_hash) + ); + } + Err(error) => { + log::debug!( + target: &task.log_target, + "Sync <= BlockAnnounce(sender={}, hash={}, is_best={}, parent_hash=)", + peer_id, + HashDisplay(&header::hash_from_scale_encoded_header(decoded.scale_encoded_header)), + decoded.is_best, + ); - WakeUpReason::RequestFinished(request_id, result) => { - // A request has been finished. - // `result` is an error if the request got cancelled by the sync state machine. - let Ok(result) = result else { - continue; - }; + log::debug!( + target: &task.log_target, + "Sync => InvalidBlockHeader(error={})", + error + ); - // Inject the result of the request into the sync state machine. 
- match result { - RequestOutcome::Block(Ok(v)) => { - task.sync - .blocks_request_response( - request_id, - Ok(v.into_iter().filter_map(|block| { - Some(all::BlockRequestSuccessBlock { - scale_encoded_header: block.header?, - scale_encoded_justifications: block - .justifications - .unwrap_or(Vec::new()) - .into_iter() - .map(|j| all::Justification { - engine_id: j.engine_id, - justification: j.justification, - }) - .collect(), - scale_encoded_extrinsics: Vec::new(), - user_data: (), - }) - })), - ) - .1 + log::warn!( + target: &task.log_target, + "Failed to decode header in block announce received from {}. Error: {}", + peer_id, error, + ) } - RequestOutcome::Block(Err(_)) => { - // TODO: should disconnect peer - task.sync - .blocks_request_response(request_id, Err::, _>(())) - .1 + } + + match task.sync.block_announce( + sync_source_id, + decoded.scale_encoded_header.to_owned(), + decoded.is_best, + ) { + all::BlockAnnounceOutcome::HeaderVerify + | all::BlockAnnounceOutcome::AlreadyInChain => { + log::debug!( + target: &task.log_target, + "Sync => Ok" + ); } - RequestOutcome::WarpSync(Ok(result)) => { - let decoded = result.decode(); - let fragments = decoded - .fragments - .into_iter() - .map(|f| all::WarpSyncFragment { - scale_encoded_header: f.scale_encoded_header.to_vec(), - scale_encoded_justification: f.scale_encoded_justification.to_vec(), - }) - .collect(); - task.sync - .grandpa_warp_sync_response_ok( - request_id, - fragments, - decoded.is_finished, - ) - .1 + all::BlockAnnounceOutcome::Discarded => { + log::debug!( + target: &task.log_target, + "Sync => Discarded" + ); } - RequestOutcome::WarpSync(Err(_)) => { - // TODO: should disconnect peer - task.sync.grandpa_warp_sync_response_err(request_id); - continue; + all::BlockAnnounceOutcome::StoredForLater {} => { + log::debug!( + target: &task.log_target, + "Sync => StoredForLater" + ); } - RequestOutcome::Storage(r) => task.sync.storage_get_response(request_id, r).1, - RequestOutcome::CallProof(Ok(r)) => { - task.sync - .call_proof_response(request_id, Ok(r.decode().to_owned())) - .1 - } // TODO: need help from networking service to avoid this to_owned - RequestOutcome::CallProof(Err(err)) => { - task.sync.call_proof_response(request_id, Err(err)).1 + all::BlockAnnounceOutcome::TooOld { + announce_block_height, + .. + } => { + log::debug!( + target: &task.log_target, + "Sync => TooOld" + ); + + log::warn!( + target: &task.log_target, + "Block announce header height (#{}) from {} is below finalized block", + announce_block_height, + peer_id + ); } - } - } + all::BlockAnnounceOutcome::NotFinalizedChain => { + log::debug!( + target: &task.log_target, + "Sync => NotFinalized" + ); - WakeUpReason::WarpSyncTakingLongTimeWarning => { - match task.sync.status() { - all::Status::Sync => {} - all::Status::WarpSyncFragments { - source: None, - finalized_block_hash, - finalized_block_number, - } => { log::warn!( target: &task.log_target, - "GrandPa warp sync idle at block #{} (0x{})", - finalized_block_number, - HashDisplay(&finalized_block_hash), + "Block announce from {} isn't part of finalized chain", + peer_id ); } - all::Status::WarpSyncFragments { - finalized_block_hash, - finalized_block_number, - .. + all::BlockAnnounceOutcome::InvalidHeader(_) => { + // Log messages are already printed above. 
} - | all::Status::WarpSyncChainInformation { - finalized_block_hash, - finalized_block_number, - } => { - log::warn!( + } + } + + WakeUpReason::NetworkEvent(network_service::Event::GrandpaNeighborPacket { + peer_id, + finalized_block_height, + }) => { + let sync_source_id = *task.peers_source_id_map.get(&peer_id).unwrap(); + task.sync + .update_source_finality_state(sync_source_id, finalized_block_height); + } + + WakeUpReason::NetworkEvent(network_service::Event::GrandpaCommitMessage { + peer_id, + message, + }) => { + let sync_source_id = *task.peers_source_id_map.get(&peer_id).unwrap(); + match task + .sync + .grandpa_commit_message(sync_source_id, message.into_encoded()) + { + all::GrandpaCommitMessageOutcome::Queued => { + // TODO: print more details? + log::debug!( target: &task.log_target, - "GrandPa warp sync in progress. Block: #{} (0x{}).", - finalized_block_number, - HashDisplay(&finalized_block_hash) + "Sync <= QueuedGrandpaCommit" ); } - }; + all::GrandpaCommitMessageOutcome::Discarded => { + log::debug!( + target: &task.log_target, + "Sync <= IgnoredGrandpaCommit" + ); + } + } + } - continue; + WakeUpReason::MustSubscribeNetworkEvents => { + debug_assert!(task.from_network_service.is_none()); + for (_, sync_source_id) in task.peers_source_id_map.drain() { + let (_, requests) = task.sync.remove_source(sync_source_id); + for (_, abort) in requests { + abort.abort(); + } + } + task.from_network_service = Some(Box::pin( + // As documented, `subscribe().await` is expected to return quickly. + task.network_service.subscribe(task.network_chain_id).await, + )); } - WakeUpReason::MustLoopAgain => { - continue; + WakeUpReason::MustUpdateNetworkWithBestBlock => { + // The networking service needs to be kept up to date with what the local node + // considers as the best block. + // For some reason, first building the future then executing it solves a borrow + // checker error. + let fut = task.network_service.set_local_best_block( + network_chain_id, + task.sync.best_block_hash(), + task.sync.best_block_number(), + ); + fut.await; + + task.network_up_to_date_best = true; } - }; - // `response_outcome` represents the way the state machine has changed as a - // consequence of the response to a request. - match response_outcome { - all::ResponseOutcome::Outdated - | all::ResponseOutcome::Queued - | all::ResponseOutcome::NotFinalizedChain { .. } - | all::ResponseOutcome::AllAlreadyInChain { .. } => {} - } - } -} + WakeUpReason::MustUpdateNetworkWithFinalizedBlock => { + // If the chain uses GrandPa, the networking has to be kept up-to-date with the + // state of finalization for other peers to send back relevant gossip messages. + // (code style) `grandpa_set_id` is extracted first in order to avoid borrowing + // checker issues. + let grandpa_set_id = + if let chain::chain_information::ChainInformationFinalityRef::Grandpa { + after_finalized_block_authorities_set_id, + .. + } = task.sync.as_chain_information().as_ref().finality + { + Some(after_finalized_block_authorities_set_id) + } else { + None + }; + + if let Some(set_id) = grandpa_set_id { + let commit_finalized_height = task.sync.finalized_block_header().number; + task.network_service + .set_local_grandpa_state( + network_chain_id, + network::service::GrandpaState { + set_id, + round_number: 1, // TODO: + commit_finalized_height, + }, + ) + .await; + } -struct Task { - /// Log target to use for all logs that are emitted. - log_target: String, + task.network_up_to_date_finalized = true; + } - /// Access to the platform's capabilities. 
- platform: TPlat, + WakeUpReason::ForegroundMessage(ToBackground::IsNearHeadOfChainHeuristic { + send_back, + }) => { + // Frontend is querying something. + let _ = send_back.send(task.sync.is_near_head_of_chain_heuristic()); + } - /// Main syncing state machine. Contains a list of peers, requests, and blocks, and manages - /// everything about the non-finalized chain. - /// - /// For each request, we store a [`future::AbortHandle`] that can be used to abort the - /// request if desired. - sync: all::AllSync, + WakeUpReason::ForegroundMessage(ToBackground::SubscribeAll { + send_back, + buffer_size, + runtime_interest, + }) => { + // Frontend would like to subscribe to events. - /// If `Some`, contains the runtime of the current finalized block. - known_finalized_runtime: Option, + let (tx, new_blocks) = async_channel::bounded(buffer_size.saturating_sub(1)); + task.all_notifications.push(tx); - /// For each networking peer, the index of the corresponding peer within the [`Task::sync`]. - peers_source_id_map: HashMap, + let non_finalized_blocks_ancestry_order = { + let best_hash = task.sync.best_block_hash(); + task.sync + .non_finalized_blocks_ancestry_order() + .map(|h| { + let scale_encoding = + h.scale_encoding_vec(task.sync.block_number_bytes()); + BlockNotification { + is_new_best: header::hash_from_scale_encoded_header( + &scale_encoding, + ) == best_hash, + scale_encoded_header: scale_encoding, + parent_hash: *h.parent_hash, + } + }) + .collect() + }; - /// `false` after the best block in the [`Task::sync`] has changed. Set back to `true` - /// after the networking has been notified of this change. - network_up_to_date_best: bool, - /// `false` after the finalized block in the [`Task::sync`] has changed. Set back to `true` - /// after the networking has been notified of this change. - network_up_to_date_finalized: bool, + let _ = send_back.send(SubscribeAll { + finalized_block_scale_encoded_header: task + .sync + .finalized_block_header() + .scale_encoding_vec(task.sync.block_number_bytes()), + finalized_block_runtime: if runtime_interest { + task.known_finalized_runtime.take() + } else { + None + }, + non_finalized_blocks_ancestry_order, + new_blocks, + }); + } - /// All event subscribers that are interested in events about the chain. - all_notifications: Vec>, + WakeUpReason::ForegroundMessage(ToBackground::PeersAssumedKnowBlock { + send_back, + block_number, + block_hash, + }) => { + // Frontend queries the list of peers which are expected to know about a certain + // block. + let finalized_num = task.sync.finalized_block_header().number; + let outcome = if block_number <= finalized_num { + task.sync + .sources() + .filter(|source_id| { + let source_best = task.sync.source_best_block(*source_id); + source_best.0 > block_number + || (source_best.0 == block_number && *source_best.1 == block_hash) + }) + .map(|id| task.sync[id].0.clone()) + .collect() + } else { + // As documented, `knows_non_finalized_block` would panic if the + // block height was below the one of the known finalized block. + task.sync + .knows_non_finalized_block(block_number, &block_hash) + .map(|id| task.sync[id].0.clone()) + .collect() + }; + let _ = send_back.send(outcome); + } - /// Contains a `Delay` after which we print a warning about GrandPa warp sync taking a long - /// time. Set to `Pending` after the warp sync has finished, so that future remains pending - /// forever. 
- warp_sync_taking_long_time_warning: - future::Fuse>, future::Pending<()>>>, + WakeUpReason::ForegroundMessage(ToBackground::SyncingPeers { send_back }) => { + // Frontend is querying the list of peers. + let out = task + .sync + .sources() + .map(|src| { + let (peer_id, role) = task.sync[src].clone(); + let (height, hash) = task.sync.source_best_block(src); + (peer_id, role, height, *hash) + }) + .collect::>(); + let _ = send_back.send(out); + } - /// Network service. Used to send out requests to peers. - network_service: Arc>, - /// Index within the network service of the chain we are interested in. Must be indicated to - /// the network service whenever a request is started. - network_chain_id: network_service::ChainId, - /// Events coming from the networking service. `None` if not subscribed yet. - from_network_service: Option>>>, + WakeUpReason::ForegroundMessage(ToBackground::SerializeChainInformation { + send_back, + }) => { + // Frontend is querying the chain information. + let _ = send_back.send(Some(task.sync.as_chain_information().into())); + } - /// List of requests currently in progress. - pending_requests: stream::FuturesUnordered< - future::BoxFuture<'static, (all::RequestId, Result)>, - >, -} + WakeUpReason::ForegroundClosed => { + // The channel with the frontend sync service has been closed. + // Closing the sync background task as a result. + return; + } -enum RequestOutcome { - Block(Result, network_service::BlocksRequestError>), - WarpSync( - Result< - network::service::EncodedGrandpaWarpSyncResponse, - network_service::WarpSyncRequestError, - >, - ), - Storage(Result, ()>), - CallProof(Result), -} + WakeUpReason::RequestFinished(_, Err(_)) => { + // A request has been cancelled by the sync state machine. Nothing to do. + } -impl Task { - /// Starts one network request if any is necessary. - /// - /// Returns `true` if a request has been started. - fn start_next_request(&mut self) -> bool { - // `desired_requests()` returns, in decreasing order of priority, the requests - // that should be started in order for the syncing to proceed. The fact that multiple - // requests are returned could be used to filter out undesired one. We use this - // filtering to enforce a maximum of one ongoing request per source. - let (source_id, _, mut request_detail) = match self - .sync - .desired_requests() - .find(|(source_id, _, _)| self.sync.source_num_ongoing_requests(*source_id) == 0) - { - Some(v) => v, - None => return false, - }; + WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::Block(Ok(v)))) => { + // Successful block request. + task.sync.blocks_request_response( + request_id, + Ok(v.into_iter().filter_map(|block| { + Some(all::BlockRequestSuccessBlock { + scale_encoded_header: block.header?, + scale_encoded_justifications: block + .justifications + .unwrap_or(Vec::new()) + .into_iter() + .map(|j| all::Justification { + engine_id: j.engine_id, + justification: j.justification, + }) + .collect(), + scale_encoded_extrinsics: Vec::new(), + user_data: (), + }) + })), + ); + } - // Before inserting the request back to the syncing state machine, clamp the number - // of blocks to the number of blocks we expect to receive. - // This constant corresponds to the maximum number of blocks that nodes will answer - // in one request. If this constant happens to be inaccurate, everything will still - // work but less efficiently. 
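// A minimal, self-contained sketch (helper name and test values are
// illustrative, not from the patch) of the clamping that the removed
// `num_blocks_clamp(NonZeroU64::new(64).unwrap())` call and the new inline
// `cmp::min(64, num_blocks.get())` both perform:
use core::{cmp, num::NonZeroU64};

/// Caps a requested block count to the 64 blocks a peer is expected to answer
/// in one request. A non-zero count clamped by a non-zero cap stays non-zero,
/// so the `unwrap()` cannot panic.
fn clamp_num_blocks(num_blocks: NonZeroU64) -> NonZeroU64 {
    NonZeroU64::new(cmp::min(64, num_blocks.get())).unwrap()
}

#[test]
fn clamp_caps_large_requests() {
    assert_eq!(clamp_num_blocks(NonZeroU64::new(500).unwrap()).get(), 64);
    assert_eq!(clamp_num_blocks(NonZeroU64::new(10).unwrap()).get(), 10);
}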
- request_detail.num_blocks_clamp(NonZeroU64::new(64).unwrap()); - - match request_detail { - all::DesiredRequest::BlocksRequest { - first_block_hash, - first_block_height, - ascending, - num_blocks, - request_headers, - request_bodies, - request_justification, - } => { - let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue + WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::Block(Err(_)))) => { + // Failed block request. + // TODO: should disconnect peer + task.sync + .blocks_request_response(request_id, Err::, _>(())); + } + + WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::WarpSync(Ok(result)))) => { + // Successful warp sync request. + let decoded = result.decode(); + let fragments = decoded + .fragments + .into_iter() + .map(|f| all::WarpSyncFragment { + scale_encoded_header: f.scale_encoded_header.to_vec(), + scale_encoded_justification: f.scale_encoded_justification.to_vec(), + }) + .collect(); + task.sync + .grandpa_warp_sync_response_ok(request_id, fragments, decoded.is_finished); + } + + WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::WarpSync(Err(_)))) => { + // Failed warp sync request. + // TODO: should disconnect peer + task.sync.grandpa_warp_sync_response_err(request_id); + } + + WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::Storage(r))) => { + // Storage proof request. + task.sync.storage_get_response(request_id, r); + } + + WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::CallProof(Ok(r)))) => { + // Successful call proof request. + task.sync + .call_proof_response(request_id, Ok(r.decode().to_owned())); + // TODO: need help from networking service to avoid this to_owned + } - let block_request = self.network_service.clone().blocks_request( + WakeUpReason::RequestFinished(request_id, Ok(RequestOutcome::CallProof(Err(err)))) => { + // Failed call proof request. + task.sync.call_proof_response(request_id, Err(err)); + } + + WakeUpReason::StartRequest( + source_id, + all::DesiredRequest::BlocksRequest { + first_block_hash, + first_block_height, + ascending, + num_blocks, + request_headers, + request_bodies, + request_justification, + }, + ) => { + // Before inserting the request back to the syncing state machine, clamp the number + // of blocks to the number of blocks we expect to receive. + // This constant corresponds to the maximum number of blocks that nodes will answer + // in one request. If this constant happens to be inaccurate, everything will still + // work but less efficiently. + let num_blocks = NonZeroU64::new(cmp::min(64, num_blocks.get())).unwrap(); + + let peer_id = task.sync[source_id].0.clone(); // TODO: why does this require cloning? 
weird borrow chk issue + + let block_request = task.network_service.clone().blocks_request( peer_id, - self.network_chain_id, + task.network_chain_id, network::codec::BlocksRequestConfig { start: if let Some(first_block_hash) = first_block_hash { network::codec::BlocksRequestConfigStart::Hash(first_block_hash) @@ -559,23 +703,36 @@ impl Task { ); let (block_request, abort) = future::abortable(block_request); - let request_id = self - .sync - .add_request(source_id, request_detail.into(), abort); + let request_id = task.sync.add_request( + source_id, + all::RequestDetail::BlocksRequest { + first_block_hash, + first_block_height, + ascending, + num_blocks, + request_headers, + request_bodies, + request_justification, + }, + abort, + ); - self.pending_requests.push(Box::pin(async move { + task.pending_requests.push(Box::pin(async move { (request_id, block_request.await.map(RequestOutcome::Block)) })); } - all::DesiredRequest::WarpSync { - sync_start_block_hash, - } => { - let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue + WakeUpReason::StartRequest( + source_id, + all::DesiredRequest::WarpSync { + sync_start_block_hash, + }, + ) => { + let peer_id = task.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue - let grandpa_request = self.network_service.clone().grandpa_warp_sync_request( + let grandpa_request = task.network_service.clone().grandpa_warp_sync_request( peer_id, - self.network_chain_id, + task.network_chain_id, sync_start_block_hash, // The timeout needs to be long enough to potentially download the maximum // response size of 16 MiB. Assuming a 128 kiB/sec connection, that's @@ -585,11 +742,15 @@ impl Task { ); let (grandpa_request, abort) = future::abortable(grandpa_request); - let request_id = self - .sync - .add_request(source_id, request_detail.into(), abort); + let request_id = task.sync.add_request( + source_id, + all::RequestDetail::WarpSync { + sync_start_block_hash, + }, + abort, + ); - self.pending_requests.push(Box::pin(async move { + task.pending_requests.push(Box::pin(async move { ( request_id, grandpa_request.await.map(RequestOutcome::WarpSync), @@ -597,15 +758,16 @@ impl Task { })); } - all::DesiredRequest::StorageGetMerkleProof { - block_hash, - ref keys, - .. - } => { - let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue + WakeUpReason::StartRequest( + source_id, + all::DesiredRequest::StorageGetMerkleProof { + block_hash, keys, .. + }, + ) => { + let peer_id = task.sync[source_id].0.clone(); // TODO: why does this require cloning? 
weird borrow chk issue - let storage_request = self.network_service.clone().storage_proof_request( - self.network_chain_id, + let storage_request = task.network_service.clone().storage_proof_request( + task.network_chain_id, peer_id, network::codec::StorageProofRequestConfig { block_hash, @@ -624,11 +786,13 @@ impl Task { }; let (storage_request, abort) = future::abortable(storage_request); - let request_id = self - .sync - .add_request(source_id, request_detail.into(), abort); + let request_id = task.sync.add_request( + source_id, + all::RequestDetail::StorageGet { block_hash, keys }, + abort, + ); - self.pending_requests.push(Box::pin(async move { + task.pending_requests.push(Box::pin(async move { ( request_id, storage_request.await.map(RequestOutcome::Storage), @@ -636,53 +800,162 @@ impl Task { })); } - all::DesiredRequest::RuntimeCallMerkleProof { - block_hash, - ref function_name, - ref parameter_vectored, - } => { - let peer_id = self.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue - let network_service = self.network_service.clone(); - let network_chain_id = self.network_chain_id; - // TODO: all this copying is done because of lifetime requirements in NetworkService::call_proof_request; maybe check if it can be avoided - let parameter_vectored = parameter_vectored.clone(); - let function_name = function_name.clone(); - - let call_proof_request = async move { - let rq = network_service.call_proof_request( - network_chain_id, - peer_id, - network::codec::CallProofRequestConfig { - block_hash, - method: function_name, - parameter_vectored: iter::once(parameter_vectored), - }, - Duration::from_secs(16), - ); + WakeUpReason::StartRequest( + source_id, + all::DesiredRequest::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + }, + ) => { + let peer_id = task.sync[source_id].0.clone(); // TODO: why does this require cloning? weird borrow chk issue + + let call_proof_request = { + // TODO: all this copying is done because of lifetime requirements in NetworkService::call_proof_request; maybe check if it can be avoided + let network_service = task.network_service.clone(); + let network_chain_id = task.network_chain_id; + let parameter_vectored = parameter_vectored.clone(); + let function_name = function_name.clone(); + async move { + let rq = network_service.call_proof_request( + network_chain_id, + peer_id, + network::codec::CallProofRequestConfig { + block_hash, + method: Cow::Borrowed(&*function_name), + parameter_vectored: iter::once(¶meter_vectored), + }, + Duration::from_secs(16), + ); + + match rq.await { + Ok(p) => Ok(p), + Err(_) => Err(()), + } + } + }; + + let (call_proof_request, abort) = future::abortable(call_proof_request); + let request_id = task.sync.add_request( + source_id, + all::RequestDetail::RuntimeCallMerkleProof { + block_hash, + function_name, + parameter_vectored, + }, + abort, + ); + + task.pending_requests.push(Box::pin(async move { + ( + request_id, + call_proof_request.await.map(RequestOutcome::CallProof), + ) + })); + } + + WakeUpReason::WarpSyncTakingLongTimeWarning => { + match task.sync.status() { + all::Status::Sync => {} + all::Status::WarpSyncFragments { + source: None, + finalized_block_hash, + finalized_block_number, + } => { + log::warn!( + target: &task.log_target, + "GrandPa warp sync idle at block #{} (0x{})", + finalized_block_number, + HashDisplay(&finalized_block_hash), + ); + } + all::Status::WarpSyncFragments { + finalized_block_hash, + finalized_block_number, + .. 
+ } + | all::Status::WarpSyncChainInformation { + finalized_block_hash, + finalized_block_number, + } => { + log::warn!( + target: &task.log_target, + "GrandPa warp sync in progress. Block: #{} (0x{}).", + finalized_block_number, + HashDisplay(&finalized_block_hash) + ); + } + }; + } + + WakeUpReason::MustLoopAgain => {} + } + } +} + +struct Task { + /// Log target to use for all logs that are emitted. + log_target: String, + + /// Access to the platform's capabilities. + platform: TPlat, + + /// Main syncing state machine. Contains a list of peers, requests, and blocks, and manages + /// everything about the non-finalized chain. + /// + /// For each request, we store a [`future::AbortHandle`] that can be used to abort the + /// request if desired. + sync: all::AllSync, + + /// If `Some`, contains the runtime of the current finalized block. + known_finalized_runtime: Option, + + /// For each networking peer, the index of the corresponding peer within the [`Task::sync`]. + peers_source_id_map: HashMap, + + /// `false` after the best block in the [`Task::sync`] has changed. Set back to `true` + /// after the networking has been notified of this change. + network_up_to_date_best: bool, + /// `false` after the finalized block in the [`Task::sync`] has changed. Set back to `true` + /// after the networking has been notified of this change. + network_up_to_date_finalized: bool, + + /// All event subscribers that are interested in events about the chain. + all_notifications: Vec>, - match rq.await { - Ok(p) => Ok(p), - Err(_) => Err(()), - } - }; + /// Contains a `Delay` after which we print a warning about GrandPa warp sync taking a long + /// time. Set to `Pending` after the warp sync has finished, so that future remains pending + /// forever. + warp_sync_taking_long_time_warning: + future::Fuse>, future::Pending<()>>>, - let (call_proof_request, abort) = future::abortable(call_proof_request); - let request_id = self - .sync - .add_request(source_id, request_detail.into(), abort); + /// Network service. Used to send out requests to peers. + network_service: Arc>, + /// Index within the network service of the chain we are interested in. Must be indicated to + /// the network service whenever a request is started. + network_chain_id: network_service::ChainId, + /// Events coming from the networking service. `None` if not subscribed yet. + from_network_service: Option>>>, - self.pending_requests.push(Box::pin(async move { - ( - request_id, - call_proof_request.await.map(RequestOutcome::CallProof), - ) - })); - } - } + /// List of requests currently in progress. + pending_requests: stream::FuturesUnordered< + future::BoxFuture<'static, (all::RequestId, Result)>, + >, +} - true - } +enum RequestOutcome { + Block(Result, network_service::BlocksRequestError>), + WarpSync( + Result< + network::service::EncodedGrandpaWarpSyncResponse, + network_service::WarpSyncRequestError, + >, + ), + Storage(Result, ()>), + CallProof(Result), +} +impl Task { /// Verifies one block, or finality proof, or warp sync fragment, etc. that is queued for /// verification. /// @@ -1061,258 +1334,6 @@ impl Task { (self, true) } - /// Process a request coming from the foreground service. 
- fn process_foreground_message(&mut self, message: ToBackground) { - match message { - ToBackground::IsNearHeadOfChainHeuristic { send_back } => { - let _ = send_back.send(self.sync.is_near_head_of_chain_heuristic()); - } - - ToBackground::SubscribeAll { - send_back, - buffer_size, - runtime_interest, - } => { - let (tx, new_blocks) = async_channel::bounded(buffer_size.saturating_sub(1)); - self.all_notifications.push(tx); - - let non_finalized_blocks_ancestry_order = { - let best_hash = self.sync.best_block_hash(); - self.sync - .non_finalized_blocks_ancestry_order() - .map(|h| { - let scale_encoding = - h.scale_encoding_vec(self.sync.block_number_bytes()); - BlockNotification { - is_new_best: header::hash_from_scale_encoded_header( - &scale_encoding, - ) == best_hash, - scale_encoded_header: scale_encoding, - parent_hash: *h.parent_hash, - } - }) - .collect() - }; - - let _ = send_back.send(SubscribeAll { - finalized_block_scale_encoded_header: self - .sync - .finalized_block_header() - .scale_encoding_vec(self.sync.block_number_bytes()), - finalized_block_runtime: if runtime_interest { - self.known_finalized_runtime.take() - } else { - None - }, - non_finalized_blocks_ancestry_order, - new_blocks, - }); - } - - ToBackground::PeersAssumedKnowBlock { - send_back, - block_number, - block_hash, - } => { - let finalized_num = self.sync.finalized_block_header().number; - let outcome = if block_number <= finalized_num { - self.sync - .sources() - .filter(|source_id| { - let source_best = self.sync.source_best_block(*source_id); - source_best.0 > block_number - || (source_best.0 == block_number && *source_best.1 == block_hash) - }) - .map(|id| self.sync[id].0.clone()) - .collect() - } else { - // As documented, `knows_non_finalized_block` would panic if the - // block height was below the one of the known finalized block. - self.sync - .knows_non_finalized_block(block_number, &block_hash) - .map(|id| self.sync[id].0.clone()) - .collect() - }; - let _ = send_back.send(outcome); - } - - ToBackground::SyncingPeers { send_back } => { - let out = self - .sync - .sources() - .map(|src| { - let (peer_id, role) = self.sync[src].clone(); - let (height, hash) = self.sync.source_best_block(src); - (peer_id, role, height, *hash) - }) - .collect::>(); - let _ = send_back.send(out); - } - - ToBackground::SerializeChainInformation { send_back } => { - let _ = send_back.send(Some(self.sync.as_chain_information().into())); - } - } - } - - /// Updates the task with a new event coming from the network service. - fn inject_network_event(&mut self, network_event: network_service::Event) { - match network_event { - network_service::Event::Connected { - peer_id, - role, - best_block_number, - best_block_hash, - } => { - self.peers_source_id_map.insert( - peer_id.clone(), - self.sync - .add_source((peer_id, role), best_block_number, best_block_hash), - ); - } - - network_service::Event::Disconnected { peer_id } => { - let sync_source_id = self.peers_source_id_map.remove(&peer_id).unwrap(); - let (_, requests) = self.sync.remove_source(sync_source_id); - - // The `Disconnect` network event indicates that the main notifications substream - // with that peer has been closed, not necessarily that the connection as a whole - // has been closed. As such, the in-progress network requests might continue if - // we don't abort them. 
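// Sketch of the abort mechanism used for in-flight requests (assuming the
// `futures-util` crate that provides `future::abortable`; the demo function
// and the `u32` payload are illustrative). Every request future is wrapped
// with `future::abortable`, the `AbortHandle` is stored in the sync state
// machine, and calling `abort()` makes the pending future resolve to
// `Err(Aborted)`, which the main loop later observes as
// `RequestFinished(_, Err(_))` and ignores.
use futures_util::future;

async fn abort_demo() {
    let (request, abort_handle) = future::abortable(future::pending::<u32>());
    abort_handle.abort();
    // The aborted future resolves to `Err(Aborted)` instead of a response.
    assert!(matches!(request.await, Err(future::Aborted)));
}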
- for (_, abort) in requests { - abort.abort(); - } - } - - network_service::Event::BlockAnnounce { peer_id, announce } => { - let sync_source_id = *self.peers_source_id_map.get(&peer_id).unwrap(); - let decoded = announce.decode(); - - match header::decode(decoded.scale_encoded_header, self.sync.block_number_bytes()) { - Ok(decoded_header) => { - log::debug!( - target: &self.log_target, - "Sync <= BlockAnnounce(sender={}, hash={}, is_best={}, parent_hash={})", - peer_id, - HashDisplay(&header::hash_from_scale_encoded_header(decoded.scale_encoded_header)), - decoded.is_best, - HashDisplay(decoded_header.parent_hash) - ); - } - Err(error) => { - log::debug!( - target: &self.log_target, - "Sync <= BlockAnnounce(sender={}, hash={}, is_best={}, parent_hash=)", - peer_id, - HashDisplay(&header::hash_from_scale_encoded_header(decoded.scale_encoded_header)), - decoded.is_best, - ); - - log::debug!( - target: &self.log_target, - "Sync => InvalidBlockHeader(error={})", - error - ); - - log::warn!( - target: &self.log_target, - "Failed to decode header in block announce received from {}. Error: {}", - peer_id, error, - ) - } - } - - match self.sync.block_announce( - sync_source_id, - decoded.scale_encoded_header.to_owned(), - decoded.is_best, - ) { - all::BlockAnnounceOutcome::HeaderVerify - | all::BlockAnnounceOutcome::AlreadyInChain => { - log::debug!( - target: &self.log_target, - "Sync => Ok" - ); - } - all::BlockAnnounceOutcome::Discarded => { - log::debug!( - target: &self.log_target, - "Sync => Discarded" - ); - } - all::BlockAnnounceOutcome::StoredForLater {} => { - log::debug!( - target: &self.log_target, - "Sync => StoredForLater" - ); - } - all::BlockAnnounceOutcome::TooOld { - announce_block_height, - .. - } => { - log::debug!( - target: &self.log_target, - "Sync => TooOld" - ); - - log::warn!( - target: &self.log_target, - "Block announce header height (#{}) from {} is below finalized block", - announce_block_height, - peer_id - ); - } - all::BlockAnnounceOutcome::NotFinalizedChain => { - log::debug!( - target: &self.log_target, - "Sync => NotFinalized" - ); - - log::warn!( - target: &self.log_target, - "Block announce from {} isn't part of finalized chain", - peer_id - ); - } - all::BlockAnnounceOutcome::InvalidHeader(_) => { - // Log messages are already printed above. - } - } - } - - network_service::Event::GrandpaNeighborPacket { - peer_id, - finalized_block_height, - } => { - let sync_source_id = *self.peers_source_id_map.get(&peer_id).unwrap(); - self.sync - .update_source_finality_state(sync_source_id, finalized_block_height); - } - - network_service::Event::GrandpaCommitMessage { peer_id, message } => { - let sync_source_id = *self.peers_source_id_map.get(&peer_id).unwrap(); - match self - .sync - .grandpa_commit_message(sync_source_id, message.into_encoded()) - { - all::GrandpaCommitMessageOutcome::Queued => { - // TODO: print more details? - log::debug!( - target: &self.log_target, - "Sync <= QueuedGrandpaCommit" - ); - } - all::GrandpaCommitMessageOutcome::Discarded => { - log::debug!( - target: &self.log_target, - "Sync <= IgnoredGrandpaCommit" - ); - } - } - } - } - } - /// Sends a notification to all the notification receivers. fn dispatch_all_subscribers(&mut self, notification: Notification) { // Elements in `all_notifications` are removed one by one and inserted back if the
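// Minimal sketch of the wake-up pattern this patch moves the main loop to
// (assuming the `futures-lite` `FutureExt::or` combinator, consistent with the
// `.or(async { ... })` chains above; the enum variants and parameters here are
// illustrative, not the real ones). Each branch of the race either produces a
// wake-up reason or, when it is not currently applicable, awaits
// `future::pending()` so that it can never win the race.
use futures_lite::{future, FutureExt as _};

enum WakeUpReasonSketch {
    NetworkUpToDateNeeded,
    RequestReadyToStart(u32),
}

async fn next_wake_up(
    network_up_to_date: bool,
    desired_request: Option<u32>,
) -> WakeUpReasonSketch {
    async {
        if !network_up_to_date {
            WakeUpReasonSketch::NetworkUpToDateNeeded
        } else {
            future::pending().await
        }
    }
    .or(async move {
        if let Some(request) = desired_request {
            WakeUpReasonSketch::RequestReadyToStart(request)
        } else {
            future::pending().await
        }
    })
    .await
}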