From d702e17f73c2f16218730c7724b9961ffe2d1bae Mon Sep 17 00:00:00 2001 From: sahil Date: Tue, 21 Nov 2023 18:29:12 +0000 Subject: [PATCH] Refactor - less unwrap and more loggin --- Cargo.lock | 7 +++ Cargo.toml | 1 + src/main.rs | 149 ++++++++++++++++++++++++++++++++++++---------------- 3 files changed, 113 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6d591e9..9455d48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,6 +26,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "anyhow" +version = "1.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" + [[package]] name = "autocfg" version = "1.1.0" @@ -84,6 +90,7 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" name = "chat-app" version = "0.1.0" dependencies = [ + "anyhow", "env_logger", "log", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 3bae2da..65e121e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +anyhow = "1.0.75" env_logger = "0.10.1" log = "0.4.20" tokio = { version = "*", features = ["full", "io-util", "net"] } diff --git a/src/main.rs b/src/main.rs index ec823f9..d74d26c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,33 +1,60 @@ -use std::{sync::{mpsc::{channel, Receiver, Sender}, Arc}, net::TcpStream, io::Read, ops::Deref, isize}; +use std::collections::HashMap; use std::io::Write; use std::net::SocketAddr; +use std::{ + io::Read, + net::TcpStream, + ops::Deref, + sync::{ + mpsc::{channel, Receiver, Sender}, + Arc, + }, +}; -use log::{info, error, debug}; +use log::{debug, error, info}; +/// Buffer size of a single message +const MESSAGE_SIZE: usize = 32; -fn main() -> std::io::Result<()> { +/// Main thread, which listens to the TcpConnection for the server +fn main() -> anyhow::Result<()> { + // channels for server and clients to communicate let (tx, rx) = channel(); let address = "127.0.0.1:6969"; env_logger::init(); - let listener = std::net::TcpListener::bind(address).map_err(|err| { - eprintln!("[ERROR]: cannot bind {err}"); - }).unwrap(); + // Starting TcpListener + let listener = std::net::TcpListener::bind(address)?; - let mut server = Server::new(); + //creates a new server and spawns it in a new thread + let mut server = Server::new(); info!("Server started and Listing on {address}"); - std::thread::spawn(move || server.start_server(rx)); + // Stars listing to client messages for stream in listener.incoming() { match stream { Ok(stream) => { - debug!("Client Connected from {ip}" , ip = stream.peer_addr().unwrap()); - let connection = Arc::new(stream); - tx.send(ServerEvent::ClientJoinRequest(connection, tx.clone())).expect("Failed"); - }, + debug!( + "Client Connected from {ip}", + ip = stream.peer_addr().unwrap() + ); + + // at this stage the Tcp connection is established between the client and the + // server + let connection = Arc::new(stream); + let addr = connection.as_ref().peer_addr().map_err(|err| { + error!("Could not read clients addr {err}"); + err + })?; + // Send the connection to the server with the channel from above + // the receiver should be the server object which was listing to the events + // in case of errors log it and move on + let _ = tx.send(ServerEvent::ClientJoinRequest(connection, tx.clone(), addr)) + .map_err(|err| error!("Could not send message to server {err}")); + } Err(err) => error!("Error connecting to client {err}"), } } @@ -35,46 +62,67 @@ fn main() -> std::io::Result<()> { Ok(()) } +/// Represents the Messages sent between the client and the server #[derive(Debug)] enum ServerEvent { - ClientJoinRequest(Arc, Sender), + /// A new connection is received by the Main thread, the user wants to join the Server
+ /// `Arc` - the connection that is established between the user and the server
+ /// `Sender<_>` - The sender for future events from the accepted client
+ /// `SocketAdd` - the Ip address of the user + ClientJoinRequest(Arc, Sender, SocketAddr), + + /// A new message by a connected client
+ /// String - the clients message
+ /// SocketAddr - the Ip address of the client ClientMessage(String, SocketAddr), + + /// The client wants/needs to disconnect from the server
+ /// SocketAddr - the Ip address of the client ClientDisconnect(SocketAddr), } -struct Server { - clients: Vec, +/// Listens to the events from clients and main thread +struct Server { + /// All the connected the clients + clients: HashMap, } impl Server { + /// Gets a new instance of the Server fn new() -> Self { - Self { clients: vec![] } + Self { clients: HashMap::new() } } + + /// Starts listing to events, idealy done in a seperate thread. fn start_server(&mut self, rx: Receiver) { debug!("Server listing to events"); for r in rx { match r { - ServerEvent::ClientJoinRequest(connection, tx) => { - let client = Client::new(connection, tx); - self.clients.push(client); - }, + ServerEvent::ClientJoinRequest(connection, tx, addr) => { + let client = Client::new(connection, tx, addr); + self.clients.insert(addr, client); + } ServerEvent::ClientMessage(msg, addr) => { - for ele in self.clients.iter_mut() { - if ele.connection.deref().peer_addr().unwrap() == addr { + debug!("{addr} send the message {msg}"); + // Send message to everyone but the author + for (key, value) in self.clients.iter_mut() { + if key == &addr { continue; } - writeln!(ele.connection.deref(), "{msg}").unwrap(); + let msg = &msg; + let _ = writeln!(value.connection.deref(), "{msg}").map_err(|err| { + error!("Could not write to client: {err}"); + }); } - }, + } ServerEvent::ClientDisconnect(ip) => { - let mut i: isize = -1; - for (idx, val) in self.clients.iter().enumerate() { - if val.connection.peer_addr().expect("LOL") == ip { - debug!("Disconnecting client -> {ip}"); - i = idx as isize; - } + + let client = self.clients.remove(&ip); + if client.is_some() { + let _ = client.expect("unreachable").connection.shutdown(std::net::Shutdown::Both).map_err(|err| { + error!("Could not disconnect the client {ip} from the server {err}"); + }); } - self.clients.remove(i as usize); } } } @@ -87,33 +135,46 @@ struct Client { } impl Client { - - fn new(connection: Arc, tx: Sender) -> Self { + fn new(connection: Arc, tx: Sender, addr: SocketAddr) -> Self { let con = connection.clone(); std::thread::spawn(move || { - let mut buff = [0; 64]; + let mut buff = [0; MESSAGE_SIZE]; loop { match con.as_ref().read(&mut buff) { Ok(0) => { - tx.send(ServerEvent::ClientDisconnect(con.as_ref().peer_addr().unwrap())).unwrap(); + let _ = tx.send(ServerEvent::ClientDisconnect( + addr + )) + .map_err(|err|{ error!("could not send message to server: {err}") }); break; - }, + } Ok(n) => { - let buff = String::from_utf8(buff[0..n].to_vec()).unwrap().trim_nulls(); - tx.send(ServerEvent::ClientMessage(buff, con.as_ref().peer_addr().unwrap())).unwrap(); - }, - Err(err) => error!("Somethign went wrong with the client {err}"), + let buff = std::str::from_utf8(&buff[0..n]).map_err(|err| { + error!("User sent NON-UTF8 string: {err}"); + }); + + if buff.is_err() { + continue; + } + + let buff = buff.expect("unreachable").to_string().trim_nulls(); + let _ = tx.send(ServerEvent::ClientMessage( + buff, + addr + )) + .map_err(|err| { + error!("Could not send message to server {err}"); + }); + } + Err(err) => error!("Something went wrong with the client {err}"), } } }); - Self { - connection - } + Self { connection } } } - trait TcpStreamString { fn trim_nulls(&self) -> Self; }