diff --git a/substrate/network/src/import_queue.rs b/substrate/network/src/import_queue.rs index 6cc20d2fdee2d..308a68fffd3b0 100644 --- a/substrate/network/src/import_queue.rs +++ b/substrate/network/src/import_queue.rs @@ -37,6 +37,8 @@ use sync::ChainSync; pub trait ImportQueue: Send + Sync { /// Clear the queue when sync is restarting. fn clear(&self); + /// Clears the import queue and stops importing. + fn stop(&self); /// Get queue status. fn status(&self) -> ImportQueueStatus; /// Is block with given hash is currently in the queue. @@ -109,6 +111,16 @@ impl ImportQueue for AsyncImportQueue { *best_importing_number = Zero::zero(); } + fn stop(&self) { + self.clear(); + if let Some(handle) = self.handle.lock().take() { + self.data.is_stopping.store(true, Ordering::SeqCst); + self.data.signal.notify_one(); + + let _ = handle.join(); + } + } + fn status(&self) -> ImportQueueStatus { ImportQueueStatus { importing_count: self.data.queue_blocks.read().len(), @@ -138,12 +150,7 @@ impl ImportQueue for AsyncImportQueue { impl Drop for AsyncImportQueue { fn drop(&mut self) { - if let Some(handle) = self.handle.lock().take() { - self.data.is_stopping.store(true, Ordering::SeqCst); - self.data.signal.notify_one(); - - let _ = handle.join(); - } + self.stop(); } } @@ -433,6 +440,8 @@ pub mod tests { impl ImportQueue for SyncImportQueue { fn clear(&self) { } + fn stop(&self) { } + fn status(&self) -> ImportQueueStatus { ImportQueueStatus { importing_count: 0, diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index 597834090e0c3..a9e09a36d1bc8 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -536,6 +536,15 @@ impl> Protocol { handshaking_peers.clear(); } + pub fn stop(&self) { + // stop processing import requests first (without holding a sync lock) + let import_queue = self.sync.read().import_queue(); + import_queue.stop(); + + // and then clear all the sync data + self.abort(); + } + pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce) { let header = announce.header; let hash = header.hash(); diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index 788c80ea5e6a0..aa6c8aa19b19a 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -213,7 +213,7 @@ impl> Service { } fn stop(&self) { - self.handler.protocol.abort(); + self.handler.protocol.stop(); self.network.stop(); } } diff --git a/substrate/network/src/sync.rs b/substrate/network/src/sync.rs index c3cb800889a12..900cf864e564a 100644 --- a/substrate/network/src/sync.rs +++ b/substrate/network/src/sync.rs @@ -100,7 +100,12 @@ impl ChainSync { self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number) } - /// Returns sync status + /// Returns import queue reference. + pub(crate) fn import_queue(&self) -> Arc> { + self.import_queue.clone() + } + + /// Returns sync status. pub(crate) fn status(&self) -> Status { let best_seen = self.best_seen_block(); let state = match &best_seen {