Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
explicit stop of AsyncImportThread (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
svyatonik authored and gavofyork committed Jul 18, 2018
1 parent c55765e commit 113cbda
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 8 deletions.
21 changes: 15 additions & 6 deletions substrate/network/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use sync::ChainSync;
pub trait ImportQueue<B: BlockT>: Send + Sync {
/// Clear the queue when sync is restarting.
fn clear(&self);
/// Clears the import queue and stops importing.
fn stop(&self);
/// Get queue status.
fn status(&self) -> ImportQueueStatus<B>;
/// Is block with given hash is currently in the queue.
Expand Down Expand Up @@ -109,6 +111,16 @@ impl<B: BlockT> ImportQueue<B> for AsyncImportQueue<B> {
*best_importing_number = Zero::zero();
}

fn stop(&self) {
self.clear();
if let Some(handle) = self.handle.lock().take() {
self.data.is_stopping.store(true, Ordering::SeqCst);
self.data.signal.notify_one();

let _ = handle.join();
}
}

fn status(&self) -> ImportQueueStatus<B> {
ImportQueueStatus {
importing_count: self.data.queue_blocks.read().len(),
Expand Down Expand Up @@ -138,12 +150,7 @@ impl<B: BlockT> ImportQueue<B> for AsyncImportQueue<B> {

impl<B: BlockT> Drop for AsyncImportQueue<B> {
fn drop(&mut self) {
if let Some(handle) = self.handle.lock().take() {
self.data.is_stopping.store(true, Ordering::SeqCst);
self.data.signal.notify_one();

let _ = handle.join();
}
self.stop();
}
}

Expand Down Expand Up @@ -433,6 +440,8 @@ pub mod tests {
impl<B: 'static + BlockT> ImportQueue<B> for SyncImportQueue {
fn clear(&self) { }

fn stop(&self) { }

fn status(&self) -> ImportQueueStatus<B> {
ImportQueueStatus {
importing_count: 0,
Expand Down
9 changes: 9 additions & 0 deletions substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,15 @@ impl<B: BlockT, S: Specialization<B>> Protocol<B, S> {
handshaking_peers.clear();
}

pub fn stop(&self) {
// stop processing import requests first (without holding a sync lock)
let import_queue = self.sync.read().import_queue();
import_queue.stop();

// and then clear all the sync data
self.abort();
}

pub fn on_block_announce(&self, io: &mut SyncIo, peer_id: PeerId, announce: message::BlockAnnounce<B::Header>) {
let header = announce.header;
let hash = header.hash();
Expand Down
2 changes: 1 addition & 1 deletion substrate/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ impl<B: BlockT + 'static, S: Specialization<B>> Service<B, S> {
}

fn stop(&self) {
self.handler.protocol.abort();
self.handler.protocol.stop();
self.network.stop();
}
}
Expand Down
7 changes: 6 additions & 1 deletion substrate/network/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ impl<B: BlockT> ChainSync<B> {
self.peers.values().max_by_key(|p| p.best_number).map(|p| p.best_number)
}

/// Returns sync status
/// Returns import queue reference.
pub(crate) fn import_queue(&self) -> Arc<ImportQueue<B>> {
self.import_queue.clone()
}

/// Returns sync status.
pub(crate) fn status(&self) -> Status<B> {
let best_seen = self.best_seen_block();
let state = match &best_seen {
Expand Down

0 comments on commit 113cbda

Please sign in to comment.