From 5794550e311f0f43fee201d2418eb5bf46924469 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 19 May 2022 19:57:48 +0300 Subject: [PATCH] Migrate provisioner subsystem --- node/core/provisioner/src/error.rs | 34 ++- node/core/provisioner/src/lib.rs | 350 +++++++++++++++-------------- node/core/provisioner/src/tests.rs | 4 +- node/service/src/overseer.rs | 18 +- 4 files changed, 224 insertions(+), 182 deletions(-) diff --git a/node/core/provisioner/src/error.rs b/node/core/provisioner/src/error.rs index 7f5807c7c7a3..4589ab02cf31 100644 --- a/node/core/provisioner/src/error.rs +++ b/node/core/provisioner/src/error.rs @@ -15,16 +15,18 @@ // along with Polkadot. If not, see . ///! Error types for provisioner module -use fatality; +use fatality::Nested; use futures::channel::{mpsc, oneshot}; -use polkadot_node_subsystem::errors::{ChainApiError, RuntimeApiError}; +use polkadot_node_subsystem::errors::{ChainApiError, RuntimeApiError, SubsystemError}; use polkadot_node_subsystem_util as util; use polkadot_primitives::v2::Hash; -use thiserror::Error; + +pub type FatalResult = std::result::Result; +pub type Result = std::result::Result; /// Errors in the provisioner. -#[derive(Debug, Error)] #[allow(missing_docs)] +#[fatality::fatality(splitable)] pub enum Error { #[error(transparent)] Util(#[from] util::Error), @@ -63,6 +65,13 @@ pub enum Error { "backed candidate does not correspond to selected candidate; check logic in provisioner" )] BackedCandidateOrderingProblem, + + #[fatal] + #[error("Failed to spawn background task")] + FailedToSpawnBackgroundTask, + + #[error(transparent)] + SubsystemError(#[from] SubsystemError), } /// Used by `get_onchain_disputes` to represent errors related to fetching on-chain disputes from the Runtime @@ -81,3 +90,20 @@ pub enum GetOnchainDisputesError { )] NotSupported(#[source] RuntimeApiError, Hash), } + +pub fn log_error(result: Result<()>) -> std::result::Result<(), FatalError> { + match result.into_nested()? { + Ok(()) => Ok(()), + Err(jfyi) => { + jfyi.log(); + Ok(()) + }, + } +} + +impl JfyiError { + /// Log a `JfyiError`. + pub fn log(self) { + gum::debug!(target: super::LOG_TARGET, error = ?self); + } +} diff --git a/node/core/provisioner/src/lib.rs b/node/core/provisioner/src/lib.rs index afbdf458746e..2f669dcb4760 100644 --- a/node/core/provisioner/src/lib.rs +++ b/node/core/provisioner/src/lib.rs @@ -21,10 +21,10 @@ use bitvec::vec::BitVec; use futures::{ - channel::{mpsc, oneshot}, - prelude::*, + channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered, FutureExt, }; use futures_timer::Delay; + use polkadot_node_primitives::CandidateVotes; use polkadot_node_subsystem::{ jaeger, @@ -32,28 +32,23 @@ use polkadot_node_subsystem::{ CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData, ProvisionerInherentData, ProvisionerMessage, }, - overseer, ActivatedLeaf, LeafStatus, PerLeafSpan, -}; -use polkadot_node_subsystem_util::{ - request_availability_cores, request_persisted_validation_data, JobSender, JobSubsystem, - JobTrait, + overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, LeafStatus, OverseerSignal, + PerLeafSpan, SpawnedSubsystem, SubsystemError, }; +use polkadot_node_subsystem_util::{request_availability_cores, request_persisted_validation_data}; use polkadot_primitives::v2::{ BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeState, DisputeStatement, DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption, SessionIndex, SignedAvailabilityBitfield, ValidatorIndex, }; -use std::{ - collections::{BTreeMap, HashMap, HashSet}, - pin::Pin, -}; +use std::collections::{BTreeMap, HashMap, HashSet}; mod error; mod metrics; mod onchain_disputes; pub use self::metrics::*; -use error::Error; +use error::{Error, FatalResult}; #[cfg(test)] mod tests; @@ -63,197 +58,229 @@ const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_mill const LOG_TARGET: &str = "parachain::provisioner"; -enum InherentAfter { - Ready, - Wait(Delay), +/// The provisioner subsystem. +pub struct ProvisionerSubsystem { + metrics: Metrics, } -impl InherentAfter { - fn new_from_now() -> Self { - InherentAfter::Wait(Delay::new(PRE_PROPOSE_TIMEOUT)) - } - - fn is_ready(&self) -> bool { - match *self { - InherentAfter::Ready => true, - InherentAfter::Wait(_) => false, - } - } - - async fn ready(&mut self) { - match *self { - InherentAfter::Ready => { - // Make sure we never end the returned future. - // This is required because the `select!` that calls this future will end in a busy loop. - futures::pending!() - }, - InherentAfter::Wait(ref mut d) => { - d.await; - *self = InherentAfter::Ready; - }, - } +impl ProvisionerSubsystem { + /// Create a new instance of the `ProvisionerSubsystem`. + pub fn new(metrics: Metrics) -> Self { + Self { metrics } } } -/// Provisioner run arguments. -#[derive(Debug, Clone, Copy)] -pub struct ProvisionerConfig; - -/// A per-relay-parent job for the provisioning subsystem. -pub struct ProvisionerJob { +/// A per-relay-parent state for the provisioning subsystem. +pub struct PerRelayParent { leaf: ActivatedLeaf, - receiver: mpsc::Receiver, backed_candidates: Vec, signed_bitfields: Vec, - metrics: Metrics, - inherent_after: InherentAfter, + is_inherent_ready: bool, awaiting_inherent: Vec>, - _phantom: std::marker::PhantomData, + span: PerLeafSpan, } -impl JobTrait for ProvisionerJob -where - Sender: overseer::ProvisionerSenderTrait + std::marker::Unpin, -{ - type ToJob = ProvisionerMessage; - type OutgoingMessages = overseer::ProvisionerOutgoingMessages; - type Sender = Sender; - type Error = Error; - type RunArgs = ProvisionerConfig; - type Metrics = Metrics; +impl PerRelayParent { + fn new(leaf: ActivatedLeaf) -> Self { + let span = PerLeafSpan::new(leaf.span.clone(), "provisioner"); + + Self { + leaf, + backed_candidates: Vec::new(), + signed_bitfields: Vec::new(), + is_inherent_ready: false, + awaiting_inherent: Vec::new(), + span, + } + } +} - const NAME: &'static str = "provisioner-job"; +type InherentDelays = FuturesUnordered>; - /// Run a job for the parent block indicated - // - // this function is in charge of creating and executing the job's main loop - fn run( - leaf: ActivatedLeaf, - _: Self::RunArgs, - metrics: Self::Metrics, - receiver: mpsc::Receiver, - mut sender: JobSender, - ) -> Pin> + Send>> { - let span = leaf.span.clone(); - async move { - let job = ProvisionerJob::new(leaf, metrics, receiver); - - job.run_loop(sender.subsystem_sender(), PerLeafSpan::new(span, "provisioner")) +#[overseer::subsystem(Provisioner, error=SubsystemError, prefix=self::overseer)] +impl ProvisionerSubsystem { + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = async move { + run(ctx, self.metrics) .await + .map_err(|e| SubsystemError::with_origin("provisioner", e)) } - .boxed() + .boxed(); + + SpawnedSubsystem { name: "provisioner-subsystem", future } } } -impl ProvisionerJob -where - Sender: overseer::ProvisionerSenderTrait, -{ - fn new( - leaf: ActivatedLeaf, - metrics: Metrics, - receiver: mpsc::Receiver, - ) -> Self { - Self { - leaf, - receiver, - backed_candidates: Vec::new(), - signed_bitfields: Vec::new(), - metrics, - inherent_after: InherentAfter::new_from_now(), - awaiting_inherent: Vec::new(), - _phantom: std::marker::PhantomData::::default(), +#[overseer::contextbounds(Provisioner, prefix = self::overseer)] +async fn run(mut ctx: Context, metrics: Metrics) -> FatalResult<()> { + let mut inherent_delays = InherentDelays::new(); + let mut per_relay_parent = HashMap::new(); + + loop { + let result = + run_iteration(&mut ctx, &mut per_relay_parent, &mut inherent_delays, &metrics).await; + + match result { + Ok(()) => break, + err => crate::error::log_error(err)?, } } - async fn run_loop(mut self, sender: &mut Sender, span: PerLeafSpan) -> Result<(), Error> { - loop { - futures::select! { - msg = self.receiver.next() => match msg { - Some(ProvisionerMessage::RequestInherentData(_, return_sender)) => { - let _span = span.child("req-inherent-data"); - let _timer = self.metrics.time_request_inherent_data(); - - if self.inherent_after.is_ready() { - self.send_inherent_data(sender, vec![return_sender]).await; - } else { - self.awaiting_inherent.push(return_sender); - } - } - Some(ProvisionerMessage::ProvisionableData(_, data)) => { - let span = span.child("provisionable-data"); - let _timer = self.metrics.time_provisionable_data(); + Ok(()) +} - self.note_provisionable_data(&span, data); - } - None => break, - }, - _ = self.inherent_after.ready().fuse() => { - let _span = span.child("send-inherent-data"); - let return_senders = std::mem::take(&mut self.awaiting_inherent); +#[overseer::contextbounds(Provisioner, prefix = self::overseer)] +async fn run_iteration( + ctx: &mut Context, + per_relay_parent: &mut HashMap, + inherent_delays: &mut InherentDelays, + metrics: &Metrics, +) -> Result<(), Error> { + loop { + futures::select! { + from_overseer = ctx.recv().fuse() => { + match from_overseer? { + FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => + handle_active_leaves_update(update, per_relay_parent, inherent_delays), + FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}, + FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()), + FromOverseer::Communication { msg } => { + handle_communication(ctx, per_relay_parent, msg, metrics).await?; + }, + } + }, + hash = inherent_delays.select_next_some() => { + if let Some(state) = per_relay_parent.get_mut(&hash) { + state.is_inherent_ready = true; + + let return_senders = std::mem::take(&mut state.awaiting_inherent); if !return_senders.is_empty() { - self.send_inherent_data(sender, return_senders).await; + send_inherent_data_bg(ctx, &state, return_senders, metrics.clone()).await?; } } } } + } +} - Ok(()) +fn handle_active_leaves_update( + update: ActiveLeavesUpdate, + per_relay_parent: &mut HashMap, + inherent_delays: &mut InherentDelays, +) { + for deactivated in &update.deactivated { + per_relay_parent.remove(deactivated); + } + + for leaf in update.activated { + let delay_fut = Delay::new(PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed(); + per_relay_parent.insert(leaf.hash, PerRelayParent::new(leaf)); + inherent_delays.push(delay_fut); + } +} + +#[overseer::contextbounds(Provisioner, prefix = self::overseer)] +async fn handle_communication( + ctx: &mut Context, + per_relay_parent: &mut HashMap, + message: ProvisionerMessage, + metrics: &Metrics, +) -> Result<(), Error> { + match message { + ProvisionerMessage::RequestInherentData(relay_parent, return_sender) => { + if let Some(state) = per_relay_parent.get_mut(&relay_parent) { + if state.is_inherent_ready { + send_inherent_data_bg(ctx, &state, vec![return_sender], metrics.clone()) + .await?; + } else { + state.awaiting_inherent.push(return_sender); + } + } + }, + ProvisionerMessage::ProvisionableData(relay_parent, data) => { + if let Some(state) = per_relay_parent.get_mut(&relay_parent) { + let span = state.span.child("provisionable-data"); + let _timer = metrics.time_provisionable_data(); + + note_provisionable_data(state, &span, data); + } + }, } - async fn send_inherent_data( - &mut self, - sender: &mut Sender, - return_senders: Vec>, - ) { + Ok(()) +} + +#[overseer::contextbounds(Provisioner, prefix = self::overseer)] +async fn send_inherent_data_bg( + ctx: &mut Context, + per_relay_parent: &PerRelayParent, + return_senders: Vec>, + metrics: Metrics, +) -> Result<(), Error> { + let leaf = per_relay_parent.leaf.clone(); + let signed_bitfields = per_relay_parent.signed_bitfields.clone(); + let backed_candidates = per_relay_parent.backed_candidates.clone(); + let span = per_relay_parent.span.child("req-inherent-data"); + + let mut sender = ctx.sender().clone(); + + let bg = async move { + let _span = span; + let _timer = metrics.time_request_inherent_data(); + if let Err(err) = send_inherent_data( - &self.leaf, - &self.signed_bitfields, - &self.backed_candidates, + &leaf, + &signed_bitfields, + &backed_candidates, return_senders, - sender, - &self.metrics, + &mut sender, + &metrics, ) .await { gum::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data"); - self.metrics.on_inherent_data_request(Err(())); + metrics.on_inherent_data_request(Err(())); } else { - self.metrics.on_inherent_data_request(Ok(())); + metrics.on_inherent_data_request(Ok(())); gum::debug!( target: LOG_TARGET, - signed_bitfield_count = self.signed_bitfields.len(), - backed_candidates_count = self.backed_candidates.len(), - leaf_hash = ?self.leaf.hash, + signed_bitfield_count = signed_bitfields.len(), + backed_candidates_count = backed_candidates.len(), + leaf_hash = ?leaf.hash, "inherent data sent successfully" ); } - } + }; - fn note_provisionable_data( - &mut self, - span: &jaeger::Span, - provisionable_data: ProvisionableData, - ) { - match provisionable_data { - ProvisionableData::Bitfield(_, signed_bitfield) => - self.signed_bitfields.push(signed_bitfield), - ProvisionableData::BackedCandidate(backed_candidate) => { - let candidate_hash = backed_candidate.hash(); - gum::trace!( - target: LOG_TARGET, - ?candidate_hash, - para = ?backed_candidate.descriptor().para_id, - "noted backed candidate", - ); - let _span = span - .child("provisionable-backed") - .with_candidate(candidate_hash) - .with_para_id(backed_candidate.descriptor().para_id); - self.backed_candidates.push(backed_candidate) - }, - _ => {}, - } + ctx.spawn("send-inherent-data", bg.boxed()) + .map_err(|_| Error::FailedToSpawnBackgroundTask)?; + + Ok(()) +} + +fn note_provisionable_data( + per_relay_parent: &mut PerRelayParent, + span: &jaeger::Span, + provisionable_data: ProvisionableData, +) { + match provisionable_data { + ProvisionableData::Bitfield(_, signed_bitfield) => + per_relay_parent.signed_bitfields.push(signed_bitfield), + ProvisionableData::BackedCandidate(backed_candidate) => { + let candidate_hash = backed_candidate.hash(); + gum::trace!( + target: LOG_TARGET, + ?candidate_hash, + para = ?backed_candidate.descriptor().para_id, + "noted backed candidate", + ); + let _span = span + .child("provisionable-backed") + .with_candidate(candidate_hash) + .with_para_id(backed_candidate.descriptor().para_id); + per_relay_parent.backed_candidates.push(backed_candidate) + }, + _ => {}, } } @@ -806,6 +833,3 @@ async fn select_disputes( }) .collect()) } - -/// The provisioner subsystem. -pub type ProvisionerSubsystem = JobSubsystem, Spawner>; diff --git a/node/core/provisioner/src/tests.rs b/node/core/provisioner/src/tests.rs index f87fbb8ce16a..a58e22d7efc2 100644 --- a/node/core/provisioner/src/tests.rs +++ b/node/core/provisioner/src/tests.rs @@ -197,6 +197,7 @@ mod select_availability_bitfields { mod common { use super::super::*; + use futures::channel::mpsc; use polkadot_node_subsystem::messages::AllMessages; use polkadot_node_subsystem_test_helpers::TestSubsystemSender; @@ -225,6 +226,7 @@ mod select_candidates { scheduled_core, }; use ::test_helpers::{dummy_candidate_descriptor, dummy_hash}; + use futures::channel::mpsc; use polkadot_node_subsystem::messages::{ AllMessages, RuntimeApiMessage, RuntimeApiRequest::{ @@ -497,8 +499,8 @@ mod select_candidates { } mod select_disputes { - use super::{super::*, common::test_harness}; + use futures::channel::mpsc; use polkadot_node_subsystem::{ messages::{AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest}, RuntimeApiError, diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index 604fe67a673a..22a5fe0cce9c 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -22,18 +22,15 @@ use polkadot_node_core_av_store::Config as AvailabilityConfig; use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig; use polkadot_node_core_chain_selection::Config as ChainSelectionConfig; use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig; -use polkadot_node_core_provisioner::ProvisionerConfig; use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver}; -use polkadot_node_subsystem_types::messages::ProvisionerMessage; #[cfg(any(feature = "malus", test))] pub use polkadot_overseer::{ dummy::{dummy_overseer_builder, DummySubsystem}, HeadSupportsParachains, }; use polkadot_overseer::{ - gen::SubsystemContext, metrics::Metrics as OverseerMetrics, BlockInfo, - InitializedOverseerBuilder, MetricsTrait, Overseer, OverseerConnector, OverseerHandle, - OverseerSubsystemContext, + metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MetricsTrait, + Overseer, OverseerConnector, OverseerHandle, }; use polkadot_primitives::runtime_api::ParachainHost; @@ -156,10 +153,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>( AvailabilityRecoverySubsystem, BitfieldSigningSubsystem, BitfieldDistributionSubsystem, - ProvisionerSubsystem< - Spawner, - as SubsystemContext>::Sender, - >, + ProvisionerSubsystem, RuntimeApiSubsystem, AvailabilityStoreSubsystem, NetworkBridgeSubsystem< @@ -245,11 +239,7 @@ where Box::new(network_service.clone()), Metrics::register(registry)?, )) - .provisioner(ProvisionerSubsystem::new( - spawner.clone(), - ProvisionerConfig, - Metrics::register(registry)?, - )) + .provisioner(ProvisionerSubsystem::new(Metrics::register(registry)?)) .runtime_api(RuntimeApiSubsystem::new( runtime_client.clone(), Metrics::register(registry)?,