From 6d6428f81d3bc325af9e26bc9c32f8f7c70f729b Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Mon, 13 Feb 2023 16:41:39 +0800 Subject: [PATCH 1/6] Support the subscription of every import block Close #13315 --- client/api/src/backend.rs | 15 +++ client/api/src/client.rs | 3 + .../merkle-mountain-range/src/test_utils.rs | 4 + client/service/src/client/client.rs | 109 +++++++++++++++--- 4 files changed, 114 insertions(+), 17 deletions(-) diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index 4ef609bdd4525..01760e0ff2c7e 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -49,6 +49,19 @@ pub type TransactionForSB = >>::Trans /// Extracts the transaction for the given backend. pub type TransactionFor = TransactionForSB, Block>; +/// Describes which block import notification stream should be notified. +#[derive(Debug, Clone)] +pub enum ImportNotificationAction { + /// Notify only when the chain has synced to the tip or there is a re-org. + RecentBlock, + /// Notify for every single block no matter what the sync state is. + EveryBlock, + /// Both block import notifications above should be fired. + Both, + /// No block import notification should be fired. + None, +} + /// Import operation summary. /// /// Contains information about the block that just got imported, @@ -68,6 +81,8 @@ pub struct ImportSummary { /// /// If `None`, there was no re-org while importing. pub tree_route: Option>, + /// What notify action to take for this import. + pub import_notification_action: ImportNotificationAction, } /// Finalization operation summary. diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 8e7ceb68704b2..491841616ad75 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -63,6 +63,9 @@ pub trait BlockchainEvents { /// imported block. fn import_notification_stream(&self) -> ImportNotifications; + /// Get a stream of every imported block. + fn every_import_notification_stream(&self) -> ImportNotifications; + /// Get a stream of finality notifications. Not guaranteed to be fired for every /// finalized block. fn finality_notification_stream(&self) -> FinalityNotifications; diff --git a/client/merkle-mountain-range/src/test_utils.rs b/client/merkle-mountain-range/src/test_utils.rs index e5a6673483dbb..0d12b37ac6455 100644 --- a/client/merkle-mountain-range/src/test_utils.rs +++ b/client/merkle-mountain-range/src/test_utils.rs @@ -264,6 +264,10 @@ impl BlockchainEvents for MockClient { unimplemented!() } + fn every_import_notification_stream(&self) -> ImportNotifications { + unimplemented!() + } + fn finality_notification_stream(&self) -> FinalityNotifications { self.client.lock().finality_notification_stream() } diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 6a75fad628681..cff41503fc57a 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -31,7 +31,7 @@ use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof}; use sc_client_api::{ backend::{ self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer, - ImportSummary, LockImportRun, NewBlockState, StorageProvider, + ImportNotificationAction, ImportSummary, LockImportRun, NewBlockState, StorageProvider, }, client::{ BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo, @@ -106,6 +106,7 @@ where executor: E, storage_notifications: StorageNotifications, import_notification_sinks: NotificationSinks>, + every_block_import_notification_sinks: NotificationSinks>, finality_notification_sinks: NotificationSinks>, // Collects auxiliary operations to be performed atomically together with // block import operations. @@ -304,19 +305,22 @@ where FinalityNotification::from_summary(summary, self.unpin_worker_sender.clone()) }); - let (import_notification, storage_changes) = match notify_imported { - Some(mut summary) => { - let storage_changes = summary.storage_changes.take(); - ( - Some(BlockImportNotification::from_summary( - summary, - self.unpin_worker_sender.clone(), - )), - storage_changes, - ) - }, - None => (None, None), - }; + let (import_notification, storage_changes, import_notification_action) = + match notify_imported { + Some(mut summary) => { + let import_notification_action = summary.import_notification_action.clone(); + let storage_changes = summary.storage_changes.take(); + ( + Some(BlockImportNotification::from_summary( + summary, + self.unpin_worker_sender.clone(), + )), + storage_changes, + import_notification_action, + ) + }, + None => (None, None, ImportNotificationAction::None), + }; if let Some(ref notification) = finality_notification { for action in self.finality_actions.lock().iter_mut() { @@ -353,7 +357,18 @@ where } self.notify_finalized(finality_notification)?; - self.notify_imported(import_notification, storage_changes)?; + + match import_notification_action { + ImportNotificationAction::Both => { + self.notify_imported(import_notification.clone(), storage_changes.clone())?; + self.notify_imported_for_every_block(import_notification, storage_changes)?; + }, + ImportNotificationAction::RecentBlock => + self.notify_imported(import_notification, storage_changes)?, + ImportNotificationAction::EveryBlock => + self.notify_imported_for_every_block(import_notification, storage_changes)?, + ImportNotificationAction::None => {}, + } Ok(r) }; @@ -451,6 +466,7 @@ where executor, storage_notifications: StorageNotifications::new(prometheus_registry), import_notification_sinks: Default::default(), + every_block_import_notification_sinks: Default::default(), finality_notification_sinks: Default::default(), import_actions: Default::default(), finality_actions: Default::default(), @@ -771,9 +787,14 @@ where operation.op.insert_aux(aux)?; - // We only notify when we are already synced to the tip of the chain + let should_notify_every_block = + !self.every_block_import_notification_sinks.lock().is_empty(); + + // Notify when we are already synced to the tip of the chain // or if this import triggers a re-org - if make_notifications || tree_route.is_some() { + let should_notify_recent_block = make_notifications || tree_route.is_some(); + + if should_notify_every_block || should_notify_recent_block { let header = import_headers.into_post(); if finalized { let mut summary = match operation.notify_finalized.take() { @@ -812,6 +833,16 @@ where operation.notify_finalized = Some(summary); } + let import_notification_action = if should_notify_every_block { + if should_notify_recent_block { + ImportNotificationAction::Both + } else { + ImportNotificationAction::EveryBlock + } + } else { + ImportNotificationAction::RecentBlock + }; + operation.notify_imported = Some(ImportSummary { hash, origin, @@ -819,6 +850,7 @@ where is_new_best, storage_changes, tree_route, + import_notification_action, }) } @@ -1047,6 +1079,43 @@ where Ok(()) } + fn notify_imported_for_every_block( + &self, + notification: Option>, + storage_changes: Option<(StorageCollection, ChildStorageCollection)>, + ) -> sp_blockchain::Result<()> { + let notification = match notification { + Some(notify_import) => notify_import, + None => { + // Cleanup any closed import notification sinks since we won't + // be sending any notifications below which would remove any + // closed sinks. this is necessary since during initial sync we + // won't send any import notifications which could lead to a + // temporary leak of closed/discarded notification sinks (e.g. + // from consensus code). + self.every_block_import_notification_sinks + .lock() + .retain(|sink| !sink.is_closed()); + return Ok(()) + }, + }; + + if let Some(storage_changes) = storage_changes { + // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? + self.storage_notifications.trigger( + ¬ification.hash, + storage_changes.0.into_iter(), + storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())), + ); + } + + self.every_block_import_notification_sinks + .lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + + Ok(()) + } + /// Attempts to revert the chain by `n` blocks guaranteeing that no block is /// reverted past the last finalized block. Returns the number of blocks /// that were successfully reverted. @@ -1980,6 +2049,12 @@ where stream } + fn every_import_notification_stream(&self) -> ImportNotifications { + let (sink, stream) = tracing_unbounded("mpsc_every_import_notification_stream", 100_000); + self.every_block_import_notification_sinks.lock().push(sink); + stream + } + fn finality_notification_stream(&self) -> FinalityNotifications { let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream", 100_000); self.finality_notification_sinks.lock().push(sink); From b198212f91f28ee9d180610a934fecfbadf30bce Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Mon, 13 Feb 2023 17:29:25 +0800 Subject: [PATCH 2/6] Clean up any closed block import notification sinks --- client/service/src/client/client.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index cff41503fc57a..13b22f73d150b 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -367,7 +367,13 @@ where self.notify_imported(import_notification, storage_changes)?, ImportNotificationAction::EveryBlock => self.notify_imported_for_every_block(import_notification, storage_changes)?, - ImportNotificationAction::None => {}, + ImportNotificationAction::None => { + // Cleanup any closed import notification sinks. + self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); + self.every_block_import_notification_sinks + .lock() + .retain(|sink| !sink.is_closed()); + }, } Ok(r) @@ -1087,12 +1093,7 @@ where let notification = match notification { Some(notify_import) => notify_import, None => { - // Cleanup any closed import notification sinks since we won't - // be sending any notifications below which would remove any - // closed sinks. this is necessary since during initial sync we - // won't send any import notifications which could lead to a - // temporary leak of closed/discarded notification sinks (e.g. - // from consensus code). + // Cleanup any closed import notification sinks. self.every_block_import_notification_sinks .lock() .retain(|sink| !sink.is_closed()); From f961ac6815443ea6570b9f1301d2dd9f43f46451 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Mon, 13 Feb 2023 20:44:58 +0800 Subject: [PATCH 3/6] Apply review suggestions --- client/api/src/backend.rs | 2 +- client/api/src/client.rs | 7 ++- client/service/src/client/client.rs | 76 ++++++++++------------------- 3 files changed, 32 insertions(+), 53 deletions(-) diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index 01760e0ff2c7e..aa5250b6cdab4 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -50,7 +50,7 @@ pub type TransactionForSB = >>::Trans pub type TransactionFor = TransactionForSB, Block>; /// Describes which block import notification stream should be notified. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub enum ImportNotificationAction { /// Notify only when the chain has synced to the tip or there is a re-org. RecentBlock, diff --git a/client/api/src/client.rs b/client/api/src/client.rs index 491841616ad75..72319cdd5bc76 100644 --- a/client/api/src/client.rs +++ b/client/api/src/client.rs @@ -59,8 +59,11 @@ pub trait BlockOf { /// A source of blockchain events. pub trait BlockchainEvents { - /// Get block import event stream. Not guaranteed to be fired for every - /// imported block. + /// Get block import event stream. + /// + /// Not guaranteed to be fired for every imported block, only fired when the node + /// has synced to the tip or there is a re-org. Use `every_import_notification_stream()` + /// if you want a notification of every imported block regardless. fn import_notification_stream(&self) -> ImportNotifications; /// Get a stream of every imported block. diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 13b22f73d150b..82b533f45fea1 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -308,7 +308,7 @@ where let (import_notification, storage_changes, import_notification_action) = match notify_imported { Some(mut summary) => { - let import_notification_action = summary.import_notification_action.clone(); + let import_notification_action = summary.import_notification_action; let storage_changes = summary.storage_changes.take(); ( Some(BlockImportNotification::from_summary( @@ -357,24 +357,7 @@ where } self.notify_finalized(finality_notification)?; - - match import_notification_action { - ImportNotificationAction::Both => { - self.notify_imported(import_notification.clone(), storage_changes.clone())?; - self.notify_imported_for_every_block(import_notification, storage_changes)?; - }, - ImportNotificationAction::RecentBlock => - self.notify_imported(import_notification, storage_changes)?, - ImportNotificationAction::EveryBlock => - self.notify_imported_for_every_block(import_notification, storage_changes)?, - ImportNotificationAction::None => { - // Cleanup any closed import notification sinks. - self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); - self.every_block_import_notification_sinks - .lock() - .retain(|sink| !sink.is_closed()); - }, - } + self.notify_imported(import_notification, import_notification_action, storage_changes)?; Ok(r) }; @@ -802,7 +785,7 @@ where if should_notify_every_block || should_notify_recent_block { let header = import_headers.into_post(); - if finalized { + if finalized && should_notify_recent_block { let mut summary = match operation.notify_finalized.take() { Some(mut summary) => { summary.header = header.clone(); @@ -1053,6 +1036,7 @@ where fn notify_imported( &self, notification: Option>, + import_notification_action: ImportNotificationAction, storage_changes: Option<(StorageCollection, ChildStorageCollection)>, ) -> sp_blockchain::Result<()> { let notification = match notification { @@ -1065,6 +1049,11 @@ where // temporary leak of closed/discarded notification sinks (e.g. // from consensus code). self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); + + self.every_block_import_notification_sinks + .lock() + .retain(|sink| !sink.is_closed()); + return Ok(()) }, }; @@ -1078,42 +1067,29 @@ where ); } - self.import_notification_sinks - .lock() - .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); - - Ok(()) - } + match import_notification_action { + ImportNotificationAction::Both => { + self.import_notification_sinks + .lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); - fn notify_imported_for_every_block( - &self, - notification: Option>, - storage_changes: Option<(StorageCollection, ChildStorageCollection)>, - ) -> sp_blockchain::Result<()> { - let notification = match notification { - Some(notify_import) => notify_import, - None => { - // Cleanup any closed import notification sinks. self.every_block_import_notification_sinks .lock() - .retain(|sink| !sink.is_closed()); - return Ok(()) + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); }, - }; - - if let Some(storage_changes) = storage_changes { - // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? - self.storage_notifications.trigger( - ¬ification.hash, - storage_changes.0.into_iter(), - storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())), - ); + ImportNotificationAction::RecentBlock => { + self.import_notification_sinks + .lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + }, + ImportNotificationAction::EveryBlock => { + self.every_block_import_notification_sinks + .lock() + .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + }, + ImportNotificationAction::None => {}, } - self.every_block_import_notification_sinks - .lock() - .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); - Ok(()) } From bf1366a9958b95d30a0d9ab86c40d124bde6c65a Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Mon, 13 Feb 2023 20:57:58 +0800 Subject: [PATCH 4/6] Nit --- client/api/src/backend.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index aa5250b6cdab4..9774e79b79ad3 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -52,7 +52,7 @@ pub type TransactionFor = TransactionForSB, /// Describes which block import notification stream should be notified. #[derive(Debug, Clone, Copy)] pub enum ImportNotificationAction { - /// Notify only when the chain has synced to the tip or there is a re-org. + /// Notify only when the node has synced to the tip or there is a re-org. RecentBlock, /// Notify for every single block no matter what the sync state is. EveryBlock, From 323ee80d6794b80647cfd2ab0b61726aefb22fbc Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Thu, 23 Feb 2023 14:06:22 +0800 Subject: [PATCH 5/6] `every_block_import_notification_sinks` -> `every_import_notification_sinks` --- client/service/src/client/client.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 0127d25771910..3a43557dc7b9b 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -106,7 +106,7 @@ where executor: E, storage_notifications: StorageNotifications, import_notification_sinks: NotificationSinks>, - every_block_import_notification_sinks: NotificationSinks>, + every_import_notification_sinks: NotificationSinks>, finality_notification_sinks: NotificationSinks>, // Collects auxiliary operations to be performed atomically together with // block import operations. @@ -455,7 +455,7 @@ where executor, storage_notifications: StorageNotifications::new(prometheus_registry), import_notification_sinks: Default::default(), - every_block_import_notification_sinks: Default::default(), + every_import_notification_sinks: Default::default(), finality_notification_sinks: Default::default(), import_actions: Default::default(), finality_actions: Default::default(), @@ -774,8 +774,7 @@ where operation.op.insert_aux(aux)?; - let should_notify_every_block = - !self.every_block_import_notification_sinks.lock().is_empty(); + let should_notify_every_block = !self.every_import_notification_sinks.lock().is_empty(); // Notify when we are already synced to the tip of the chain // or if this import triggers a re-org @@ -1047,9 +1046,7 @@ where // from consensus code). self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); - self.every_block_import_notification_sinks - .lock() - .retain(|sink| !sink.is_closed()); + self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed()); return Ok(()) }, @@ -1070,7 +1067,7 @@ where .lock() .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); - self.every_block_import_notification_sinks + self.every_import_notification_sinks .lock() .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); }, @@ -1080,7 +1077,7 @@ where .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); }, ImportNotificationAction::EveryBlock => { - self.every_block_import_notification_sinks + self.every_import_notification_sinks .lock() .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); }, @@ -2022,7 +2019,7 @@ where fn every_import_notification_stream(&self) -> ImportNotifications { let (sink, stream) = tracing_unbounded("mpsc_every_import_notification_stream", 100_000); - self.every_block_import_notification_sinks.lock().push(sink); + self.every_import_notification_sinks.lock().push(sink); stream } From 07776f1634da950f60a0b633fe2f50430a91fde7 Mon Sep 17 00:00:00 2001 From: Liu-Cheng Xu Date: Thu, 23 Feb 2023 21:24:11 +0800 Subject: [PATCH 6/6] Apply review suggestions --- client/service/src/client/client.rs | 33 +++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 27452a5a4ecb5..4b5822ae0e017 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -1052,17 +1052,20 @@ where }, }; - if let Some(storage_changes) = storage_changes { - // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? - self.storage_notifications.trigger( - ¬ification.hash, - storage_changes.0.into_iter(), - storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())), - ); - } + let trigger_storage_changes_notification = || { + if let Some(storage_changes) = storage_changes { + // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? + self.storage_notifications.trigger( + ¬ification.hash, + storage_changes.0.into_iter(), + storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())), + ); + } + }; match import_notification_action { ImportNotificationAction::Both => { + trigger_storage_changes_notification(); self.import_notification_sinks .lock() .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); @@ -1072,16 +1075,28 @@ where .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); }, ImportNotificationAction::RecentBlock => { + trigger_storage_changes_notification(); self.import_notification_sinks .lock() .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + + self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed()); }, ImportNotificationAction::EveryBlock => { self.every_import_notification_sinks .lock() .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); + + self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); + }, + ImportNotificationAction::None => { + // This branch is unreachable in fact because the block import notification must be + // Some(_) instead of None (it's already handled at the beginning of this function) + // at this point. + self.import_notification_sinks.lock().retain(|sink| !sink.is_closed()); + + self.every_import_notification_sinks.lock().retain(|sink| !sink.is_closed()); }, - ImportNotificationAction::None => {}, } Ok(())