diff --git a/Cargo.lock b/Cargo.lock
index 4de0d3367c..b30a529614 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9255,6 +9255,7 @@ dependencies = [
  "parking_lot 0.12.1",
  "rand 0.8.5",
  "rand_chacha 0.3.1",
+ "rayon",
  "sc-client-api",
  "sc-consensus",
  "sc-consensus-slots",
diff --git a/crates/sc-consensus-subspace/Cargo.toml b/crates/sc-consensus-subspace/Cargo.toml
index 5dd9a97dd9..e6af48e259 100644
--- a/crates/sc-consensus-subspace/Cargo.toml
+++ b/crates/sc-consensus-subspace/Cargo.toml
@@ -25,6 +25,7 @@ parking_lot = "0.12.1"
 prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https://github.com/subspace/polkadot-sdk", rev = "20be5f33a3d2b3f4b31a894f9829184b29fba3ef", version = "0.10.0-dev" }
 rand = "0.8.5"
 rand_chacha = "0.3.1"
+rayon = "1.7.0"
 schnorrkel = "0.9.1"
 sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "20be5f33a3d2b3f4b31a894f9829184b29fba3ef" }
 sc-consensus = { version = "0.10.0-dev", git = "https://github.com/subspace/polkadot-sdk", rev = "20be5f33a3d2b3f4b31a894f9829184b29fba3ef" }
diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs
index 4e74a6ac37..815c0c8472 100644
--- a/crates/sc-consensus-subspace/src/archiver.rs
+++ b/crates/sc-consensus-subspace/src/archiver.rs
@@ -25,10 +25,12 @@ use crate::{
 };
 use codec::{Decode, Encode};
 use futures::StreamExt;
-use log::{debug, error, info, warn};
+use log::{debug, info, warn};
 use parking_lot::Mutex;
 use rand::prelude::*;
 use rand_chacha::ChaCha8Rng;
+use rayon::prelude::*;
+use rayon::ThreadPoolBuilder;
 use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun};
 use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO};
 use sc_utils::mpsc::tracing_unbounded;
@@ -48,6 +50,9 @@ use subspace_core_primitives::crypto::kzg::Kzg;
 use subspace_core_primitives::objects::BlockObjectMapping;
 use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex};
 
+/// This corresponds to default value of `--max-runtime-instances` in Substrate
+const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 8;
+
 #[derive(Debug)]
 struct SegmentHeadersStoreInner<AS> {
     aux_store: Arc<AS>,
@@ -78,7 +83,7 @@ where
     const INITIAL_CACHE_CAPACITY: usize = 1_000;
 
     /// Create new instance
-    pub fn new(aux_store: Arc<AS>) -> Result<Self, sp_blockchain::Error> {
+    pub fn new(aux_store: Arc<AS>) -> sp_blockchain::Result<Self> {
         let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
         let mut next_key_index = 0;
 
@@ -121,7 +126,7 @@ pub fn add_segment_headers(
         &self,
         segment_headers: &[SegmentHeader],
-    ) -> Result<(), sp_blockchain::Error> {
+    ) -> sp_blockchain::Result<()> {
         let mut maybe_last_segment_index = self.max_segment_index();
         let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
         for segment_header in segment_headers {
@@ -201,139 +206,63 @@ where
 /// https://github.com/paritytech/substrate/discussions/14359
 pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: usize = 5;
 
-fn find_last_archived_block<Block, Client>(
+fn find_last_archived_block<Block, Client, AS>(
     client: &Client,
-    best_block_hash: Block::Hash,
-) -> Option<(SegmentHeader, Block, BlockObjectMapping)>
+    segment_headers_store: &SegmentHeadersStore<AS>,
+) -> sp_blockchain::Result<Option<(SegmentHeader, Block, BlockObjectMapping)>>
 where
     Block: BlockT,
     Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
     Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
+    AS: AuxStore,
 {
-    let mut block_to_check = best_block_hash;
-    let last_segment_header = 'outer: loop {
-        let block = client
-            .block(block_to_check)
-            .expect("Older blocks should always exist")
-            .expect("Older blocks should always exist");
-
-        for extrinsic in block.block.extrinsics() {
-            match client
-                .runtime_api()
-                .extract_segment_headers(block_to_check, extrinsic)
-            {
-                Ok(Some(segment_headers)) => {
-                    break 'outer segment_headers.into_iter().last()?;
-                }
-                Ok(None) => {
-                    // Some other extrinsic, ignore
-                }
-                Err(error) => {
-                    // TODO: Probably light client, can this even happen?
-                    panic!(
-                        "Failed to make runtime API call during last archived block search: \
-                        {error:?}"
-                    );
-                }
-            }
-        }
-
-        let parent_block_hash = *block.block.header().parent_hash();
-
-        if parent_block_hash == Block::Hash::default() {
-            // Genesis block, nothing else to check
-            return None;
-        }
-
-        block_to_check = parent_block_hash;
-    };
-
-    let last_archived_block_number = last_segment_header.last_archived_block().number;
-
-    let last_archived_block = loop {
-        let block = client
-            .block(block_to_check)
-            .expect("Older blocks must always exist")
-            .expect("Older blocks must always exist")
-            .block;
-
-        if *block.header().number() == last_archived_block_number.into() {
-            break block;
-        }
-
-        block_to_check = *block.header().parent_hash();
+    let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
+        return Ok(None);
     };
 
-    let last_archived_block_hash = block_to_check;
-
-    let block_object_mappings = client
-        .runtime_api()
-        .validated_object_call_hashes(last_archived_block_hash)
-        .and_then(|calls| {
-            client.runtime_api().extract_block_object_mapping(
-                *last_archived_block.header().parent_hash(),
-                last_archived_block.clone(),
-                calls,
-            )
-        })
-        .unwrap_or_default();
+    if max_segment_index == SegmentIndex::ZERO {
+        // Just genesis, nothing else to check
+        return Ok(None);
+    }
 
-    Some((
-        last_segment_header,
-        last_archived_block,
-        block_object_mappings,
-    ))
-}
+    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
+        .rev()
+        .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
+    {
+        let last_archived_block_number = segment_header.last_archived_block().number;
+        let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
+            // This block number is not in our chain yet (segment headers store may know about more
+            // blocks in existence than is currently imported)
+            continue;
+        };
 
-struct BlockHashesToArchive<Block>
-where
-    Block: BlockT,
-{
-    block_hashes: Vec<Block::Hash>,
-    best_archived: Option<(Block::Hash, NumberFor<Block>)>,
-}
+        let last_segment_header = segment_header;
 
-fn block_hashes_to_archive<Block, Client>(
-    client: &Client,
-    best_block_hash: Block::Hash,
-    blocks_to_archive_from: NumberFor<Block>,
-    blocks_to_archive_to: NumberFor<Block>,
-) -> BlockHashesToArchive<Block>
-where
-    Block: BlockT,
-    Client: HeaderBackend<Block>,
-{
-    let block_range = blocks_to_archive_from..=blocks_to_archive_to;
-    let mut block_hashes = Vec::new();
-    let mut block_hash_to_check = best_block_hash;
-    let mut best_archived = None;
-
-    loop {
-        // TODO: `Error` here must be handled instead
-        let header = client
-            .header(block_hash_to_check)
-            .expect("Parent block must exist; qed")
-            .expect("Parent block must exist; qed");
-
-        if block_range.contains(header.number()) {
-            block_hashes.push(block_hash_to_check);
-
-            if best_archived.is_none() {
-                best_archived.replace((block_hash_to_check, *header.number()));
-            }
-        }
+        let last_archived_block = client
+            .block(last_archived_block_hash)?
+            .expect("Last archived block must always be retrievable; qed")
+            .block;
 
-        if *header.number() == blocks_to_archive_from {
-            break;
-        }
+        let block_object_mappings = client
+            .runtime_api()
+            .validated_object_call_hashes(last_archived_block_hash)
+            .and_then(|calls| {
+                client.runtime_api().extract_block_object_mapping(
+                    *last_archived_block.header().parent_hash(),
+                    last_archived_block.clone(),
+                    calls,
+                )
+            })
+            .unwrap_or_default();
 
-        block_hash_to_check = *header.parent_hash();
+        return Ok(Some((
+            last_segment_header,
+            last_archived_block,
+            block_object_mappings,
+        )));
     }
 
-    BlockHashesToArchive {
-        block_hashes,
-        best_archived,
-    }
+    Ok(None)
 }
 
 /// Derive genesis segment on demand, returns `Ok(None)` in case genesis block was already pruned
@@ -414,12 +343,11 @@ where
 }
 
 fn initialize_archiver<Block, Client, AS>(
-    best_block_hash: Block::Hash,
     best_block_number: NumberFor<Block>,
     segment_headers_store: &SegmentHeadersStore<AS>,
     subspace_link: &SubspaceLink<Block>,
     client: &Client,
-) -> InitializedArchiver<Block>
+) -> sp_blockchain::Result<InitializedArchiver<Block>>
 where
     Block: BlockT,
     Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block> + AuxStore,
@@ -430,7 +358,7 @@ where
         .expect("Must always be able to get chain constants")
         .confirmation_depth_k();
 
-    let maybe_last_archived_block = find_last_archived_block(client, best_block_hash);
+    let maybe_last_archived_block = find_last_archived_block(client, segment_headers_store)?;
     let have_last_segment_header = maybe_last_archived_block.is_some();
     let mut best_archived_block = None;
 
@@ -505,40 +433,60 @@ where
             blocks_to_archive_to,
         );
 
-        let block_hashes_to_archive = block_hashes_to_archive(
-            client,
-            best_block_hash,
-            blocks_to_archive_from.into(),
-            blocks_to_archive_to.into(),
-        );
-        best_archived_block = block_hashes_to_archive.best_archived;
-        let block_hashes_to_archive = block_hashes_to_archive.block_hashes;
-
-        for block_hash_to_archive in block_hashes_to_archive.into_iter().rev() {
-            let block = client
-                .block(block_hash_to_archive)
-                .expect("Older block by number must always exist")
-                .expect("Older block by number must always exist")
-                .block;
-            let block_number_to_archive = *block.header().number();
+        let thread_pool = ThreadPoolBuilder::new()
+            .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY)
+            .build()
+            .map_err(|error| {
+                sp_blockchain::Error::Backend(format!(
+                    "Failed to create thread pool for archiver initialization: {error}"
+                ))
+            })?;
+        // We need to limit number of threads to avoid running out of WASM instances
+        let blocks_to_archive = thread_pool.install(|| {
+            (blocks_to_archive_from..=blocks_to_archive_to)
+                .into_par_iter()
+                .map_init(
+                    || client.runtime_api(),
+                    |runtime_api, block_number| {
+                        let block_hash = client
+                            .hash(block_number.into())?
+                            .expect("All blocks since last archived must be present; qed");
+
+                        let block = client
+                            .block(block_hash)?
+                            .expect("All blocks since last archived must be present; qed")
+                            .block;
+
+                        let block_object_mappings = runtime_api
+                            .validated_object_call_hashes(block_hash)
+                            .and_then(|calls| {
+                                client.runtime_api().extract_block_object_mapping(
+                                    *block.header().parent_hash(),
+                                    block.clone(),
+                                    calls,
+                                )
+                            })
+                            .unwrap_or_default();
+
+                        Ok((block, block_object_mappings))
+                    },
+                )
+                .collect::<sp_blockchain::Result<Vec<_>>>()
+        })?;
 
-            let block_object_mappings = client
-                .runtime_api()
-                .validated_object_call_hashes(block_hash_to_archive)
-                .and_then(|calls| {
-                    client.runtime_api().extract_block_object_mapping(
-                        *block.header().parent_hash(),
-                        block.clone(),
-                        calls,
-                    )
-                })
-                .unwrap_or_default();
+        best_archived_block = blocks_to_archive
+            .last()
+            .map(|(block, _block_object_mappings)| (block.hash(), *block.header().number()));
+
+        for (block, block_object_mappings) in blocks_to_archive {
+            let block_number_to_archive = *block.header().number();
 
             let encoded_block = if block_number_to_archive.is_zero() {
                 encode_genesis_block(&block)
             } else {
                 block.encode()
             };
+
             debug!(
                 target: "subspace",
                 "Encoded block {} has size of {:.2} kiB",
@@ -556,11 +504,7 @@ where
             older_archived_segments.extend(archived_segments);
 
            if !new_segment_headers.is_empty() {
-                if let Err(error) =
-                    segment_headers_store.add_segment_headers(&new_segment_headers)
-                {
-                    panic!("Failed to store segment headers: {error}");
-                }
+                segment_headers_store.add_segment_headers(&new_segment_headers)?;
                // Set list of expected segment headers for the block where we expect segment
                // header extrinsic to be included
                subspace_link.segment_headers.lock().put(
@@ -578,13 +522,13 @@ where
         }
     }
 
-    InitializedArchiver {
+    Ok(InitializedArchiver {
         confirmation_depth_k,
         archiver,
         older_archived_segments,
         best_archived_block: best_archived_block
            .expect("Must always set if there is no logical error; qed"),
-    }
+    })
 }
 
 fn finalize_block<Block, Backend, Client>(
@@ -603,7 +547,7 @@ fn finalize_block<Block, Backend, Client>(
     }
    // We don't have anything useful to do with this result yet, the only source of errors was
    // logged already inside
-    let _result: Result<_, sp_blockchain::Error> = client.lock_import_and_run(|import_op| {
+    let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| {
        // Ideally some handle to a synchronization oracle would be used to avoid unconditionally
        // notifying.
        client
@@ -637,7 +581,7 @@ pub fn create_subspace_archiver<Block, Backend, Client, AS, SO>(
     client: Arc<Client>,
     sync_oracle: SubspaceSyncOracle<SO>,
     telemetry: Option<TelemetryHandle>,
-) -> impl Future<Output = ()> + Send + 'static
+) -> sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>> + Send + 'static>
 where
     Block: BlockT,
     Backend: BackendT<Block>,
@@ -655,7 +599,6 @@ where
     SO: SyncOracle + Send + Sync + 'static,
 {
     let client_info = client.info();
-    let best_block_hash = client_info.best_hash;
     let best_block_number = client_info.best_number;
 
     let InitializedArchiver {
@@ -664,12 +607,11 @@ where
         older_archived_segments,
         best_archived_block: (mut best_archived_block_hash, mut best_archived_block_number),
     } = initialize_archiver(
-        best_block_hash,
         best_block_number,
         &segment_headers_store,
         subspace_link,
         client.as_ref(),
-    );
+    )?;
 
     let mut block_importing_notification_stream = subspace_link
         .block_importing_notification_stream
@@ -678,7 +620,7 @@ where
         subspace_link.archived_segment_notification_sender.clone();
     let segment_headers = Arc::clone(&subspace_link.segment_headers);
 
-    async move {
+    Ok(async move {
        // Farmers may have not received all previous segments, send them now.
         for archived_segment in older_archived_segments {
             send_archived_segment_notification(
@@ -714,11 +656,9 @@ where
             let block = client
                 .block(
                     client
-                        .hash(block_number_to_archive)
-                        .expect("Older block by number must always exist")
+                        .hash(block_number_to_archive)?
                         .expect("Older block by number must always exist"),
-                )
-                .expect("Older block by number must always exist")
+                )?
                 .expect("Older block by number must always exist")
                 .block;
 
@@ -733,19 +673,19 @@ where
             );
 
             if parent_block_hash != best_archived_block_hash {
-                error!(
-                    target: "subspace",
+                let error = format!(
                     "Attempt to switch to a different fork beyond archiving depth, \
                     can't do it: parent block hash {}, best archived block hash {}",
-                    parent_block_hash,
-                    best_archived_block_hash
+                    parent_block_hash, best_archived_block_hash
                 );
-                return;
+                return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
+                    error.into(),
+                )));
             }
 
             best_archived_block_hash = block_hash_to_archive;
 
-            let block_object_mappings = match client
+            let block_object_mappings = client
                 .runtime_api()
                 .validated_object_call_hashes(block_hash_to_archive)
                 .and_then(|calls| {
@@ -754,16 +694,12 @@ where
                         block.clone(),
                         calls,
                     )
-                }) {
-                    Ok(block_object_mappings) => block_object_mappings,
-                    Err(error) => {
-                        error!(
-                            target: "subspace",
-                            "Failed to retrieve block object mappings: {error}"
-                        );
-                        return;
-                    }
-                };
+                })
+                .map_err(|error| {
+                    sp_blockchain::Error::Application(
+                        format!("Failed to retrieve block object mappings: {error}").into(),
+                    )
+                })?;
 
             let encoded_block = block.encode();
             debug!(
@@ -781,15 +717,7 @@ where
             ) {
                 let segment_header = archived_segment.segment_header;
 
-                if let Err(error) =
-                    segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))
-                {
-                    error!(
-                        target: "subspace",
-                        "Failed to store segment headers: {error}"
-                    );
-                    return;
-                }
+                segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;
 
                 send_archived_segment_notification(
                     &archived_segment_notification_sender,
@@ -815,8 +743,7 @@ where
 
                 if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
                     let block_hash_to_finalize = client
-                        .hash(block_number_to_finalize.into())
-                        .expect("Block about to be finalized must always exist")
+                        .hash(block_number_to_finalize.into())?
                        .expect("Block about to be finalized must always exist");
                     finalize_block(
                         client.as_ref(),
@@ -827,7 +754,9 @@ where
                 }
             }
         }
-    }
+
+        Ok(())
+    })
 }
 
 async fn send_archived_segment_notification(
diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs
index 936b2a38cb..65535c61ba 100644
--- a/crates/subspace-service/src/lib.rs
+++ b/crates/subspace-service/src/lib.rs
@@ -865,11 +865,20 @@ where
         client.clone(),
         sync_oracle.clone(),
         telemetry.as_ref().map(|telemetry| telemetry.handle()),
-    );
+    )
+    .map_err(ServiceError::Client)?;
 
     task_manager
         .spawn_essential_handle()
-        .spawn_essential_blocking("subspace-archiver", None, Box::pin(subspace_archiver));
+        .spawn_essential_blocking(
+            "subspace-archiver",
+            None,
+            Box::pin(async move {
+                if let Err(error) = subspace_archiver.await {
+                    error!(%error, "Archiver exited with error");
+                }
+            }),
+        );
 
     if config.enable_subspace_block_relay {
         network_wrapper.set(network_service.clone());
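
The core of the `initialize_archiver` change above is a dedicated rayon pool capped at `BLOCKS_TO_ARCHIVE_CONCURRENCY` threads, so that at most eight runtime API instances are in use concurrently, matching Substrate's default `--max-runtime-instances` limit. The sketch below is a minimal, self-contained illustration of that pattern, not Subspace code: `Resource` and `process` are hypothetical stand-ins for the runtime API instance and the per-block work, and only the rayon calls (`ThreadPoolBuilder`, `install`, `map_init`, fallible `collect`) mirror what the diff does.

```rust
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;

// Hypothetical stand-in for an expensive per-worker resource, analogous to
// the runtime API instance created by `client.runtime_api()` in the diff
struct Resource;

impl Resource {
    fn process(&mut self, block_number: u32) -> Result<String, String> {
        // A fallible per-item operation, like fetching and decoding a block
        if block_number == u32::MAX {
            return Err(format!("block {block_number} missing"));
        }
        Ok(format!("processed block {block_number}"))
    }
}

fn main() -> Result<(), String> {
    // Cap the pool at 8 threads, mirroring BLOCKS_TO_ARCHIVE_CONCURRENCY
    let thread_pool = ThreadPoolBuilder::new()
        .num_threads(8)
        .build()
        .map_err(|error| format!("failed to create thread pool: {error}"))?;

    // `install` runs the parallel iterator on the capped pool instead of the
    // global one; `map_init` hands each worker its own `Resource` instead of
    // creating one per item
    let results = thread_pool.install(|| {
        (0u32..=1_000)
            .into_par_iter()
            .map_init(
                || Resource,
                |resource, block_number| resource.process(block_number),
            )
            .collect::<Result<Vec<_>, String>>()
    })?;

    assert_eq!(results.len(), 1_001);
    Ok(())
}
```

Collecting into `Result<Vec<_>, _>` makes the first failure abort the whole batch, which is how the diff propagates `sp_blockchain::Error` out of the parallel section instead of panicking inside worker threads.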
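The other recurring pattern in this diff is the error-propagation shape: `create_subspace_archiver` now returns `sp_blockchain::Result<impl Future<Output = sp_blockchain::Result<()>>>`, so setup failures surface at the call site while runtime failures are logged when the essential task exits (see the subspace-service hunk). A rough standalone sketch of that shape, with a plain thread and `futures::executor::block_on` as toy stand-ins for Substrate's task manager:

```rust
use std::future::Future;
use std::thread;

type Result<T> = std::result::Result<T, String>;

// Setup can fail (e.g. thread pool creation) before the future even starts
fn create_archiver() -> Result<impl Future<Output = Result<()>> + Send + 'static> {
    Ok(async move {
        // Runtime failures now propagate out of the task instead of panicking
        Err("attempt to switch to a different fork beyond archiving depth".to_string())
    })
}

fn main() -> Result<()> {
    // Setup error handled where the archiver is created, like `.map_err(ServiceError::Client)?`
    let archiver = create_archiver()?;
    // The spawned wrapper logs the terminal error instead of exiting silently
    let handle = thread::spawn(move || {
        futures::executor::block_on(async move {
            if let Err(error) = archiver.await {
                eprintln!("Archiver exited with error: {error}");
            }
        })
    });
    handle.join().map_err(|_| "archiver thread panicked".to_string())
}
```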