diff --git a/src/bitcoin/poller/mod.rs b/src/bitcoin/poller/mod.rs index 1825e8a1c..e760f211b 100644 --- a/src/bitcoin/poller/mod.rs +++ b/src/bitcoin/poller/mod.rs @@ -3,12 +3,17 @@ mod looper; use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors}; use std::{ - sync::{self, atomic}, - thread, time, + sync::{self, mpsc}, + time, }; use miniscript::bitcoin::secp256k1; +#[derive(Debug, Clone)] +pub enum PollerMessage { + Shutdown, +} + /// The Bitcoin poller handler. pub struct Poller { bit: sync::Arc<sync::Mutex<dyn BitcoinInterface>>, @@ -50,29 +55,44 @@ impl Poller { pub fn poll_forever( &self, poll_interval: time::Duration, - shutdown: sync::Arc<atomic::AtomicBool>, + receiver: mpsc::Receiver<PollerMessage>, ) { let mut last_poll = None; let mut synced = false; - while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { - let now = time::Instant::now(); - - if let Some(last_poll) = last_poll { - let time_since_poll = now.duration_since(last_poll); + loop { + // How long to wait before the next poll. + let time_before_poll = if let Some(last_poll) = last_poll { + let time_since_poll = time::Instant::now().duration_since(last_poll); + // Until we are synced we poll less often to avoid harassing bitcoind and impeding + // the sync. As a function since it's mocked for the tests. let poll_interval = if synced { poll_interval } else { - // Until we are synced we poll less often to avoid harassing bitcoind and impeding - // the sync. As a function since it's mocked for the tests. looper::sync_poll_interval() }; - if time_since_poll < poll_interval { - thread::sleep(time::Duration::from_millis(500)); - continue; + poll_interval.saturating_sub(time_since_poll) + } else { + // Don't wait before doing the first poll. + time::Duration::ZERO + }; + + // Wait for the duration of the interval between polls, but listen to messages in the + // meantime. + match receiver.recv_timeout(time_before_poll) { + Ok(PollerMessage::Shutdown) => { + log::info!("Bitcoin poller was told to shut down."); + return; + } + Err(mpsc::RecvTimeoutError::Timeout) => { + // It's been long enough since the last poll. + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + log::error!("Bitcoin poller communication channel got disconnected. Exiting."); + return; } } - last_poll = Some(now); + last_poll = Some(time::Instant::now()); // Don't poll until the Bitcoin backend is fully synced. if !synced { diff --git a/src/lib.rs b/src/lib.rs index 4b57b1905..d3a5222b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,11 @@ use crate::{ }, }; -use std::{error, fmt, fs, io, path, sync, thread}; +use std::{ + error, fmt, fs, io, path, + sync::{self, mpsc}, + thread, +}; use miniscript::bitcoin::secp256k1; @@ -284,12 +288,12 @@ impl DaemonControl { /// JSONRPC server or one which exposes its API through a `DaemonControl`. pub enum DaemonHandle { Controller { - poller_shutdown: sync::Arc<sync::atomic::AtomicBool>, + poller_sender: mpsc::SyncSender<poller::PollerMessage>, poller_handle: thread::JoinHandle<()>, control: DaemonControl, }, Server { - poller_shutdown: sync::Arc<sync::atomic::AtomicBool>, + poller_sender: mpsc::SyncSender<poller::PollerMessage>, poller_handle: thread::JoinHandle<()>, rpcserver_shutdown: sync::Arc<sync::atomic::AtomicBool>, rpcserver_handle: thread::JoinHandle<Result<(), io::Error>>, @@ -368,15 +372,14 @@ impl DaemonHandle { // an atomic to be able to stop it. let bitcoin_poller = poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone()); - let poller_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); + let (poller_sender, poller_receiver) = mpsc::sync_channel(0); let poller_handle = thread::Builder::new() .name("Bitcoin Network poller".to_string()) .spawn({ let poll_interval = config.bitcoin_config.poll_interval_secs; - let shutdown = poller_shutdown.clone(); move || { log::info!("Bitcoin poller started."); - bitcoin_poller.poll_forever(poll_interval, shutdown); + bitcoin_poller.poll_forever(poll_interval, poller_receiver); log::info!("Bitcoin poller stopped."); } }) @@ -406,14 +409,14 @@ impl DaemonHandle { .expect("Spawning the RPC server thread should never fail."); DaemonHandle::Server { - poller_shutdown, + poller_sender, poller_handle, rpcserver_shutdown, rpcserver_handle, } } else { DaemonHandle::Controller { - poller_shutdown, + poller_sender, poller_handle, control, } @@ -454,21 +457,25 @@ impl DaemonHandle { pub fn stop(self) -> Result<(), Box<dyn error::Error>> { match self { Self::Controller { - poller_shutdown, + poller_sender, poller_handle, .. } => { - poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); + poller_sender + .send(poller::PollerMessage::Shutdown) + .expect("The other end should never have hung up before this."); poller_handle.join().expect("Poller thread must not panic"); Ok(()) } Self::Server { - poller_shutdown, + poller_sender, poller_handle, rpcserver_shutdown, rpcserver_handle, } => { - poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); + poller_sender + .send(poller::PollerMessage::Shutdown) + .expect("The other end should never have hung up before this."); rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed); rpcserver_handle .join() @@ -656,18 +663,6 @@ mod tests { stream.flush().unwrap(); } - // Send them a response to 'getblockchaininfo' saying we are far from being synced - fn complete_sync_check(server: &net::TcpListener) { - let net_resp = [ - "HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"verificationprogress\":0.1,\"headers\":1000,\"blocks\":100}}\n".as_bytes(), - ] - .concat(); - let (mut stream, _) = server.accept().unwrap(); - read_til_json_end(&mut stream); - stream.write_all(&net_resp).unwrap(); - stream.flush().unwrap(); - } - // TODO: we could move the dummy bitcoind thread stuff to the bitcoind module to test the // bitcoind interface, and use the DummyLiana from testutils to sanity check the startup. // Note that startup as checked by this unit test is also tested in the functional test @@ -744,7 +739,8 @@ mod tests { complete_wallet_check(&server, &wo_path); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_tip_init(&server); - complete_sync_check(&server); + // We don't have to complete the sync check as the poller checks whether it needs to stop + // before checking the bitcoind sync status. t.join().unwrap(); // The datadir is created now, so if we restart it it won't create the wo wallet. @@ -761,7 +757,8 @@ mod tests { complete_wallet_loading(&server); complete_wallet_check(&server, &wo_path); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); - complete_sync_check(&server); + // We don't have to complete the sync check as the poller checks whether it needs to stop + // before checking the bitcoind sync status. t.join().unwrap(); fs::remove_dir_all(&tmp_dir).unwrap();