diff --git a/src/maker/api.rs b/src/maker/api.rs index d615938c..523bdf4f 100644 --- a/src/maker/api.rs +++ b/src/maker/api.rs @@ -92,7 +92,7 @@ pub struct ConnectionState { } pub struct ThreadPool { - pub threads: Mutex>>>, + pub threads: Mutex>>, } impl Default for ThreadPool { @@ -116,18 +116,18 @@ impl ThreadPool { } } - pub fn add_thread(&self, handle: JoinHandle>) { + pub fn add_thread(&self, handle: JoinHandle<()>) { let mut threads = self.threads.lock().unwrap(); threads.push(handle); } - + #[inline] fn join_all_threads(&self) -> Result<(), MakerError> { let mut threads = self .threads .lock() .map_err(|_| MakerError::General("Failed to lock threads"))?; while let Some(thread) = threads.pop() { - thread.join().unwrap()?; + thread.join().unwrap(); } Ok(()) } @@ -511,7 +511,7 @@ pub fn check_for_broadcasted_contracts(maker: Arc) -> Result<(), MakerErr maker.config.port ); let handle = std::thread::spawn(move || { - recover_from_swap(maker_clone, outgoings, incomings) + recover_from_swap(maker_clone, outgoings, incomings).unwrap(); }); maker.thread_pool.add_thread(handle); // Clear the state value here @@ -593,7 +593,7 @@ pub fn check_for_idle_states(maker: Arc) -> Result<(), MakerError> { maker.config.port ); let handle = std::thread::spawn(move || { - recover_from_swap(maker_clone, outgoings, incomings) + recover_from_swap(maker_clone, outgoings, incomings).unwrap() }); maker.thread_pool.add_thread(handle); // Clear the state values here diff --git a/src/maker/handlers.rs b/src/maker/handlers.rs index 98579ef4..647138d3 100644 --- a/src/maker/handlers.rs +++ b/src/maker/handlers.rs @@ -678,8 +678,9 @@ fn unexpected_recovery(maker: Arc) -> Result<(), MakerError> { } // Spawn a separate thread to wait for contract maturity and broadcasting timelocked. let maker_clone = maker.clone(); - let handle = - std::thread::spawn(move || recover_from_swap(maker_clone, outgoings, incomings)); + let handle = std::thread::spawn(move || { + recover_from_swap(maker_clone, outgoings, incomings).unwrap() + }); maker.thread_pool.add_thread(handle); } Ok(()) diff --git a/src/maker/server.rs b/src/maker/server.rs index f3660995..b247b18e 100644 --- a/src/maker/server.rs +++ b/src/maker/server.rs @@ -443,7 +443,10 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { .name("Bitcoin Core Connection Checker Thread".to_string()) .spawn(move || { log::info!("[{}] Spawning Bitcoin Core connection checker thread", port); - check_connection_with_core(maker_clone, acc_client_clone) + if let Err(e) = check_connection_with_core(maker_clone.clone(), acc_client_clone) { + log::error!("[{}] Bitcoin Core connection check failed: {:?}", port, e); + maker_clone.shutdown.store(true, Relaxed); + } })?; maker.thread_pool.add_thread(conn_check_thread); @@ -458,7 +461,10 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { "[{}] Spawning Client connection status checker thread", port ); - check_for_idle_states(maker_clone.clone()) + if let Err(e) = check_for_idle_states(maker_clone.clone()) { + log::error!("Failed checking client's idle state {:?}", e); + maker_clone.shutdown.store(true, Relaxed); + } })?; maker.thread_pool.add_thread(idle_conn_check_thread); @@ -471,7 +477,10 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { .name("Contract Watcher Thread".to_string()) .spawn(move || { log::info!("[{}] Spawning contract-watcher thread", port); - check_for_broadcasted_contracts(maker_clone.clone()) + if let Err(e) = check_for_broadcasted_contracts(maker_clone.clone()) { + maker_clone.shutdown.store(true, Relaxed); + log::error!("Failed checking broadcasted contracts {:?}", e); + } })?; maker.thread_pool.add_thread(contract_watcher_thread); @@ -482,7 +491,13 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { .name("RPC Thread".to_string()) .spawn(move || { log::info!("[{}] Spawning RPC server thread", port); - start_rpc_server(maker_clone) + match start_rpc_server(maker_clone.clone()) { + Ok(_) => (), + Err(e) => { + log::error!("Failed starting rpc server {:?}", e); + maker_clone.shutdown.store(true, Relaxed); + } + } })?; maker.thread_pool.add_thread(rpc_thread); @@ -518,9 +533,6 @@ pub fn start_maker_server(maker: Arc) -> Result<(), MakerError> { .spawn(move || { if let Err(e) = handle_client(maker_for_handler, &mut stream, client_addr) { log::error!("[{}] Error Handling client request {:?}", port, e); - Err(e) - } else { - Ok(()) } })?; maker.thread_pool.add_thread(client_handler_thread);