diff --git a/node/core/provisioner/src/error.rs b/node/core/provisioner/src/error.rs
index 7f5807c7c7a3..4589ab02cf31 100644
--- a/node/core/provisioner/src/error.rs
+++ b/node/core/provisioner/src/error.rs
@@ -15,16 +15,18 @@
// along with Polkadot. If not, see .
///! Error types for provisioner module
-use fatality;
+use fatality::Nested;
use futures::channel::{mpsc, oneshot};
-use polkadot_node_subsystem::errors::{ChainApiError, RuntimeApiError};
+use polkadot_node_subsystem::errors::{ChainApiError, RuntimeApiError, SubsystemError};
use polkadot_node_subsystem_util as util;
use polkadot_primitives::v2::Hash;
-use thiserror::Error;
+
+pub type FatalResult = std::result::Result;
+pub type Result = std::result::Result;
/// Errors in the provisioner.
-#[derive(Debug, Error)]
#[allow(missing_docs)]
+#[fatality::fatality(splitable)]
pub enum Error {
#[error(transparent)]
Util(#[from] util::Error),
@@ -63,6 +65,13 @@ pub enum Error {
"backed candidate does not correspond to selected candidate; check logic in provisioner"
)]
BackedCandidateOrderingProblem,
+
+ #[fatal]
+ #[error("Failed to spawn background task")]
+ FailedToSpawnBackgroundTask,
+
+ #[error(transparent)]
+ SubsystemError(#[from] SubsystemError),
}
/// Used by `get_onchain_disputes` to represent errors related to fetching on-chain disputes from the Runtime
@@ -81,3 +90,20 @@ pub enum GetOnchainDisputesError {
)]
NotSupported(#[source] RuntimeApiError, Hash),
}
+
+pub fn log_error(result: Result<()>) -> std::result::Result<(), FatalError> {
+ match result.into_nested()? {
+ Ok(()) => Ok(()),
+ Err(jfyi) => {
+ jfyi.log();
+ Ok(())
+ },
+ }
+}
+
+impl JfyiError {
+ /// Log a `JfyiError`.
+ pub fn log(self) {
+ gum::debug!(target: super::LOG_TARGET, error = ?self);
+ }
+}
diff --git a/node/core/provisioner/src/lib.rs b/node/core/provisioner/src/lib.rs
index bef9c6423524..4c41eb8256a7 100644
--- a/node/core/provisioner/src/lib.rs
+++ b/node/core/provisioner/src/lib.rs
@@ -21,10 +21,10 @@
use bitvec::vec::BitVec;
use futures::{
- channel::{mpsc, oneshot},
- prelude::*,
+ channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered, FutureExt,
};
use futures_timer::Delay;
+
use polkadot_node_primitives::CandidateVotes;
use polkadot_node_subsystem::{
jaeger,
@@ -32,28 +32,23 @@ use polkadot_node_subsystem::{
CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData,
ProvisionerInherentData, ProvisionerMessage,
},
- overseer, ActivatedLeaf, LeafStatus, PerLeafSpan,
-};
-use polkadot_node_subsystem_util::{
- request_availability_cores, request_persisted_validation_data, JobSender, JobSubsystem,
- JobTrait,
+ overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal,
+ PerLeafSpan, SpawnedSubsystem, SubsystemError,
};
+use polkadot_node_subsystem_util::{request_availability_cores, request_persisted_validation_data};
use polkadot_primitives::v2::{
BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeState,
DisputeStatement, DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption,
SessionIndex, SignedAvailabilityBitfield, ValidatorIndex,
};
-use std::{
- collections::{BTreeMap, HashMap, HashSet},
- pin::Pin,
-};
+use std::collections::{BTreeMap, HashMap, HashSet};
mod error;
mod metrics;
mod onchain_disputes;
pub use self::metrics::*;
-use error::Error;
+use error::{Error, FatalResult};
#[cfg(test)]
mod tests;
@@ -63,198 +58,230 @@ const PRE_PROPOSE_TIMEOUT: std::time::Duration = core::time::Duration::from_mill
const LOG_TARGET: &str = "parachain::provisioner";
-enum InherentAfter {
- Ready,
- Wait(Delay),
+/// The provisioner subsystem.
+pub struct ProvisionerSubsystem {
+ metrics: Metrics,
}
-impl InherentAfter {
- fn new_from_now() -> Self {
- InherentAfter::Wait(Delay::new(PRE_PROPOSE_TIMEOUT))
- }
-
- fn is_ready(&self) -> bool {
- match *self {
- InherentAfter::Ready => true,
- InherentAfter::Wait(_) => false,
- }
- }
-
- async fn ready(&mut self) {
- match *self {
- InherentAfter::Ready => {
- // Make sure we never end the returned future.
- // This is required because the `select!` that calls this future will end in a busy loop.
- futures::pending!()
- },
- InherentAfter::Wait(ref mut d) => {
- d.await;
- *self = InherentAfter::Ready;
- },
- }
+impl ProvisionerSubsystem {
+ /// Create a new instance of the `ProvisionerSubsystem`.
+ pub fn new(metrics: Metrics) -> Self {
+ Self { metrics }
}
}
-/// Provisioner run arguments.
-#[derive(Debug, Clone, Copy)]
-pub struct ProvisionerConfig;
-
-/// A per-relay-parent job for the provisioning subsystem.
-pub struct ProvisionerJob {
+/// A per-relay-parent state for the provisioning subsystem.
+pub struct PerRelayParent {
leaf: ActivatedLeaf,
- receiver: mpsc::Receiver,
backed_candidates: Vec,
signed_bitfields: Vec,
- metrics: Metrics,
- inherent_after: InherentAfter,
+ is_inherent_ready: bool,
awaiting_inherent: Vec>,
- _phantom: std::marker::PhantomData,
+ span: PerLeafSpan,
}
-impl JobTrait for ProvisionerJob
-where
- Sender: overseer::ProvisionerSenderTrait + std::marker::Unpin,
-{
- type ToJob = ProvisionerMessage;
- type OutgoingMessages = overseer::ProvisionerOutgoingMessages;
- type Sender = Sender;
- type Error = Error;
- type RunArgs = ProvisionerConfig;
- type Metrics = Metrics;
+impl PerRelayParent {
+ fn new(leaf: ActivatedLeaf) -> Self {
+ let span = PerLeafSpan::new(leaf.span.clone(), "provisioner");
+
+ Self {
+ leaf,
+ backed_candidates: Vec::new(),
+ signed_bitfields: Vec::new(),
+ is_inherent_ready: false,
+ awaiting_inherent: Vec::new(),
+ span,
+ }
+ }
+}
- const NAME: &'static str = "provisioner-job";
+type InherentDelays = FuturesUnordered>;
- /// Run a job for the parent block indicated
- //
- // this function is in charge of creating and executing the job's main loop
- fn run(
- leaf: ActivatedLeaf,
- _: Self::RunArgs,
- metrics: Self::Metrics,
- receiver: mpsc::Receiver,
- mut sender: JobSender,
- ) -> Pin> + Send>> {
- let span = leaf.span.clone();
- async move {
- let job = ProvisionerJob::new(leaf, metrics, receiver);
-
- job.run_loop(sender.subsystem_sender(), PerLeafSpan::new(span, "provisioner"))
+#[overseer::subsystem(Provisioner, error=SubsystemError, prefix=self::overseer)]
+impl ProvisionerSubsystem {
+ fn start(self, ctx: Context) -> SpawnedSubsystem {
+ let future = async move {
+ run(ctx, self.metrics)
.await
+ .map_err(|e| SubsystemError::with_origin("provisioner", e))
}
- .boxed()
+ .boxed();
+
+ SpawnedSubsystem { name: "provisioner-subsystem", future }
}
}
-impl ProvisionerJob
-where
- Sender: overseer::ProvisionerSenderTrait,
-{
- fn new(
- leaf: ActivatedLeaf,
- metrics: Metrics,
- receiver: mpsc::Receiver,
- ) -> Self {
- Self {
- leaf,
- receiver,
- backed_candidates: Vec::new(),
- signed_bitfields: Vec::new(),
- metrics,
- inherent_after: InherentAfter::new_from_now(),
- awaiting_inherent: Vec::new(),
- _phantom: std::marker::PhantomData::::default(),
+#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
+async fn run(mut ctx: Context, metrics: Metrics) -> FatalResult<()> {
+ let mut inherent_delays = InherentDelays::new();
+ let mut per_relay_parent = HashMap::new();
+
+ loop {
+ let result =
+ run_iteration(&mut ctx, &mut per_relay_parent, &mut inherent_delays, &metrics).await;
+
+ match result {
+ Ok(()) => break,
+ err => crate::error::log_error(err)?,
}
}
- async fn run_loop(mut self, sender: &mut Sender, span: PerLeafSpan) -> Result<(), Error> {
- loop {
- futures::select! {
- msg = self.receiver.next() => match msg {
- Some(ProvisionerMessage::RequestInherentData(_, return_sender)) => {
- let _span = span.child("req-inherent-data");
- let _timer = self.metrics.time_request_inherent_data();
-
- if self.inherent_after.is_ready() {
- self.send_inherent_data(sender, vec![return_sender]).await;
- } else {
- self.awaiting_inherent.push(return_sender);
- }
- }
- Some(ProvisionerMessage::ProvisionableData(_, data)) => {
- let span = span.child("provisionable-data");
- let _timer = self.metrics.time_provisionable_data();
+ Ok(())
+}
- self.note_provisionable_data(&span, data);
- }
- None => break,
- },
- _ = self.inherent_after.ready().fuse() => {
- let _span = span.child("send-inherent-data");
- let return_senders = std::mem::take(&mut self.awaiting_inherent);
+#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
+async fn run_iteration(
+ ctx: &mut Context,
+ per_relay_parent: &mut HashMap,
+ inherent_delays: &mut InherentDelays,
+ metrics: &Metrics,
+) -> Result<(), Error> {
+ loop {
+ futures::select! {
+ from_overseer = ctx.recv().fuse() => {
+ match from_overseer? {
+ FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) =>
+ handle_active_leaves_update(update, per_relay_parent, inherent_delays),
+ FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {},
+ FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
+ FromOrchestra::Communication { msg } => {
+ handle_communication(ctx, per_relay_parent, msg, metrics).await?;
+ },
+ }
+ },
+ hash = inherent_delays.select_next_some() => {
+ if let Some(state) = per_relay_parent.get_mut(&hash) {
+ state.is_inherent_ready = true;
+
+ let return_senders = std::mem::take(&mut state.awaiting_inherent);
if !return_senders.is_empty() {
- self.send_inherent_data(sender, return_senders).await;
+ send_inherent_data_bg(ctx, &state, return_senders, metrics.clone()).await?;
}
}
}
}
+ }
+}
- Ok(())
+fn handle_active_leaves_update(
+ update: ActiveLeavesUpdate,
+ per_relay_parent: &mut HashMap,
+ inherent_delays: &mut InherentDelays,
+) {
+ for deactivated in &update.deactivated {
+ per_relay_parent.remove(deactivated);
+ }
+
+ for leaf in update.activated {
+ let delay_fut = Delay::new(PRE_PROPOSE_TIMEOUT).map(move |_| leaf.hash).boxed();
+ per_relay_parent.insert(leaf.hash, PerRelayParent::new(leaf));
+ inherent_delays.push(delay_fut);
+ }
+}
+
+#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
+async fn handle_communication(
+ ctx: &mut Context,
+ per_relay_parent: &mut HashMap,
+ message: ProvisionerMessage,
+ metrics: &Metrics,
+) -> Result<(), Error> {
+ match message {
+ ProvisionerMessage::RequestInherentData(relay_parent, return_sender) => {
+ if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
+ if state.is_inherent_ready {
+ send_inherent_data_bg(ctx, &state, vec![return_sender], metrics.clone())
+ .await?;
+ } else {
+ state.awaiting_inherent.push(return_sender);
+ }
+ }
+ },
+ ProvisionerMessage::ProvisionableData(relay_parent, data) => {
+ if let Some(state) = per_relay_parent.get_mut(&relay_parent) {
+ let span = state.span.child("provisionable-data");
+ let _timer = metrics.time_provisionable_data();
+
+ note_provisionable_data(state, &span, data);
+ }
+ },
}
- async fn send_inherent_data(
- &mut self,
- sender: &mut Sender,
- return_senders: Vec>,
- ) {
+ Ok(())
+}
+
+#[overseer::contextbounds(Provisioner, prefix = self::overseer)]
+async fn send_inherent_data_bg(
+ ctx: &mut Context,
+ per_relay_parent: &PerRelayParent,
+ return_senders: Vec>,
+ metrics: Metrics,
+) -> Result<(), Error> {
+ let leaf = per_relay_parent.leaf.clone();
+ let signed_bitfields = per_relay_parent.signed_bitfields.clone();
+ let backed_candidates = per_relay_parent.backed_candidates.clone();
+ let span = per_relay_parent.span.child("req-inherent-data");
+
+ let mut sender = ctx.sender().clone();
+
+ let bg = async move {
+ let _span = span;
+ let _timer = metrics.time_request_inherent_data();
+
if let Err(err) = send_inherent_data(
- &self.leaf,
- &self.signed_bitfields,
- &self.backed_candidates,
+ &leaf,
+ &signed_bitfields,
+ &backed_candidates,
return_senders,
- sender,
- &self.metrics,
+ &mut sender,
+ &metrics,
)
.await
{
gum::warn!(target: LOG_TARGET, err = ?err, "failed to assemble or send inherent data");
- self.metrics.on_inherent_data_request(Err(()));
+ metrics.on_inherent_data_request(Err(()));
} else {
- self.metrics.on_inherent_data_request(Ok(()));
+ metrics.on_inherent_data_request(Ok(()));
gum::debug!(
target: LOG_TARGET,
- signed_bitfield_count = self.signed_bitfields.len(),
- backed_candidates_count = self.backed_candidates.len(),
- leaf_hash = ?self.leaf.hash,
+ signed_bitfield_count = signed_bitfields.len(),
+ backed_candidates_count = backed_candidates.len(),
+ leaf_hash = ?leaf.hash,
"inherent data sent successfully"
);
- self.metrics.observe_inherent_data_bitfields_count(self.signed_bitfields.len());
+ metrics.observe_inherent_data_bitfields_count(signed_bitfields.len());
}
- }
+ };
- fn note_provisionable_data(
- &mut self,
- span: &jaeger::Span,
- provisionable_data: ProvisionableData,
- ) {
- match provisionable_data {
- ProvisionableData::Bitfield(_, signed_bitfield) =>
- self.signed_bitfields.push(signed_bitfield),
- ProvisionableData::BackedCandidate(backed_candidate) => {
- let candidate_hash = backed_candidate.hash();
- gum::trace!(
- target: LOG_TARGET,
- ?candidate_hash,
- para = ?backed_candidate.descriptor().para_id,
- "noted backed candidate",
- );
- let _span = span
- .child("provisionable-backed")
- .with_candidate(candidate_hash)
- .with_para_id(backed_candidate.descriptor().para_id);
- self.backed_candidates.push(backed_candidate)
- },
- _ => {},
- }
+ ctx.spawn("send-inherent-data", bg.boxed())
+ .map_err(|_| Error::FailedToSpawnBackgroundTask)?;
+
+ Ok(())
+}
+
+fn note_provisionable_data(
+ per_relay_parent: &mut PerRelayParent,
+ span: &jaeger::Span,
+ provisionable_data: ProvisionableData,
+) {
+ match provisionable_data {
+ ProvisionableData::Bitfield(_, signed_bitfield) =>
+ per_relay_parent.signed_bitfields.push(signed_bitfield),
+ ProvisionableData::BackedCandidate(backed_candidate) => {
+ let candidate_hash = backed_candidate.hash();
+ gum::trace!(
+ target: LOG_TARGET,
+ ?candidate_hash,
+ para = ?backed_candidate.descriptor().para_id,
+ "noted backed candidate",
+ );
+ let _span = span
+ .child("provisionable-backed")
+ .with_candidate(candidate_hash)
+ .with_para_id(backed_candidate.descriptor().para_id);
+ per_relay_parent.backed_candidates.push(backed_candidate)
+ },
+ _ => {},
}
}
@@ -807,6 +834,3 @@ async fn select_disputes(
})
.collect())
}
-
-/// The provisioner subsystem.
-pub type ProvisionerSubsystem = JobSubsystem, Spawner>;
diff --git a/node/core/provisioner/src/tests.rs b/node/core/provisioner/src/tests.rs
index f87fbb8ce16a..a58e22d7efc2 100644
--- a/node/core/provisioner/src/tests.rs
+++ b/node/core/provisioner/src/tests.rs
@@ -197,6 +197,7 @@ mod select_availability_bitfields {
mod common {
use super::super::*;
+ use futures::channel::mpsc;
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_node_subsystem_test_helpers::TestSubsystemSender;
@@ -225,6 +226,7 @@ mod select_candidates {
scheduled_core,
};
use ::test_helpers::{dummy_candidate_descriptor, dummy_hash};
+ use futures::channel::mpsc;
use polkadot_node_subsystem::messages::{
AllMessages, RuntimeApiMessage,
RuntimeApiRequest::{
@@ -497,8 +499,8 @@ mod select_candidates {
}
mod select_disputes {
-
use super::{super::*, common::test_harness};
+ use futures::channel::mpsc;
use polkadot_node_subsystem::{
messages::{AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest},
RuntimeApiError,
diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs
index 06d47d7de0c5..c1f2352dd622 100644
--- a/node/service/src/overseer.rs
+++ b/node/service/src/overseer.rs
@@ -24,18 +24,15 @@ use polkadot_node_core_av_store::Config as AvailabilityConfig;
use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig;
use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
-use polkadot_node_core_provisioner::ProvisionerConfig;
use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver};
-use polkadot_node_subsystem_types::messages::ProvisionerMessage;
#[cfg(any(feature = "malus", test))]
pub use polkadot_overseer::{
dummy::{dummy_overseer_builder, DummySubsystem},
HeadSupportsParachains,
};
use polkadot_overseer::{
- gen::SubsystemContext, metrics::Metrics as OverseerMetrics, BlockInfo,
- InitializedOverseerBuilder, MetricsTrait, Overseer, OverseerConnector, OverseerHandle,
- OverseerSubsystemContext, SpawnGlue,
+ metrics::Metrics as OverseerMetrics, BlockInfo, InitializedOverseerBuilder, MetricsTrait,
+ Overseer, OverseerConnector, OverseerHandle, SpawnGlue,
};
use polkadot_primitives::runtime_api::ParachainHost;
@@ -158,10 +155,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
AvailabilityRecoverySubsystem,
BitfieldSigningSubsystem,
BitfieldDistributionSubsystem,
- ProvisionerSubsystem<
- SpawnGlue,
- as SubsystemContext>::Sender,
- >,
+ ProvisionerSubsystem,
RuntimeApiSubsystem,
AvailabilityStoreSubsystem,
NetworkBridgeSubsystem<
@@ -249,11 +243,7 @@ where
Box::new(network_service.clone()),
Metrics::register(registry)?,
))
- .provisioner(ProvisionerSubsystem::new(
- spawner.clone(),
- ProvisionerConfig,
- Metrics::register(registry)?,
- ))
+ .provisioner(ProvisionerSubsystem::new(Metrics::register(registry)?))
.runtime_api(RuntimeApiSubsystem::new(
runtime_client.clone(),
Metrics::register(registry)?,