Skip to content

Commit

Permalink
Address review; minor nits
Browse files Browse the repository at this point in the history
  • Loading branch information
claddyy committed Dec 18, 2024
1 parent e1c0526 commit 8d47bd7
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 32 deletions.
52 changes: 24 additions & 28 deletions src/maker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::{
atomic::{AtomicBool, Ordering::Relaxed},
Arc, Mutex, RwLock,
},
thread,
thread::JoinHandle,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -93,12 +94,7 @@ pub struct ConnectionState {

pub struct ThreadPool {
pub threads: Mutex<Vec<JoinHandle<()>>>,
}

impl Default for ThreadPool {
fn default() -> Self {
Self::new()
}
pub port: u16,
}

impl Drop for ThreadPool {
Expand All @@ -110,9 +106,10 @@ impl Drop for ThreadPool {
}

impl ThreadPool {
pub fn new() -> Self {
pub fn new(port: u16) -> Self {
Self {
threads: Mutex::new(Vec::new()),
port,
}
}

Expand All @@ -127,29 +124,29 @@ impl ThreadPool {
.lock()
.map_err(|_| MakerError::General("Failed to lock threads"))?;

let thread_count = threads.len();
log::info!("Joining {} threads", thread_count);
log::info!("Joining {} threads", threads.len());

let mut joined_count = 0;
while let Some(thread) = threads.pop() {
let thread_name = thread.thread().name().unwrap().to_string();

match thread.join() {
Ok(_) => {
log::info!("Thread {} terminated successfully", thread_name);
log::info!("[{}] Thread {} joined", self.port, thread_name);
joined_count += 1;
}
Err(e) => {
log::error!("Thread {} terminated due to error {:?}", thread_name, e);
log::error!(
"[{}] Error {:?} while joining thread {}",
self.port,
e,
thread_name
);
}
}
}

log::info!(
"Successfully joined {} out of {} threads",
joined_count,
thread_count
);
log::info!("Successfully joined {} threads", joined_count,);
Ok(())
}
}
Expand Down Expand Up @@ -272,6 +269,8 @@ impl Maker {
config.connection_type = connection_type;
}

let thread_pool_port = config.port;

config.write_to_file(&data_dir.join("config.toml"))?;

log::info!("Initializing wallet sync");
Expand All @@ -286,7 +285,7 @@ impl Maker {
connection_state: Mutex::new(HashMap::new()),
highest_fidelity_proof: RwLock::new(None),
is_setup_complete: AtomicBool::new(false),
thread_pool: Arc::new(ThreadPool::new()),
thread_pool: Arc::new(ThreadPool::new(thread_pool_port)),
})
}

Expand Down Expand Up @@ -531,13 +530,12 @@ pub fn check_for_broadcasted_contracts(maker: Arc<Maker>) -> Result<(), MakerErr
"[{}] Spawning recovery thread after seeing contracts in mempool",
maker.config.port
);
let handle = std::thread::Builder::new()
.name("recovery: saw contracts in mempool".to_string())
let handle = thread::Builder::new()
.name("Swap recovery thread".to_string())
.spawn(move || {
if let Err(e) =
recover_from_swap(maker_clone.clone(), outgoings, incomings)
if let Err(e) = recover_from_swap(maker_clone, outgoings, incomings)
{
log::error!("Recovery thread failed with error: {:?}", e);
log::error!("Failed to recover from swap due to: {:?}", e);
}
})?;
maker.thread_pool.add_thread(handle);
Expand Down Expand Up @@ -620,12 +618,10 @@ pub fn check_for_idle_states(maker: Arc<Maker>) -> Result<(), MakerError> {
maker.config.port
);
let handle = std::thread::Builder::new()
.name("recovery: taker dropped".to_string())
.name("Swap Recovery Thread".to_string())
.spawn(move || {
if let Err(e) =
recover_from_swap(maker_clone.clone(), outgoings, incomings)
{
log::error!("Recovery thread failed with error: {:?}", e);
if let Err(e) = recover_from_swap(maker_clone, outgoings, incomings) {
log::error!("Failed to recover from swap due to: {:?}", e);
}
})?;
maker.thread_pool.add_thread(handle);
Expand All @@ -641,7 +637,7 @@ pub fn check_for_idle_states(maker: Arc<Maker>) -> Result<(), MakerError> {
}
} // All locks are cleared here

std::thread::sleep(Duration::from_secs(HEART_BEAT_INTERVAL_SECS));
thread::sleep(Duration::from_secs(HEART_BEAT_INTERVAL_SECS));
}

Ok(())
Expand Down
12 changes: 8 additions & 4 deletions src/maker/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! The file includes functions to validate and sign contract transactions, verify proof of funding, and handle unexpected recovery scenarios.
//! Implements the core functionality for a Maker in a Bitcoin coinswap protocol.
use std::{net::IpAddr, sync::Arc, time::Instant};
use std::{net::IpAddr, sync::Arc, thread, time::Instant};

use bitcoin::{
hashes::Hash,
Expand Down Expand Up @@ -678,9 +678,13 @@ fn unexpected_recovery(maker: Arc<Maker>) -> Result<(), MakerError> {
}
// Spawn a separate thread to wait for contract maturity and broadcasting timelocked.
let maker_clone = maker.clone();
let handle = std::thread::Builder::new()
.name("recovery: wait for contract maturity & broadcasting timelocked".to_string())
.spawn(move || recover_from_swap(maker_clone, outgoings, incomings).unwrap())?;
let handle = thread::Builder::new()
.name("Swap Recovery Thread".to_string())
.spawn(move || {
if let Err(e) = recover_from_swap(maker_clone, outgoings, incomings) {
log::error!("Failed to recover from swap due to: {:?}", e);
}
})?;
maker.thread_pool.add_thread(handle);
}
Ok(())
Expand Down

0 comments on commit 8d47bd7

Please sign in to comment.