Skip to content

Commit

Permalink
Restructure error handling in maker server
Browse files Browse the repository at this point in the history
  • Loading branch information
claddyy committed Dec 16, 2024
1 parent 1ca49c6 commit a9ea6ce
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 15 deletions.
12 changes: 6 additions & 6 deletions src/maker/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub struct ConnectionState {
}

pub struct ThreadPool {
pub threads: Mutex<Vec<JoinHandle<Result<(), MakerError>>>>,
pub threads: Mutex<Vec<JoinHandle<()>>>,
}

impl Default for ThreadPool {
Expand All @@ -116,18 +116,18 @@ impl ThreadPool {
}
}

pub fn add_thread(&self, handle: JoinHandle<Result<(), MakerError>>) {
pub fn add_thread(&self, handle: JoinHandle<()>) {
let mut threads = self.threads.lock().unwrap();
threads.push(handle);
}

#[inline]
fn join_all_threads(&self) -> Result<(), MakerError> {
let mut threads = self
.threads
.lock()
.map_err(|_| MakerError::General("Failed to lock threads"))?;
while let Some(thread) = threads.pop() {
thread.join().unwrap()?;
thread.join().unwrap();
}
Ok(())
}
Expand Down Expand Up @@ -511,7 +511,7 @@ pub fn check_for_broadcasted_contracts(maker: Arc<Maker>) -> Result<(), MakerErr
maker.config.port
);
let handle = std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings)
recover_from_swap(maker_clone, outgoings, incomings).unwrap();
});
maker.thread_pool.add_thread(handle);
// Clear the state value here
Expand Down Expand Up @@ -593,7 +593,7 @@ pub fn check_for_idle_states(maker: Arc<Maker>) -> Result<(), MakerError> {
maker.config.port
);
let handle = std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings)
recover_from_swap(maker_clone, outgoings, incomings).unwrap()
});
maker.thread_pool.add_thread(handle);
// Clear the state values here
Expand Down
5 changes: 3 additions & 2 deletions src/maker/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,9 @@ fn unexpected_recovery(maker: Arc<Maker>) -> Result<(), MakerError> {
}
// Spawn a separate thread to wait for contract maturity and broadcasting timelocked.
let maker_clone = maker.clone();
let handle =
std::thread::spawn(move || recover_from_swap(maker_clone, outgoings, incomings));
let handle = std::thread::spawn(move || {
recover_from_swap(maker_clone, outgoings, incomings).unwrap()
});
maker.thread_pool.add_thread(handle);
}
Ok(())
Expand Down
26 changes: 19 additions & 7 deletions src/maker/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,10 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
.name("Bitcoin Core Connection Checker Thread".to_string())
.spawn(move || {
log::info!("[{}] Spawning Bitcoin Core connection checker thread", port);
check_connection_with_core(maker_clone, acc_client_clone)
if let Err(e) = check_connection_with_core(maker_clone.clone(), acc_client_clone) {
log::error!("[{}] Bitcoin Core connection check failed: {:?}", port, e);
maker_clone.shutdown.store(true, Relaxed);
}
})?;
maker.thread_pool.add_thread(conn_check_thread);

Expand All @@ -458,7 +461,10 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
"[{}] Spawning Client connection status checker thread",
port
);
check_for_idle_states(maker_clone.clone())
if let Err(e) = check_for_idle_states(maker_clone.clone()) {
log::error!("Failed checking client's idle state {:?}", e);
maker_clone.shutdown.store(true, Relaxed);
}
})?;
maker.thread_pool.add_thread(idle_conn_check_thread);

Expand All @@ -471,7 +477,10 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
.name("Contract Watcher Thread".to_string())
.spawn(move || {
log::info!("[{}] Spawning contract-watcher thread", port);
check_for_broadcasted_contracts(maker_clone.clone())
if let Err(e) = check_for_broadcasted_contracts(maker_clone.clone()) {
maker_clone.shutdown.store(true, Relaxed);
log::error!("Failed checking broadcasted contracts {:?}", e);
}
})?;
maker.thread_pool.add_thread(contract_watcher_thread);

Expand All @@ -482,7 +491,13 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
.name("RPC Thread".to_string())
.spawn(move || {
log::info!("[{}] Spawning RPC server thread", port);
start_rpc_server(maker_clone)
match start_rpc_server(maker_clone.clone()) {
Ok(_) => (),
Err(e) => {
log::error!("Failed starting rpc server {:?}", e);
maker_clone.shutdown.store(true, Relaxed);
}
}
})?;

maker.thread_pool.add_thread(rpc_thread);
Expand Down Expand Up @@ -518,9 +533,6 @@ pub fn start_maker_server(maker: Arc<Maker>) -> Result<(), MakerError> {
.spawn(move || {
if let Err(e) = handle_client(maker_for_handler, &mut stream, client_addr) {
log::error!("[{}] Error Handling client request {:?}", port, e);
Err(e)
} else {
Ok(())
}
})?;
maker.thread_pool.add_thread(client_handler_thread);
Expand Down

0 comments on commit a9ea6ce

Please sign in to comment.