From 1cf42d9aeec34b184d93484ea25d9a843a45bc3f Mon Sep 17 00:00:00 2001 From: Antoine Poinsot Date: Fri, 15 Mar 2024 11:15:53 +0100 Subject: [PATCH] poller: introduce a communication channel with the poller thread We'll need to ask the poller thread another thing besides to shut down, so it's cleaner to start using proper messages. The mpsc channel in the std lib was buggy for awhile but since they merged crossbeam and are using this behind the hood now it should be fine starting with Rust 1.67. That's (slightly) higher than our MSRV but it's what we use for releases so that's reasonable. See https://github.com/rust-lang/rust/issues/39364 for details. --- src/bitcoin/poller/mod.rs | 48 +++++++++++++++++++++++++++----------- src/lib.rs | 49 ++++++++++++++++++--------------------- 2 files changed, 57 insertions(+), 40 deletions(-) diff --git a/src/bitcoin/poller/mod.rs b/src/bitcoin/poller/mod.rs index 1825e8a1c..e760f211b 100644 --- a/src/bitcoin/poller/mod.rs +++ b/src/bitcoin/poller/mod.rs @@ -3,12 +3,17 @@ mod looper; use crate::{bitcoin::BitcoinInterface, database::DatabaseInterface, descriptors}; use std::{ - sync::{self, atomic}, - thread, time, + sync::{self, mpsc}, + time, }; use miniscript::bitcoin::secp256k1; +#[derive(Debug, Clone)] +pub enum PollerMessage { + Shutdown, +} + /// The Bitcoin poller handler. pub struct Poller { bit: sync::Arc>, @@ -50,29 +55,44 @@ impl Poller { pub fn poll_forever( &self, poll_interval: time::Duration, - shutdown: sync::Arc, + receiver: mpsc::Receiver, ) { let mut last_poll = None; let mut synced = false; - while !shutdown.load(atomic::Ordering::Relaxed) || last_poll.is_none() { - let now = time::Instant::now(); - - if let Some(last_poll) = last_poll { - let time_since_poll = now.duration_since(last_poll); + loop { + // How long to wait before the next poll. + let time_before_poll = if let Some(last_poll) = last_poll { + let time_since_poll = time::Instant::now().duration_since(last_poll); + // Until we are synced we poll less often to avoid harassing bitcoind and impeding + // the sync. As a function since it's mocked for the tests. let poll_interval = if synced { poll_interval } else { - // Until we are synced we poll less often to avoid harassing bitcoind and impeding - // the sync. As a function since it's mocked for the tests. looper::sync_poll_interval() }; - if time_since_poll < poll_interval { - thread::sleep(time::Duration::from_millis(500)); - continue; + poll_interval.saturating_sub(time_since_poll) + } else { + // Don't wait before doing the first poll. + time::Duration::ZERO + }; + + // Wait for the duration of the interval between polls, but listen to messages in the + // meantime. + match receiver.recv_timeout(time_before_poll) { + Ok(PollerMessage::Shutdown) => { + log::info!("Bitcoin poller was told to shut down."); + return; + } + Err(mpsc::RecvTimeoutError::Timeout) => { + // It's been long enough since the last poll. + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + log::error!("Bitcoin poller communication channel got disconnected. Exiting."); + return; } } - last_poll = Some(now); + last_poll = Some(time::Instant::now()); // Don't poll until the Bitcoin backend is fully synced. if !synced { diff --git a/src/lib.rs b/src/lib.rs index 4b57b1905..d3a5222b1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,11 @@ use crate::{ }, }; -use std::{error, fmt, fs, io, path, sync, thread}; +use std::{ + error, fmt, fs, io, path, + sync::{self, mpsc}, + thread, +}; use miniscript::bitcoin::secp256k1; @@ -284,12 +288,12 @@ impl DaemonControl { /// JSONRPC server or one which exposes its API through a `DaemonControl`. pub enum DaemonHandle { Controller { - poller_shutdown: sync::Arc, + poller_sender: mpsc::SyncSender, poller_handle: thread::JoinHandle<()>, control: DaemonControl, }, Server { - poller_shutdown: sync::Arc, + poller_sender: mpsc::SyncSender, poller_handle: thread::JoinHandle<()>, rpcserver_shutdown: sync::Arc, rpcserver_handle: thread::JoinHandle>, @@ -368,15 +372,14 @@ impl DaemonHandle { // an atomic to be able to stop it. let bitcoin_poller = poller::Poller::new(bit.clone(), db.clone(), config.main_descriptor.clone()); - let poller_shutdown = sync::Arc::from(sync::atomic::AtomicBool::from(false)); + let (poller_sender, poller_receiver) = mpsc::sync_channel(0); let poller_handle = thread::Builder::new() .name("Bitcoin Network poller".to_string()) .spawn({ let poll_interval = config.bitcoin_config.poll_interval_secs; - let shutdown = poller_shutdown.clone(); move || { log::info!("Bitcoin poller started."); - bitcoin_poller.poll_forever(poll_interval, shutdown); + bitcoin_poller.poll_forever(poll_interval, poller_receiver); log::info!("Bitcoin poller stopped."); } }) @@ -406,14 +409,14 @@ impl DaemonHandle { .expect("Spawning the RPC server thread should never fail."); DaemonHandle::Server { - poller_shutdown, + poller_sender, poller_handle, rpcserver_shutdown, rpcserver_handle, } } else { DaemonHandle::Controller { - poller_shutdown, + poller_sender, poller_handle, control, } @@ -454,21 +457,25 @@ impl DaemonHandle { pub fn stop(self) -> Result<(), Box> { match self { Self::Controller { - poller_shutdown, + poller_sender, poller_handle, .. } => { - poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); + poller_sender + .send(poller::PollerMessage::Shutdown) + .expect("The other end should never have hung up before this."); poller_handle.join().expect("Poller thread must not panic"); Ok(()) } Self::Server { - poller_shutdown, + poller_sender, poller_handle, rpcserver_shutdown, rpcserver_handle, } => { - poller_shutdown.store(true, sync::atomic::Ordering::Relaxed); + poller_sender + .send(poller::PollerMessage::Shutdown) + .expect("The other end should never have hung up before this."); rpcserver_shutdown.store(true, sync::atomic::Ordering::Relaxed); rpcserver_handle .join() @@ -656,18 +663,6 @@ mod tests { stream.flush().unwrap(); } - // Send them a response to 'getblockchaininfo' saying we are far from being synced - fn complete_sync_check(server: &net::TcpListener) { - let net_resp = [ - "HTTP/1.1 200\n\r\n{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"verificationprogress\":0.1,\"headers\":1000,\"blocks\":100}}\n".as_bytes(), - ] - .concat(); - let (mut stream, _) = server.accept().unwrap(); - read_til_json_end(&mut stream); - stream.write_all(&net_resp).unwrap(); - stream.flush().unwrap(); - } - // TODO: we could move the dummy bitcoind thread stuff to the bitcoind module to test the // bitcoind interface, and use the DummyLiana from testutils to sanity check the startup. // Note that startup as checked by this unit test is also tested in the functional test @@ -744,7 +739,8 @@ mod tests { complete_wallet_check(&server, &wo_path); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); complete_tip_init(&server); - complete_sync_check(&server); + // We don't have to complete the sync check as the poller checks whether it needs to stop + // before checking the bitcoind sync status. t.join().unwrap(); // The datadir is created now, so if we restart it it won't create the wo wallet. @@ -761,7 +757,8 @@ mod tests { complete_wallet_loading(&server); complete_wallet_check(&server, &wo_path); complete_desc_check(&server, &receive_desc.to_string(), &change_desc.to_string()); - complete_sync_check(&server); + // We don't have to complete the sync check as the poller checks whether it needs to stop + // before checking the bitcoind sync status. t.join().unwrap(); fs::remove_dir_all(&tmp_dir).unwrap();