From 1f144b02b0ca73ea9c9edba0f1397a53af7cf376 Mon Sep 17 00:00:00 2001 From: wthrajat Date: Mon, 1 Jan 2024 23:09:16 +0530 Subject: [PATCH 01/15] hehe --- Cargo.toml | 1 + src/error.rs | 7 +++++++ src/maker/mod.rs | 14 ++++++++++---- src/utill.rs | 5 +++-- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5b55c6ff..7395ae65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ bip39 = {version = "1.0.1", features = ["rand"] } bitcoin = {version = "0.30", features = ["rand"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +serde_cbor = "0.11.2" tokio = { version = "1.16.1", features = ["full"] } log = "^0.4" env_logger = "0.7" diff --git a/src/error.rs b/src/error.rs index 92831d7a..64eea3c1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,6 +9,7 @@ pub enum NetError { Json(serde_json::Error), ReachedEOF, ConnectionTimedOut, + Cbor(serde_cbor::Error), } impl From for NetError { @@ -23,6 +24,12 @@ impl From for NetError { } } +impl From for NetError { + fn from(value: serde_cbor::Error) -> Self { + Self::Cbor(value) + } +} + /// Includes all Protocol-level errors. #[derive(Debug)] pub enum ProtocolError { diff --git a/src/maker/mod.rs b/src/maker/mod.rs index 9ad0eef0..dae3bd9b 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -1,8 +1,8 @@ //! Defines a Coinswap Maker Server. //! //! Handles connections, communication with takers, various aspects of the -//! Maker's behavior, includes functionalities such as checking for new connections, -//! handling messages from takers, refreshing offer caches, and interacting with the Bitcoin node. +//! Maker's behavior, includes functionalities such as checking for new connections, handling messages from takers, +//! refreshing offer caches, and interacting with the Bitcoin node. pub mod api; pub mod config; @@ -40,8 +40,9 @@ use crate::{ use crate::maker::error::MakerError; -/// Initializes and starts the Maker server, handling connections and various +/// This function initializes and starts the Maker server, handling connections and various /// aspects of the Maker's behavior. + #[tokio::main] pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { log::debug!("Running maker with special behavior = {:?}", maker.behavior); @@ -213,7 +214,8 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { }; line = line.trim_end().to_string(); - let message: TakerToMakerMessage = serde_json::from_str(&line).unwrap(); + let message: TakerToMakerMessage = + serde_cbor::from_slice(&line.as_bytes()).unwrap(); log::info!("[{}] <=== {} ", maker_clone.config.port, message); let message_result: Result, MakerError> = @@ -224,6 +226,10 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { if let Some(message) = reply { log::info!("[{}] ===> {} ", maker_clone.config.port, message); log::debug!("{:#?}", message); + + // let cbor_bytes = serde_cbor::to_vec(&message)?; + // send_message(&mut socket_writer, &cbor_bytes).await?; + if let Err(e) = send_message(&mut socket_writer, &message).await { log::error!("closing due to io error sending message: {:?}", e); break; diff --git a/src/utill.rs b/src/utill.rs index 748f850e..dc2c751d 100644 --- a/src/utill.rs +++ b/src/utill.rs @@ -90,7 +90,8 @@ pub async fn send_message( socket_writer: &mut WriteHalf<'_>, message: &impl serde::Serialize, ) -> Result<(), NetError> { - let mut message_bytes = serde_json::to_vec(message).map_err(std::io::Error::from)?; + let mut message_bytes = serde_cbor::to_vec(message).map_err(|err| NetError::Cbor(err))?; + message_bytes.push(b'\n'); socket_writer.write_all(&message_bytes).await?; Ok(()) @@ -105,7 +106,7 @@ pub async fn read_message( if n == 0 { return Err(NetError::ReachedEOF); } - let message: MakerToTakerMessage = serde_json::from_str(&line)?; + let message: MakerToTakerMessage = serde_cbor::from_slice(line.trim().as_bytes())?; log::debug!("<== {:#?}", message); Ok(message) } From 29d46a86ed51469d8eddf55b698c62aff9e6d1c9 Mon Sep 17 00:00:00 2001 From: wthrajat Date: Sun, 7 Jan 2024 15:30:21 +0530 Subject: [PATCH 02/15] revert for clean branch This reverts commit 1f144b02b0ca73ea9c9edba0f1397a53af7cf376. --- Cargo.toml | 1 - src/error.rs | 7 ------- src/maker/mod.rs | 14 ++++---------- src/utill.rs | 5 ++--- 4 files changed, 6 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7395ae65..5b55c6ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,6 @@ bip39 = {version = "1.0.1", features = ["rand"] } bitcoin = {version = "0.30", features = ["rand"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -serde_cbor = "0.11.2" tokio = { version = "1.16.1", features = ["full"] } log = "^0.4" env_logger = "0.7" diff --git a/src/error.rs b/src/error.rs index 64eea3c1..92831d7a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,7 +9,6 @@ pub enum NetError { Json(serde_json::Error), ReachedEOF, ConnectionTimedOut, - Cbor(serde_cbor::Error), } impl From for NetError { @@ -24,12 +23,6 @@ impl From for NetError { } } -impl From for NetError { - fn from(value: serde_cbor::Error) -> Self { - Self::Cbor(value) - } -} - /// Includes all Protocol-level errors. #[derive(Debug)] pub enum ProtocolError { diff --git a/src/maker/mod.rs b/src/maker/mod.rs index dae3bd9b..9ad0eef0 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -1,8 +1,8 @@ //! Defines a Coinswap Maker Server. //! //! Handles connections, communication with takers, various aspects of the -//! Maker's behavior, includes functionalities such as checking for new connections, handling messages from takers, -//! refreshing offer caches, and interacting with the Bitcoin node. +//! Maker's behavior, includes functionalities such as checking for new connections, +//! handling messages from takers, refreshing offer caches, and interacting with the Bitcoin node. pub mod api; pub mod config; @@ -40,9 +40,8 @@ use crate::{ use crate::maker::error::MakerError; -/// This function initializes and starts the Maker server, handling connections and various +/// Initializes and starts the Maker server, handling connections and various /// aspects of the Maker's behavior. - #[tokio::main] pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { log::debug!("Running maker with special behavior = {:?}", maker.behavior); @@ -214,8 +213,7 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { }; line = line.trim_end().to_string(); - let message: TakerToMakerMessage = - serde_cbor::from_slice(&line.as_bytes()).unwrap(); + let message: TakerToMakerMessage = serde_json::from_str(&line).unwrap(); log::info!("[{}] <=== {} ", maker_clone.config.port, message); let message_result: Result, MakerError> = @@ -226,10 +224,6 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { if let Some(message) = reply { log::info!("[{}] ===> {} ", maker_clone.config.port, message); log::debug!("{:#?}", message); - - // let cbor_bytes = serde_cbor::to_vec(&message)?; - // send_message(&mut socket_writer, &cbor_bytes).await?; - if let Err(e) = send_message(&mut socket_writer, &message).await { log::error!("closing due to io error sending message: {:?}", e); break; diff --git a/src/utill.rs b/src/utill.rs index dc2c751d..748f850e 100644 --- a/src/utill.rs +++ b/src/utill.rs @@ -90,8 +90,7 @@ pub async fn send_message( socket_writer: &mut WriteHalf<'_>, message: &impl serde::Serialize, ) -> Result<(), NetError> { - let mut message_bytes = serde_cbor::to_vec(message).map_err(|err| NetError::Cbor(err))?; - + let mut message_bytes = serde_json::to_vec(message).map_err(std::io::Error::from)?; message_bytes.push(b'\n'); socket_writer.write_all(&message_bytes).await?; Ok(()) @@ -106,7 +105,7 @@ pub async fn read_message( if n == 0 { return Err(NetError::ReachedEOF); } - let message: MakerToTakerMessage = serde_cbor::from_slice(line.trim().as_bytes())?; + let message: MakerToTakerMessage = serde_json::from_str(&line)?; log::debug!("<== {:#?}", message); Ok(message) } From ce675da9a62a7ebfb0527323257cfcff9da79c4c Mon Sep 17 00:00:00 2001 From: wthrajat Date: Sun, 7 Jan 2024 16:24:14 +0530 Subject: [PATCH 03/15] use Signed-off-by: wthrajat --- Cargo.toml | 1 + src/error.rs | 7 +++++++ src/maker/mod.rs | 9 ++++----- src/utill.rs | 13 ++++++------- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5b55c6ff..7395ae65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ bip39 = {version = "1.0.1", features = ["rand"] } bitcoin = {version = "0.30", features = ["rand"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +serde_cbor = "0.11.2" tokio = { version = "1.16.1", features = ["full"] } log = "^0.4" env_logger = "0.7" diff --git a/src/error.rs b/src/error.rs index 92831d7a..90b777e0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,6 +9,7 @@ pub enum NetError { Json(serde_json::Error), ReachedEOF, ConnectionTimedOut, + Cbor(serde_cbor::Error) } impl From for NetError { @@ -23,6 +24,12 @@ impl From for NetError { } } +impl From for NetError { + fn from(value: serde_cbor::Error) -> Self { + Self::Cbor(value) + } +} + /// Includes all Protocol-level errors. #[derive(Debug)] pub enum ProtocolError { diff --git a/src/maker/mod.rs b/src/maker/mod.rs index 9ad0eef0..01ff5b23 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -18,7 +18,7 @@ use std::{ use bitcoin::Network; use bitcoind::bitcoincore_rpc::RpcApi; use tokio::{ - io::{AsyncBufReadExt, BufReader}, + io::{BufReader, AsyncReadExt}, net::TcpListener, select, sync::mpsc, @@ -191,9 +191,9 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { log::info!("[{}] ===> MakerHello", maker_clone.config.port); loop { - let mut line = String::new(); + let mut line : Vec = Vec::new(); select! { - readline_ret = reader.read_line(&mut line) => { + readline_ret = reader.read_to_end(&mut line) => { match readline_ret { Ok(0) => { log::info!("[{}] Connection closed by peer", maker_clone.config.port); @@ -212,8 +212,7 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { }, }; - line = line.trim_end().to_string(); - let message: TakerToMakerMessage = serde_json::from_str(&line).unwrap(); + let message: TakerToMakerMessage = serde_cbor::from_slice(&line).unwrap(); log::info!("[{}] <=== {} ", maker_clone.config.port, message); let message_result: Result, MakerError> = diff --git a/src/utill.rs b/src/utill.rs index 748f850e..20ef39f4 100644 --- a/src/utill.rs +++ b/src/utill.rs @@ -21,7 +21,7 @@ use std::{ use serde_json::Value; use tokio::{ - io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, + io::{AsyncWriteExt, BufReader, AsyncReadExt}, net::tcp::{ReadHalf, WriteHalf}, }; @@ -80,7 +80,7 @@ pub fn setup_logger() { .default_filter_or("coinswap=info") .default_write_style_or("always"), ) - .is_test(true) + //.is_test(true) .init(); }); } @@ -90,8 +90,7 @@ pub async fn send_message( socket_writer: &mut WriteHalf<'_>, message: &impl serde::Serialize, ) -> Result<(), NetError> { - let mut message_bytes = serde_json::to_vec(message).map_err(std::io::Error::from)?; - message_bytes.push(b'\n'); + let message_bytes = serde_cbor::to_vec(message).map_err(NetError::Cbor)?; socket_writer.write_all(&message_bytes).await?; Ok(()) } @@ -100,12 +99,12 @@ pub async fn send_message( pub async fn read_message( reader: &mut BufReader>, ) -> Result { - let mut line = String::new(); - let n = reader.read_line(&mut line).await?; + let mut line: Vec = Vec::new(); + let n = reader.read_to_end(&mut line).await?; if n == 0 { return Err(NetError::ReachedEOF); } - let message: MakerToTakerMessage = serde_json::from_str(&line)?; + let message: MakerToTakerMessage = serde_cbor::from_slice(&line)?; log::debug!("<== {:#?}", message); Ok(message) } From 93e321a1248bfc8d24af9778775e448f7c259471 Mon Sep 17 00:00:00 2001 From: wthrajat Date: Sun, 7 Jan 2024 16:25:32 +0530 Subject: [PATCH 04/15] cargo format --- src/error.rs | 2 +- src/maker/mod.rs | 4 ++-- src/utill.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/error.rs b/src/error.rs index 90b777e0..64eea3c1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,7 +9,7 @@ pub enum NetError { Json(serde_json::Error), ReachedEOF, ConnectionTimedOut, - Cbor(serde_cbor::Error) + Cbor(serde_cbor::Error), } impl From for NetError { diff --git a/src/maker/mod.rs b/src/maker/mod.rs index 01ff5b23..cb3a69e3 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -18,7 +18,7 @@ use std::{ use bitcoin::Network; use bitcoind::bitcoincore_rpc::RpcApi; use tokio::{ - io::{BufReader, AsyncReadExt}, + io::{AsyncReadExt, BufReader}, net::TcpListener, select, sync::mpsc, @@ -191,7 +191,7 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { log::info!("[{}] ===> MakerHello", maker_clone.config.port); loop { - let mut line : Vec = Vec::new(); + let mut line: Vec = Vec::new(); select! { readline_ret = reader.read_to_end(&mut line) => { match readline_ret { diff --git a/src/utill.rs b/src/utill.rs index 20ef39f4..46ee5d33 100644 --- a/src/utill.rs +++ b/src/utill.rs @@ -21,7 +21,7 @@ use std::{ use serde_json::Value; use tokio::{ - io::{AsyncWriteExt, BufReader, AsyncReadExt}, + io::{AsyncReadExt, AsyncWriteExt, BufReader}, net::tcp::{ReadHalf, WriteHalf}, }; From 1182ca1c17cb9771c1235430ec66631acb28726b Mon Sep 17 00:00:00 2001 From: wthrajat Date: Tue, 9 Jan 2024 09:43:25 +0530 Subject: [PATCH 05/15] some progress, msgs de/serialising properly now Signed-off-by: wthrajat --- src/maker/mod.rs | 76 +++++++++++++++++++++++++++++++++--------------- src/taker/api.rs | 6 ++++ src/utill.rs | 18 ++++++------ 3 files changed, 68 insertions(+), 32 deletions(-) diff --git a/src/maker/mod.rs b/src/maker/mod.rs index cb3a69e3..15528fc0 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -8,8 +8,8 @@ pub mod api; pub mod config; pub mod error; mod handlers; - use std::{ + convert::TryInto, net::Ipv4Addr, sync::Arc, time::{Duration, Instant}, @@ -191,15 +191,15 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { log::info!("[{}] ===> MakerHello", maker_clone.config.port); loop { - let mut line: Vec = Vec::new(); + let mut buf: Vec = Vec::new(); select! { - readline_ret = reader.read_to_end(&mut line) => { - match readline_ret { + readbuf_ret = reader.read_to_end(&mut buf) => { + match readbuf_ret { Ok(0) => { log::info!("[{}] Connection closed by peer", maker_clone.config.port); break; } - Ok(_) => (), + Ok(_) => log::info!("{:?} Bytes were read successfully!", buf.len()), Err(e) => { log::error!("error reading from socket: {:?}", e); break; @@ -212,29 +212,59 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { }, }; - let message: TakerToMakerMessage = serde_cbor::from_slice(&line).unwrap(); - log::info!("[{}] <=== {} ", maker_clone.config.port, message); + if !buf.is_empty() { + // Read the length prefix (assuming it's a u32) + let len = u32::from_be_bytes(buf[0..4].try_into().unwrap()) as usize; - let message_result: Result, MakerError> = - handle_message(&maker_clone, &mut connection_state, message, addr.ip()).await; + // Ensure the buffer has enough data + if buf.len() < len + 4 { + break; + } - match message_result { - Ok(reply) => { - if let Some(message) = reply { - log::info!("[{}] ===> {} ", maker_clone.config.port, message); - log::debug!("{:#?}", message); - if let Err(e) = send_message(&mut socket_writer, &message).await { - log::error!("closing due to io error sending message: {:?}", e); - break; + // Get the message data + let message_data = &buf[4..4 + len]; + + // Deserialize the message + let message: TakerToMakerMessage = match serde_cbor::from_slice(message_data) { + Ok(message) => message, + Err(e) => { + log::error!("Error deserializing message: {:?}", e); + continue; + } + }; + + // Log the message + log::info!("[{}] <=== {} ", maker_clone.config.port, message); + log::info!("Message deserialised successfuly: {:?}", message); + + // Remove the processed data from the buffer + // buf.drain(0..4 + len); + + let message_result: Result, MakerError> = + handle_message(&maker_clone, &mut connection_state, message, addr.ip()) + .await; + + match message_result { + Ok(reply) => { + if let Some(message) = reply { + log::info!("[{}] ===> {} ", maker_clone.config.port, message); + log::debug!("{:#?}", message); + if let Err(e) = send_message(&mut socket_writer, &message).await { + log::error!( + "Closing due to IO error in sending message: {:?}", + e + ); + continue; + } } + // if reply is None then dont send anything to client + } + Err(err) => { + server_loop_comms_tx.send(err).await.unwrap(); + break; } - //if reply is None then dont send anything to client - } - Err(err) => { - server_loop_comms_tx.send(err).await.unwrap(); - break; } - }; + } } }); } diff --git a/src/taker/api.rs b/src/taker/api.rs index 88cf0f21..f25b0d07 100644 --- a/src/taker/api.rs +++ b/src/taker/api.rs @@ -387,11 +387,17 @@ impl Taker { let swap_locktime = self.config.refund_locktime + self.config.refund_locktime_step * self.ongoing_swap_state.swap_params.maker_count; + log::info!("---Swap Locktime: {}", swap_locktime); + // Loop until we find a live maker who responded to our signature request. let (maker, funding_txs) = loop { // Fail early if not enough good makers in the list to satisfy swap requirements. let untried_maker_count = self.offerbook.get_all_untried().len(); + + log::info!("----Untried Makers Count: {}", untried_maker_count); + if untried_maker_count < self.ongoing_swap_state.swap_params.maker_count as usize { + log::info!("We don't have enough makers to satisfy the swap requirements!"); return Err(TakerError::NotEnoughMakersInOfferBook); } let maker = self.choose_next_maker()?.clone(); diff --git a/src/utill.rs b/src/utill.rs index 46ee5d33..ed993ca8 100644 --- a/src/utill.rs +++ b/src/utill.rs @@ -80,7 +80,7 @@ pub fn setup_logger() { .default_filter_or("coinswap=info") .default_write_style_or("always"), ) - //.is_test(true) + // .is_test(true) .init(); }); } @@ -90,8 +90,9 @@ pub async fn send_message( socket_writer: &mut WriteHalf<'_>, message: &impl serde::Serialize, ) -> Result<(), NetError> { - let message_bytes = serde_cbor::to_vec(message).map_err(NetError::Cbor)?; - socket_writer.write_all(&message_bytes).await?; + let message_cbor = serde_cbor::to_vec(message).map_err(NetError::Cbor)?; + socket_writer.write_u32(message_cbor.len() as u32).await?; + socket_writer.write_all(&message_cbor).await?; Ok(()) } @@ -99,12 +100,11 @@ pub async fn send_message( pub async fn read_message( reader: &mut BufReader>, ) -> Result { - let mut line: Vec = Vec::new(); - let n = reader.read_to_end(&mut line).await?; - if n == 0 { - return Err(NetError::ReachedEOF); - } - let message: MakerToTakerMessage = serde_cbor::from_slice(&line)?; + let length = reader.read_u32().await?; + let mut buffer = vec![0; length as usize]; + reader.read_exact(&mut buffer).await?; + let message: MakerToTakerMessage = serde_cbor::from_reader(&*buffer).map_err(NetError::Cbor)?; + log::info!("-----Here is message: {:#?}", message); log::debug!("<== {:#?}", message); Ok(message) } From a49f79aa4f41f4ef6dafe5596c9045d666753468 Mon Sep 17 00:00:00 2001 From: wthrajat Date: Sun, 21 Jan 2024 19:38:29 +0530 Subject: [PATCH 06/15] some little progress and cleaned up a bit --- src/maker/mod.rs | 117 ++++++++++++++++++++---------------------- src/taker/routines.rs | 21 ++++++-- src/utill.rs | 8 +-- 3 files changed, 79 insertions(+), 67 deletions(-) diff --git a/src/maker/mod.rs b/src/maker/mod.rs index 15528fc0..3afaf8a6 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -8,13 +8,14 @@ pub mod api; pub mod config; pub mod error; mod handlers; + use std::{ - convert::TryInto, net::Ipv4Addr, sync::Arc, time::{Duration, Instant}, }; +// use crate::error::NetError; use bitcoin::Network; use bitcoind::bitcoincore_rpc::RpcApi; use tokio::{ @@ -191,79 +192,73 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { log::info!("[{}] ===> MakerHello", maker_clone.config.port); loop { - let mut buf: Vec = Vec::new(); - select! { - readbuf_ret = reader.read_to_end(&mut buf) => { - match readbuf_ret { - Ok(0) => { - log::info!("[{}] Connection closed by peer", maker_clone.config.port); - break; - } - Ok(_) => log::info!("{:?} Bytes were read successfully!", buf.len()), - Err(e) => { - log::error!("error reading from socket: {:?}", e); - break; - } - } - }, - _ = sleep(Duration::from_secs(maker_clone.config.idle_connection_timeout)) => { - log::info!("[{}] Idle connection closed", addr.port()); + let mut length_buf = [0; 4]; + match reader.read_exact(&mut length_buf).await { + Ok(0) => { + log::info!(" --- got 0 while reading --- Connection closed gracefully"); + log::info!("[{}] Connection closed by peer", maker_clone.config.port); break; - }, - }; + } + Ok(_) => { + let message_len = u32::from_be_bytes(length_buf) as usize; + log::info!("--- message length = {}", message_len); + if message_len == 0 { + log::error!("Received message with length 0"); + break; + } - if !buf.is_empty() { - // Read the length prefix (assuming it's a u32) - let len = u32::from_be_bytes(buf[0..4].try_into().unwrap()) as usize; + let mut message_buf = vec![0; message_len]; + log::info!(" --- message buf length = {}", message_len); + if reader.read_exact(&mut message_buf).await.is_err() { + log::error!(" ---Failed to read message from socket"); + break; + } - // Ensure the buffer has enough data - if buf.len() < len + 4 { - break; - } + // Deserialize the message using serde_cbor + let message: TakerToMakerMessage = serde_cbor::from_slice(&message_buf) + .expect(" --- msg deserialization failed mod 214"); - // Get the message data - let message_data = &buf[4..4 + len]; + log::info!("[{}] <=== {} ", maker_clone.config.port, message); - // Deserialize the message - let message: TakerToMakerMessage = match serde_cbor::from_slice(message_data) { - Ok(message) => message, - Err(e) => { - log::error!("Error deserializing message: {:?}", e); - continue; - } - }; + let message_result: Result, MakerError> = + handle_message(&maker_clone, &mut connection_state, message, addr.ip()) + .await; - // Log the message - log::info!("[{}] <=== {} ", maker_clone.config.port, message); - log::info!("Message deserialised successfuly: {:?}", message); + log::info!(" ---- message result 221 = {:#?}", message_result); - // Remove the processed data from the buffer - // buf.drain(0..4 + len); + match message_result { + Ok(Some(reply)) => { + log::info!(" --- entered mod 223"); - let message_result: Result, MakerError> = - handle_message(&maker_clone, &mut connection_state, message, addr.ip()) - .await; + log::info!(" --- yay! got some reply = {:#?}", reply); + log::info!("[{}] ===> {} ", maker_clone.config.port, reply); + log::debug!("{:#?}", reply); - match message_result { - Ok(reply) => { - if let Some(message) = reply { - log::info!("[{}] ===> {} ", maker_clone.config.port, message); - log::debug!("{:#?}", message); - if let Err(e) = send_message(&mut socket_writer, &message).await { - log::error!( - "Closing due to IO error in sending message: {:?}", - e - ); - continue; + match send_message(&mut socket_writer, &reply).await { + Ok(_) => {} + Err(e) => { + log::error!(" --- error sending message: {:#?}", e); + break; + } } } - // if reply is None then dont send anything to client - } - Err(err) => { - server_loop_comms_tx.send(err).await.unwrap(); - break; + Ok(None) => { + log::info!(" --- this time got \"NONE\" hence not sending anything to client ---"); + } + Err(message_result_error) => { + log::error!(" ---- 257 mod Error handling message: {:#?}", message_result_error); + server_loop_comms_tx.send(message_result_error).await.unwrap(); + break; + } } } + Err(reader_error) => { + log::error!( + " --- hmmm err reading message length from socket i wonder why = {:#?}", + reader_error + ); + break; + } } } }); diff --git a/src/taker/routines.rs b/src/taker/routines.rs index eec8a9ac..588a606c 100644 --- a/src/taker/routines.rs +++ b/src/taker/routines.rs @@ -84,8 +84,12 @@ pub async fn handshake_maker<'a>( }), ) .await?; + log::info!(" --- Takerhello routines success 1"); let makerhello = match read_message(&mut socket_reader).await { - Ok(MakerToTakerMessage::MakerHello(m)) => m, + Ok(MakerToTakerMessage::MakerHello(m)) => { + log::info!(" --- Takerhello routines success 2"); + m + } Ok(any) => { return Err(ProtocolError::WrongMessage { expected: "MakerHello".to_string(), @@ -142,8 +146,10 @@ pub(crate) async fn req_sigs_for_sender_once( }), ) .await?; + log::info!(" --- ReqContractSigsForSender routines success 1"); let contract_sigs_for_sender = match read_message(&mut socket_reader).await { Ok(MakerToTakerMessage::RespContractSigsForSender(m)) => { + log::info!(" --- ReqContractSigsForSender routines success 2"); if m.sigs.len() != outgoing_swapcoins.len() { return Err(ProtocolError::WrongNumOfSigs { expected: outgoing_swapcoins.len(), @@ -201,8 +207,10 @@ pub(crate) async fn req_sigs_for_recvr_once( }), ) .await?; + log::info!(" --- ReqContractSigsFor Rec routines success 1"); let contract_sigs_for_recvr = match read_message(&mut socket_reader).await { Ok(MakerToTakerMessage::RespContractSigsForRecvr(m)) => { + log::info!(" --- ReqContractSigsFor Rec routines success 2"); if m.sigs.len() != incoming_swapcoins.len() { return Err(ProtocolError::WrongNumOfSigs { expected: incoming_swapcoins.len(), @@ -281,8 +289,10 @@ pub(crate) async fn send_proof_of_funding_and_init_next_hop( }), ) .await?; + log::info!(" --- send_proof_of_funding_and_init_next_hop success 1"); let contract_sigs_as_recvr_and_sender = match read_message(socket_reader).await { Ok(MakerToTakerMessage::ReqContractSigsAsRecvrAndSender(m)) => { + log::info!(" --- send_proof_of_funding_and_init_next_hop success 2"); if m.receivers_contract_txs.len() != tmi.funding_tx_infos.len() { return Err(ProtocolError::WrongNumOfContractTxs { expected: tmi.funding_tx_infos.len(), @@ -416,8 +426,10 @@ pub(crate) async fn send_hash_preimage_and_get_private_keys( }), ) .await?; + log::info!("send_hash_preimage_and_get_private_keys success 1"); let privkey_handover = match read_message(socket_reader).await { Ok(MakerToTakerMessage::RespPrivKeyHandover(m)) => { + log::info!("send_hash_preimage_and_get_private_keys success 1"); if m.multisig_privkeys.len() != receivers_multisig_redeemscripts.len() { return Err(ProtocolError::WrongNumOfPrivkeys { expected: receivers_multisig_redeemscripts.len(), @@ -451,9 +463,12 @@ async fn download_maker_offer_attempt_once(addr: &MakerAddress) -> Result m, + Ok(MakerToTakerMessage::RespOffer(m)) => { + log::info!("download maker once success 2"); + m + } Ok(any) => { return Err(ProtocolError::WrongMessage { expected: "RespOffer".to_string(), diff --git a/src/utill.rs b/src/utill.rs index b57c2c42..6b828080 100644 --- a/src/utill.rs +++ b/src/utill.rs @@ -81,7 +81,7 @@ pub fn setup_logger() { .default_filter_or("coinswap=info") .default_write_style_or("always"), ) - // .is_test(true) + .is_test(true) .init(); }); } @@ -94,6 +94,7 @@ pub async fn send_message( let message_cbor = serde_cbor::to_vec(message).map_err(NetError::Cbor)?; socket_writer.write_u32(message_cbor.len() as u32).await?; socket_writer.write_all(&message_cbor).await?; + log::info!(" --- send_message ka socketwriter = {:#?}", socket_writer); Ok(()) } @@ -104,8 +105,9 @@ pub async fn read_message( let length = reader.read_u32().await?; let mut buffer = vec![0; length as usize]; reader.read_exact(&mut buffer).await?; - let message: MakerToTakerMessage = serde_cbor::from_reader(&*buffer).map_err(NetError::Cbor)?; - log::info!("-----Here is message: {:#?}", message); + let message: MakerToTakerMessage = serde_cbor::from_slice(&buffer).map_err(NetError::Cbor)?; + log::info!("----- read_message ka message = {:#?}", message); + log::info!("-----read_message ka reader = {:#?}", reader); log::debug!("<== {:#?}", message); Ok(message) } From e89f477fe9dfab7e3ebcdf624a0ff022502d167e Mon Sep 17 00:00:00 2001 From: wthrajat Date: Sun, 21 Jan 2024 19:39:44 +0530 Subject: [PATCH 07/15] rustfmt --- src/maker/mod.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/maker/mod.rs b/src/maker/mod.rs index 3afaf8a6..f2ecc496 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -246,8 +246,14 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { log::info!(" --- this time got \"NONE\" hence not sending anything to client ---"); } Err(message_result_error) => { - log::error!(" ---- 257 mod Error handling message: {:#?}", message_result_error); - server_loop_comms_tx.send(message_result_error).await.unwrap(); + log::error!( + " ---- 257 mod Error handling message: {:#?}", + message_result_error + ); + server_loop_comms_tx + .send(message_result_error) + .await + .unwrap(); break; } } From 7be5b080e60c79f1cea71cd9ff40301268e580f0 Mon Sep 17 00:00:00 2001 From: wthrajat Date: Sun, 21 Jan 2024 22:10:29 +0530 Subject: [PATCH 08/15] disabled tags Signed-off-by: wthrajat --- src/maker/mod.rs | 40 +++++++++++----------------------------- src/protocol/messages.rs | 2 -- src/taker/routines.rs | 20 ++------------------ src/utill.rs | 9 +++------ 4 files changed, 16 insertions(+), 55 deletions(-) diff --git a/src/maker/mod.rs b/src/maker/mod.rs index f2ecc496..6f9200c7 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -15,7 +15,6 @@ use std::{ time::{Duration, Instant}, }; -// use crate::error::NetError; use bitcoin::Network; use bitcoind::bitcoincore_rpc::RpcApi; use tokio::{ @@ -186,37 +185,34 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { ) .await { - log::error!("io error sending first message: {:?}", e); + log::error!("IO error sending first message: {:?}", e); return; } log::info!("[{}] ===> MakerHello", maker_clone.config.port); loop { let mut length_buf = [0; 4]; - match reader.read_exact(&mut length_buf).await { + let message_reader = reader.read_exact(&mut length_buf).await; + match message_reader { Ok(0) => { - log::info!(" --- got 0 while reading --- Connection closed gracefully"); log::info!("[{}] Connection closed by peer", maker_clone.config.port); break; } Ok(_) => { let message_len = u32::from_be_bytes(length_buf) as usize; - log::info!("--- message length = {}", message_len); if message_len == 0 { - log::error!("Received message with length 0"); + log::error!("Received empty message!"); break; } let mut message_buf = vec![0; message_len]; - log::info!(" --- message buf length = {}", message_len); if reader.read_exact(&mut message_buf).await.is_err() { - log::error!(" ---Failed to read message from socket"); + log::error!("Failed to read message from socket!"); break; } - // Deserialize the message using serde_cbor - let message: TakerToMakerMessage = serde_cbor::from_slice(&message_buf) - .expect(" --- msg deserialization failed mod 214"); + let message: TakerToMakerMessage = serde_cbor::de::from_slice(&message_buf) + .expect("Message deserialization failed!"); log::info!("[{}] <=== {} ", maker_clone.config.port, message); @@ -224,32 +220,24 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { handle_message(&maker_clone, &mut connection_state, message, addr.ip()) .await; - log::info!(" ---- message result 221 = {:#?}", message_result); - match message_result { Ok(Some(reply)) => { - log::info!(" --- entered mod 223"); - - log::info!(" --- yay! got some reply = {:#?}", reply); log::info!("[{}] ===> {} ", maker_clone.config.port, reply); log::debug!("{:#?}", reply); match send_message(&mut socket_writer, &reply).await { Ok(_) => {} Err(e) => { - log::error!(" --- error sending message: {:#?}", e); + log::error!("Error sending message: {:?}", e); break; } } } Ok(None) => { - log::info!(" --- this time got \"NONE\" hence not sending anything to client ---"); + log::info!("Sending nothing to the client!"); } Err(message_result_error) => { - log::error!( - " ---- 257 mod Error handling message: {:#?}", - message_result_error - ); + log::error!("Error handling message: {:?}", message_result_error); server_loop_comms_tx .send(message_result_error) .await @@ -258,13 +246,7 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { } } } - Err(reader_error) => { - log::error!( - " --- hmmm err reading message length from socket i wonder why = {:#?}", - reader_error - ); - break; - } + Err(_) => break, } } }); diff --git a/src/protocol/messages.rs b/src/protocol/messages.rs index 6d607ed6..f446d883 100644 --- a/src/protocol/messages.rs +++ b/src/protocol/messages.rs @@ -188,7 +188,6 @@ pub struct PrivKeyHandover { /// All messages sent from Taker to Maker. #[derive(Debug, Serialize, Deserialize)] -#[serde(tag = "method", rename_all = "lowercase")] pub enum TakerToMakerMessage { /// Protocol Handshake. TakerHello(TakerHello), @@ -290,7 +289,6 @@ pub struct ContractSigsForRecvr { /// All messages sent from Maker to Taker. #[derive(Debug, Serialize, Deserialize)] -#[serde(tag = "method", rename_all = "lowercase")] pub enum MakerToTakerMessage { /// Protocol Handshake. MakerHello(MakerHello), diff --git a/src/taker/routines.rs b/src/taker/routines.rs index 588a606c..5b390cb1 100644 --- a/src/taker/routines.rs +++ b/src/taker/routines.rs @@ -84,12 +84,8 @@ pub async fn handshake_maker<'a>( }), ) .await?; - log::info!(" --- Takerhello routines success 1"); let makerhello = match read_message(&mut socket_reader).await { - Ok(MakerToTakerMessage::MakerHello(m)) => { - log::info!(" --- Takerhello routines success 2"); - m - } + Ok(MakerToTakerMessage::MakerHello(m)) => m, Ok(any) => { return Err(ProtocolError::WrongMessage { expected: "MakerHello".to_string(), @@ -146,10 +142,8 @@ pub(crate) async fn req_sigs_for_sender_once( }), ) .await?; - log::info!(" --- ReqContractSigsForSender routines success 1"); let contract_sigs_for_sender = match read_message(&mut socket_reader).await { Ok(MakerToTakerMessage::RespContractSigsForSender(m)) => { - log::info!(" --- ReqContractSigsForSender routines success 2"); if m.sigs.len() != outgoing_swapcoins.len() { return Err(ProtocolError::WrongNumOfSigs { expected: outgoing_swapcoins.len(), @@ -207,10 +201,8 @@ pub(crate) async fn req_sigs_for_recvr_once( }), ) .await?; - log::info!(" --- ReqContractSigsFor Rec routines success 1"); let contract_sigs_for_recvr = match read_message(&mut socket_reader).await { Ok(MakerToTakerMessage::RespContractSigsForRecvr(m)) => { - log::info!(" --- ReqContractSigsFor Rec routines success 2"); if m.sigs.len() != incoming_swapcoins.len() { return Err(ProtocolError::WrongNumOfSigs { expected: incoming_swapcoins.len(), @@ -289,10 +281,8 @@ pub(crate) async fn send_proof_of_funding_and_init_next_hop( }), ) .await?; - log::info!(" --- send_proof_of_funding_and_init_next_hop success 1"); let contract_sigs_as_recvr_and_sender = match read_message(socket_reader).await { Ok(MakerToTakerMessage::ReqContractSigsAsRecvrAndSender(m)) => { - log::info!(" --- send_proof_of_funding_and_init_next_hop success 2"); if m.receivers_contract_txs.len() != tmi.funding_tx_infos.len() { return Err(ProtocolError::WrongNumOfContractTxs { expected: tmi.funding_tx_infos.len(), @@ -426,10 +416,8 @@ pub(crate) async fn send_hash_preimage_and_get_private_keys( }), ) .await?; - log::info!("send_hash_preimage_and_get_private_keys success 1"); let privkey_handover = match read_message(socket_reader).await { Ok(MakerToTakerMessage::RespPrivKeyHandover(m)) => { - log::info!("send_hash_preimage_and_get_private_keys success 1"); if m.multisig_privkeys.len() != receivers_multisig_redeemscripts.len() { return Err(ProtocolError::WrongNumOfPrivkeys { expected: receivers_multisig_redeemscripts.len(), @@ -463,12 +451,8 @@ async fn download_maker_offer_attempt_once(addr: &MakerAddress) -> Result { - log::info!("download maker once success 2"); - m - } + Ok(MakerToTakerMessage::RespOffer(m)) => m, Ok(any) => { return Err(ProtocolError::WrongMessage { expected: "RespOffer".to_string(), diff --git a/src/utill.rs b/src/utill.rs index 6b828080..670231dc 100644 --- a/src/utill.rs +++ b/src/utill.rs @@ -91,10 +91,9 @@ pub async fn send_message( socket_writer: &mut WriteHalf<'_>, message: &impl serde::Serialize, ) -> Result<(), NetError> { - let message_cbor = serde_cbor::to_vec(message).map_err(NetError::Cbor)?; + let message_cbor = serde_cbor::ser::to_vec(message).map_err(NetError::Cbor)?; socket_writer.write_u32(message_cbor.len() as u32).await?; socket_writer.write_all(&message_cbor).await?; - log::info!(" --- send_message ka socketwriter = {:#?}", socket_writer); Ok(()) } @@ -105,10 +104,8 @@ pub async fn read_message( let length = reader.read_u32().await?; let mut buffer = vec![0; length as usize]; reader.read_exact(&mut buffer).await?; - let message: MakerToTakerMessage = serde_cbor::from_slice(&buffer).map_err(NetError::Cbor)?; - log::info!("----- read_message ka message = {:#?}", message); - log::info!("-----read_message ka reader = {:#?}", reader); - log::debug!("<== {:#?}", message); + let message: MakerToTakerMessage = + serde_cbor::de::from_slice(&buffer).map_err(NetError::Cbor)?; Ok(message) } From c129b846f6ec9723d1d6b5e400bf7bf46507c765 Mon Sep 17 00:00:00 2001 From: wthrajat Date: Sun, 21 Jan 2024 22:19:23 +0530 Subject: [PATCH 09/15] remove logging statements --- src/taker/api.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/taker/api.rs b/src/taker/api.rs index f25b0d07..e0e9e879 100644 --- a/src/taker/api.rs +++ b/src/taker/api.rs @@ -387,15 +387,11 @@ impl Taker { let swap_locktime = self.config.refund_locktime + self.config.refund_locktime_step * self.ongoing_swap_state.swap_params.maker_count; - log::info!("---Swap Locktime: {}", swap_locktime); - // Loop until we find a live maker who responded to our signature request. let (maker, funding_txs) = loop { // Fail early if not enough good makers in the list to satisfy swap requirements. let untried_maker_count = self.offerbook.get_all_untried().len(); - log::info!("----Untried Makers Count: {}", untried_maker_count); - if untried_maker_count < self.ongoing_swap_state.swap_params.maker_count as usize { log::info!("We don't have enough makers to satisfy the swap requirements!"); return Err(TakerError::NotEnoughMakersInOfferBook); From a453f94b96d969dccfe2760b327ce71a015431cf Mon Sep 17 00:00:00 2001 From: wthrajat Date: Sun, 21 Jan 2024 23:19:58 +0530 Subject: [PATCH 10/15] removed JsonError as it's not needed now --- src/error.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/error.rs b/src/error.rs index 64eea3c1..525ccdc6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,7 +6,6 @@ use crate::protocol::error::ContractError; #[derive(Debug)] pub enum NetError { IO(std::io::Error), - Json(serde_json::Error), ReachedEOF, ConnectionTimedOut, Cbor(serde_cbor::Error), @@ -18,12 +17,6 @@ impl From for NetError { } } -impl From for NetError { - fn from(value: serde_json::Error) -> Self { - Self::Json(value) - } -} - impl From for NetError { fn from(value: serde_cbor::Error) -> Self { Self::Cbor(value) From f7a452185842e78d646c00ca2f77c209cac8cd8f Mon Sep 17 00:00:00 2001 From: rajarshimaitra Date: Sun, 21 Jan 2024 23:43:24 +0530 Subject: [PATCH 11/15] basic logic cleanups and some loggings --- src/maker/error.rs | 7 +++++++ src/taker/offers.rs | 9 +++------ src/taker/routines.rs | 20 ++++++++++---------- src/utill.rs | 3 +-- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/maker/error.rs b/src/maker/error.rs index 9c8a5f6b..bb71cf4a 100644 --- a/src/maker/error.rs +++ b/src/maker/error.rs @@ -17,6 +17,7 @@ pub enum MakerError { Secp(secp256k1::Error), ContractError(ContractError), Wallet(WalletError), + Deserialize(serde_cbor::Error), } impl From for MakerError { @@ -25,6 +26,12 @@ impl From for MakerError { } } +impl From for MakerError { + fn from(value: serde_cbor::Error) -> Self { + Self::Deserialize(value) + } +} + impl From for MakerError { fn from(value: serde_json::Error) -> Self { Self::Json(value) diff --git a/src/taker/offers.rs b/src/taker/offers.rs index b2496245..b7de1189 100644 --- a/src/taker/offers.rs +++ b/src/taker/offers.rs @@ -160,12 +160,9 @@ pub async fn sync_offerbook_with_addresses( let offers_writer = offers_writer_m.clone(); let taker_config: TakerConfig = config.clone(); tokio::spawn(async move { - if let Err(_e) = offers_writer - .send(download_maker_offer(addr, taker_config).await) - .await - { - panic!("mpsc failed"); - } + let offer = download_maker_offer(addr, taker_config).await; + log::info!("Received Maker Offer: {:?}", offer); + offers_writer.send(offer).await.unwrap(); }); } let mut result = Vec::::new(); diff --git a/src/taker/routines.rs b/src/taker/routines.rs index 5b390cb1..ed90de20 100644 --- a/src/taker/routines.rs +++ b/src/taker/routines.rs @@ -451,16 +451,16 @@ async fn download_maker_offer_attempt_once(addr: &MakerAddress) -> Result m, - Ok(any) => { - return Err(ProtocolError::WrongMessage { + + let msg = read_message(&mut socket_reader).await?; + let offer = match msg { + MakerToTakerMessage::RespOffer(offer) => offer, + msg => { + return Err(TakerError::Protocol(ProtocolError::WrongMessage { expected: "RespOffer".to_string(), - received: format!("{}", any), - } - .into()); + received: format!("{}", msg), + })) } - Err(e) => return Err(e.into()), }; log::debug!(target: "offerbook", "Obtained offer from {}", addr); @@ -479,7 +479,7 @@ pub async fn download_maker_offer( match ret { Ok(offer) => return Some(OfferAndAddress { offer, address }), Err(e) => { - log::debug!(target: "offerbook", + log::warn!( "Failed to request offer from maker {}, \ reattempting... error={:?}", address, @@ -495,7 +495,7 @@ pub async fn download_maker_offer( } }, _ = sleep(Duration::from_secs(config.first_connect_attempt_timeout_sec)) => { - log::debug!(target: "offerbook", + log::warn!( "Timeout for request offer from maker {}, reattempting...", address ); diff --git a/src/utill.rs b/src/utill.rs index 670231dc..c6fd641a 100644 --- a/src/utill.rs +++ b/src/utill.rs @@ -104,8 +104,7 @@ pub async fn read_message( let length = reader.read_u32().await?; let mut buffer = vec![0; length as usize]; reader.read_exact(&mut buffer).await?; - let message: MakerToTakerMessage = - serde_cbor::de::from_slice(&buffer).map_err(NetError::Cbor)?; + let message: MakerToTakerMessage = serde_cbor::from_slice(&buffer)?; Ok(message) } From c58b4ad51cc4c247e53a8fb068cbadbdc9150396 Mon Sep 17 00:00:00 2001 From: rajarshimaitra Date: Sun, 21 Jan 2024 23:44:26 +0530 Subject: [PATCH 12/15] simplify maker read message logic --- src/maker/mod.rs | 107 +++++++++++++++++++++++++---------------------- 1 file changed, 57 insertions(+), 50 deletions(-) diff --git a/src/maker/mod.rs b/src/maker/mod.rs index 6f9200c7..42fd59fa 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -19,7 +19,7 @@ use bitcoin::Network; use bitcoind::bitcoincore_rpc::RpcApi; use tokio::{ io::{AsyncReadExt, BufReader}, - net::TcpListener, + net::{tcp::ReadHalf, TcpListener}, select, sync::mpsc, time::sleep, @@ -191,64 +191,71 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { log::info!("[{}] ===> MakerHello", maker_clone.config.port); loop { - let mut length_buf = [0; 4]; - let message_reader = reader.read_exact(&mut length_buf).await; - match message_reader { - Ok(0) => { - log::info!("[{}] Connection closed by peer", maker_clone.config.port); - break; - } - Ok(_) => { - let message_len = u32::from_be_bytes(length_buf) as usize; - if message_len == 0 { - log::error!("Received empty message!"); - break; - } - - let mut message_buf = vec![0; message_len]; - if reader.read_exact(&mut message_buf).await.is_err() { - log::error!("Failed to read message from socket!"); - break; + let message = select! { + read_result = read_message(&mut reader) => { + match read_result { + Ok(None) => { + log::info!("[{}] Connection closed by peer", maker_clone.config.port); + break; + }, + Ok(Some(msg)) => msg, + Err(e) => { + log::error!("error reading from socket: {:?}", e); + break; + } } + }, + _ = sleep(Duration::from_secs(maker_clone.config.idle_connection_timeout)) => { + log::info!("[{}] Idle connection closed", addr.port()); + break; + }, + }; - let message: TakerToMakerMessage = serde_cbor::de::from_slice(&message_buf) - .expect("Message deserialization failed!"); - - log::info!("[{}] <=== {} ", maker_clone.config.port, message); - - let message_result: Result, MakerError> = - handle_message(&maker_clone, &mut connection_state, message, addr.ip()) - .await; + log::info!("[{}] <=== {} ", maker_clone.config.port, message); - match message_result { - Ok(Some(reply)) => { - log::info!("[{}] ===> {} ", maker_clone.config.port, reply); - log::debug!("{:#?}", reply); + let reply: Result, MakerError> = + handle_message(&maker_clone, &mut connection_state, message, addr.ip()).await; - match send_message(&mut socket_writer, &reply).await { - Ok(_) => {} - Err(e) => { - log::error!("Error sending message: {:?}", e); - break; - } - } - } - Ok(None) => { - log::info!("Sending nothing to the client!"); - } - Err(message_result_error) => { - log::error!("Error handling message: {:?}", message_result_error); - server_loop_comms_tx - .send(message_result_error) - .await - .unwrap(); - break; + match reply { + Ok(reply) => { + if let Some(message) = reply { + log::info!("[{}] ===> {} ", maker_clone.config.port, message); + log::debug!("{:#?}", message); + if let Err(e) = send_message(&mut socket_writer, &message).await { + log::error!("Closing due to IO error in sending message: {:?}", e); + continue; } } + // if reply is None then don't send anything to client + } + Err(err) => { + server_loop_comms_tx.send(err).await.unwrap(); + break; } - Err(_) => break, } } }); } } + +/// Read a Taker Message. +async fn read_message( + reader: &mut BufReader>, +) -> Result, MakerError> { + let read_result = reader.read_u32().await; + // If its EOF, return None + if read_result + .as_ref() + .is_err_and(|e| e.kind() == std::io::ErrorKind::UnexpectedEof) + { + return Ok(None); + } + let length = read_result?; + if length == 0 { + return Ok(None); + } + let mut buffer = vec![0; length as usize]; + reader.read_exact(&mut buffer).await?; + let message: TakerToMakerMessage = serde_cbor::from_slice(&buffer)?; + Ok(Some(message)) +} From b9ae1af4dc6c3d16232ac47b8c10c44983ec0539 Mon Sep 17 00:00:00 2001 From: wthrajat Date: Mon, 22 Jan 2024 00:08:34 +0530 Subject: [PATCH 13/15] distinguished between maker and taker read_message --- src/maker/mod.rs | 6 +++--- src/taker/routines.rs | 14 +++++++------- src/utill.rs | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/maker/mod.rs b/src/maker/mod.rs index 42fd59fa..45094613 100644 --- a/src/maker/mod.rs +++ b/src/maker/mod.rs @@ -192,7 +192,7 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { loop { let message = select! { - read_result = read_message(&mut reader) => { + read_result = read_taker_message(&mut reader) => { match read_result { Ok(None) => { log::info!("[{}] Connection closed by peer", maker_clone.config.port); @@ -238,8 +238,8 @@ pub async fn start_maker_server(maker: Arc) -> Result<(), MakerError> { } } -/// Read a Taker Message. -async fn read_message( +/// Reads a Taker Message. +async fn read_taker_message( reader: &mut BufReader>, ) -> Result, MakerError> { let read_result = reader.read_u32().await; diff --git a/src/taker/routines.rs b/src/taker/routines.rs index ed90de20..858de7b6 100644 --- a/src/taker/routines.rs +++ b/src/taker/routines.rs @@ -37,7 +37,7 @@ use crate::{ }, Hash160, }, - utill::{read_message, send_message}, + utill::{read_maker_message, send_message}, }; use super::{ @@ -84,7 +84,7 @@ pub async fn handshake_maker<'a>( }), ) .await?; - let makerhello = match read_message(&mut socket_reader).await { + let makerhello = match read_maker_message(&mut socket_reader).await { Ok(MakerToTakerMessage::MakerHello(m)) => m, Ok(any) => { return Err(ProtocolError::WrongMessage { @@ -142,7 +142,7 @@ pub(crate) async fn req_sigs_for_sender_once( }), ) .await?; - let contract_sigs_for_sender = match read_message(&mut socket_reader).await { + let contract_sigs_for_sender = match read_maker_message(&mut socket_reader).await { Ok(MakerToTakerMessage::RespContractSigsForSender(m)) => { if m.sigs.len() != outgoing_swapcoins.len() { return Err(ProtocolError::WrongNumOfSigs { @@ -201,7 +201,7 @@ pub(crate) async fn req_sigs_for_recvr_once( }), ) .await?; - let contract_sigs_for_recvr = match read_message(&mut socket_reader).await { + let contract_sigs_for_recvr = match read_maker_message(&mut socket_reader).await { Ok(MakerToTakerMessage::RespContractSigsForRecvr(m)) => { if m.sigs.len() != incoming_swapcoins.len() { return Err(ProtocolError::WrongNumOfSigs { @@ -281,7 +281,7 @@ pub(crate) async fn send_proof_of_funding_and_init_next_hop( }), ) .await?; - let contract_sigs_as_recvr_and_sender = match read_message(socket_reader).await { + let contract_sigs_as_recvr_and_sender = match read_maker_message(socket_reader).await { Ok(MakerToTakerMessage::ReqContractSigsAsRecvrAndSender(m)) => { if m.receivers_contract_txs.len() != tmi.funding_tx_infos.len() { return Err(ProtocolError::WrongNumOfContractTxs { @@ -416,7 +416,7 @@ pub(crate) async fn send_hash_preimage_and_get_private_keys( }), ) .await?; - let privkey_handover = match read_message(socket_reader).await { + let privkey_handover = match read_maker_message(socket_reader).await { Ok(MakerToTakerMessage::RespPrivKeyHandover(m)) => { if m.multisig_privkeys.len() != receivers_multisig_redeemscripts.len() { return Err(ProtocolError::WrongNumOfPrivkeys { @@ -452,7 +452,7 @@ async fn download_maker_offer_attempt_once(addr: &MakerAddress) -> Result offer, msg => { diff --git a/src/utill.rs b/src/utill.rs index c6fd641a..12991d26 100644 --- a/src/utill.rs +++ b/src/utill.rs @@ -98,7 +98,7 @@ pub async fn send_message( } /// Read a Maker Message. -pub async fn read_message( +pub async fn read_maker_message( reader: &mut BufReader>, ) -> Result { let length = reader.read_u32().await?; From 8328094ffdf26541dea6b969f421668d102027cf Mon Sep 17 00:00:00 2001 From: wthrajat Date: Mon, 22 Jan 2024 01:27:17 +0530 Subject: [PATCH 14/15] removed JsonError from MakerError --- src/maker/error.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/maker/error.rs b/src/maker/error.rs index bb71cf4a..e0e5038a 100644 --- a/src/maker/error.rs +++ b/src/maker/error.rs @@ -10,7 +10,6 @@ use crate::{protocol::error::ContractError, wallet::WalletError}; #[derive(Debug)] pub enum MakerError { IO(std::io::Error), - Json(serde_json::Error), UnexpectedMessage { expected: String, got: String }, General(&'static str), MutexPossion, @@ -32,12 +31,6 @@ impl From for MakerError { } } -impl From for MakerError { - fn from(value: serde_json::Error) -> Self { - Self::Json(value) - } -} - impl<'a, T> From>> for MakerError { fn from(_: PoisonError>) -> Self { Self::MutexPossion From 5fe30948a68a37c5a8a4f17a07873b7e659ca77f Mon Sep 17 00:00:00 2001 From: rajarshimaitra Date: Mon, 22 Jan 2024 10:20:44 +0530 Subject: [PATCH 15/15] readme roadmap update. --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 15467c08..75fdc568 100644 --- a/README.md +++ b/README.md @@ -127,7 +127,8 @@ If you're interested in contributing to the project, explore the [open issues](h - [ ] Complete all unit tests in modules. - [ ] Achieve >80% crate level test coverage ratio (including integration tests). - [ ] Clean up and integrate fidelity bonds with maker banning. -- [ ] Switch to binary encoding for wallet data storage and network messages. +- [x] Switch to binary encoding for network messages. +- [ ] Switch to binary encoding for wallet data. - [ ] Make tor detectable and connectable by default for Maker and Taker. And Tor configs to their config lists. - [ ] Sketch a simple `AddressBook` server. Tor must. This is for MVP. Later on we will move to more decentralized address server architecture. - [ ] Turn maker server into a `makerd` binary, and a `maker-cli` rpc controller app, with MVP API.