Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Safer blocks removal on ParachainBlockImport level overflow #1864

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ tracing = "0.1.37"
# Substrate
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-consensus = { git = "https://github.com/paritytech/substrate", branch = "master" }
sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
Expand Down
5 changes: 4 additions & 1 deletion client/consensus/common/src/level_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ where

let level = self.levels.get(&number)?;

for blk_hash in level.iter().filter(|hash| **hash != best_hash) {
for blk_hash in level
.iter()
.filter(|hash| **hash != best_hash && self.freshness.get(*hash) != Some(&0u32.into()))
{
// Search for the fresher leaf information for this block
let candidate_info = leaves
.iter()
Expand Down
102 changes: 96 additions & 6 deletions client/consensus/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@
// You should have received a copy of the GNU General Public License
// along with Cumulus. If not, see <http://www.gnu.org/licenses/>.

use polkadot_primitives::v2::{Hash as PHash, PersistedValidationData};
use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
use polkadot_primitives::v2::{Hash as RelayHash, Id as ParaId, PersistedValidationData};

use sc_client_api::Backend;
use sc_consensus::{shared_data::SharedData, BlockImport, ImportResult};
use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
use sc_service::SpawnTaskHandle;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Zero};

use std::sync::Arc;

mod level_monitor;
mod parachain_consensus;

#[cfg(test)]
mod tests;

pub use parachain_consensus::run_parachain_consensus;

use level_monitor::LevelMonitor;
pub use level_monitor::{LevelLimit, MAX_LEAVES_PER_LEVEL_SENSIBLE_DEFAULT};
pub use parachain_consensus::run_parachain_consensus;

/// The result of [`ParachainConsensus::produce_candidate`].
pub struct ParachainCandidate<B> {
Expand Down Expand Up @@ -59,7 +61,7 @@ pub trait ParachainConsensus<B: BlockT>: Send + Sync + dyn_clone::DynClone {
async fn produce_candidate(
&mut self,
parent: &B::Header,
relay_parent: PHash,
relay_parent: RelayHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<B>>;
}
Expand All @@ -71,7 +73,7 @@ impl<B: BlockT> ParachainConsensus<B> for Box<dyn ParachainConsensus<B> + Send +
async fn produce_candidate(
&mut self,
parent: &B::Header,
relay_parent: PHash,
relay_parent: RelayHash,
validation_data: &PersistedValidationData,
) -> Option<ParachainCandidate<B>> {
(*self).produce_candidate(parent, relay_parent, validation_data).await
Expand Down Expand Up @@ -112,6 +114,94 @@ impl<Block: BlockT, BI, BE: Backend<Block>> ParachainBlockImport<Block, BI, BE>

Self { inner, monitor }
}

/// EXPERIMENTAL - AND NOT THE DEFINITIVE INTERFACE
///
/// Spawns an observer task preventing the level monitor from removing the blocks that
/// were already available on the relay chain.
pub fn spawn_relay_chain_observer<RC>(
&self,
relay_chain: RC,
para_id: ParaId,
spawn_handle: SpawnTaskHandle,
) where
RC: RelayChainInterface + Clone + 'static,
BE: 'static,
{
let monitor = match self.monitor.as_ref() {
Some(m) => m.clone(),
None => {
eprintln!("YYY Monitor not instanced without level limit");
return
},
};

let relay_clone = relay_chain.clone();
let task = async move {
// TODO: is observer exit a FATAL error?
match run_relay_chain_observer::<Block, BE>(relay_clone, para_id, monitor).await {
Ok(_) => eprintln!("YYY EXITED"),
Err(e) => eprintln!("YYY Relay chain observer is DEAD: {:?}", e.to_string()),
}
};

spawn_handle.spawn("relay chain listener", None, task);
}
}

// EXPERIMENTAL
//
// The following code aims to keep the monitor more resilient and should
// prevent the removal of blocks marked as available by the relay chain.

async fn run_relay_chain_observer<Block: BlockT, BE: Backend<Block>>(
relay_chain: impl RelayChainInterface,
para_id: ParaId,
monitor: SharedData<LevelMonitor<Block, BE>>,
) -> RelayChainResult<()> {
use futures::StreamExt;

eprintln!("YYY START IMPORT LISTENER");
let mut imports = relay_chain.import_notification_stream().await?;
loop {
let pheader = match imports.next().await {
Some(h) => h,
None => {
eprintln!("YYY spurious notificaiton");
continue
},
};
let hash = pheader.hash();
eprintln!("YYY: imported {:?}", hash);

let validation_data = relay_chain
.persisted_validation_data(
hash,
para_id,
polkadot_primitives::v2::OccupiedCoreAssumption::TimedOut,
)
.await?;
let validation_data = match validation_data {
Some(v) => v,
None => {
eprintln!("YYY no state associated");
continue
},
};

use codec::Decode;
let para_head = Block::Header::decode(&mut &validation_data.parent_head.0[..])?;
let para_hash = para_head.hash();

eprintln!("YYY: setting monitor availability for {:?}", para_hash);

if let Some(entry) = monitor.shared_data().freshness.get_mut(&para_hash) {
eprintln!("YYY: Updated");
*entry = Zero::zero();
}

eprintln!("YYY: {:#?}", para_head);
}
}

impl<Block: BlockT, I: Clone, BE> Clone for ParachainBlockImport<Block, I, BE> {
Expand Down
6 changes: 6 additions & 0 deletions polkadot-parachain/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,12 @@ where
s => s.to_string().into(),
})?;

block_import.spawn_relay_chain_observer(
relay_chain_interface.clone(),
para_id,
task_manager.spawn_handle(),
);

let block_announce_validator =
BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id);

Expand Down