diff --git a/rustygear/src/client.rs b/rustygear/src/client.rs index 5fb448b..8cd212e 100644 --- a/rustygear/src/client.rs +++ b/rustygear/src/client.rs @@ -1,6 +1,7 @@ use core::fmt; #[cfg(feature = "tls")] use std::convert::TryFrom; +use std::error::Error; use std::fmt::Display; /* * Copyright 2020 Clint Byrum @@ -170,11 +171,14 @@ impl ClientJob { /// Should only return when the worker has sent data or completed the job. /// /// Use this in clients to wait for a response on a job that was submitted. This will return an error if used on a background job. - pub async fn response(&mut self) -> Result { + pub async fn response(&mut self) -> Result> { if let Some(workupdate) = self.response_rx.recv().await { Ok(workupdate) } else { - Err(io::Error::new(io::ErrorKind::Other, "Nothing to receive.")) + Err(Box::new(io::Error::new( + io::ErrorKind::NotConnected, + "Nothing to receive.", + ))) } } } @@ -223,12 +227,21 @@ impl WorkerJob { payload.put_u8(b'\0'); payload.extend(denominator.as_bytes()); let packet = new_res(WORK_STATUS, payload.freeze()); - self.send_packet(packet).await + self.send_packet(packet).await.map_err(|e| { + if e.is::() { + *e.downcast::().expect("downcast after is") + } else { + std::io::Error::new(io::ErrorKind::Other, e.to_string()) + } + }) } - async fn send_packet(&mut self, packet: Packet) -> Result<(), io::Error> { + async fn send_packet(&mut self, packet: Packet) -> Result<(), Box> { match self.sink_tx.send(packet).await { - Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Connection closed")), + Err(_) => Err(Box::new(io::Error::new( + io::ErrorKind::NotConnected, + "Connection closed", + ))), Ok(_) => Ok(()), } } @@ -237,7 +250,7 @@ impl WorkerJob { /// /// This method is typically called by the [Client::work] method upon return /// of an error from the assigned closure. - pub async fn work_fail(&mut self) -> Result<(), io::Error> { + pub async fn work_fail(&mut self) -> Result<(), Box> { let packet = new_res(WORK_FAIL, self.handle.clone()); self.send_packet(packet).await } @@ -246,7 +259,7 @@ impl WorkerJob { /// /// This method is typically called by the [Client::work] method upon return of /// the assigned closure. - pub async fn work_complete(&mut self, response: Vec) -> Result<(), io::Error> { + pub async fn work_complete(&mut self, response: Vec) -> Result<(), Box> { let mut payload = BytesMut::with_capacity(self.handle.len() + 1 + self.payload.len()); payload.extend(self.handle.clone()); payload.put_u8(b'\0'); @@ -437,13 +450,15 @@ impl Client { while let Some(frame) = stream.next().await { trace!("Frame read: {:?}", frame); let response = match frame { - Err(e) => Err(e), + Err(e) => Err(e.to_string()), Ok(frame) => { let handler = handler.clone(); debug!("Locking handler"); let mut handler = handler; debug!("Locked handler"); - handler.call(frame) + handler + .call(frame) + .map_err(|e| e.to_string()) } }; match response { @@ -544,7 +559,7 @@ impl Client { /// Sends an ECHO_REQ to the first server, a good way to confirm the connection is alive /// /// Returns an error if there aren't any connected servers, or no ECHO_RES comes back - pub async fn echo(&mut self, payload: &[u8]) -> Result<(), io::Error> { + pub async fn echo(&mut self, payload: &[u8]) -> Result<(), Box> { let packet = new_req(ECHO_REQ, Bytes::copy_from_slice(payload)); { let conns = self @@ -552,10 +567,10 @@ impl Client { .lock() .expect("All lock holders should not panic"); if conns.len() < 1 { - return Err(io::Error::new( - io::ErrorKind::Other, + return Err(Box::new(io::Error::new( + io::ErrorKind::NotConnected, "No connections for echo!", - )); + ))); } conns .get(0) @@ -573,7 +588,11 @@ impl Client { /// Submits a foreground job. The see [ClientJob::response] for how to see the response from the /// worker. The unique ID will be generated using [Uuid::new_v4] - pub async fn submit(&mut self, function: &str, payload: &[u8]) -> Result { + pub async fn submit( + &mut self, + function: &str, + payload: &[u8], + ) -> Result> { self.direct_submit(SUBMIT_JOB, function, payload, None) .await } @@ -584,7 +603,7 @@ impl Client { function: &str, unique: &[u8], payload: &[u8], - ) -> Result { + ) -> Result> { self.direct_submit(SUBMIT_JOB, function, payload, Some(unique)) .await } @@ -595,7 +614,7 @@ impl Client { &mut self, function: &str, payload: &[u8], - ) -> Result { + ) -> Result> { self.direct_submit(SUBMIT_JOB_BG, function, payload, None) .await } @@ -607,7 +626,7 @@ impl Client { function: &str, unique: &[u8], payload: &[u8], - ) -> Result { + ) -> Result> { self.direct_submit(SUBMIT_JOB_BG, function, payload, Some(unique)) .await } @@ -618,7 +637,7 @@ impl Client { function: &str, payload: &[u8], unique: Option<&[u8]>, - ) -> Result { + ) -> Result> { let mut uuid_unique = BytesMut::new(); let unique: &[u8] = match unique { None => { @@ -641,10 +660,10 @@ impl Client { .expect("All lock holders should not panic"); let conn = match conns.get_hashed_conn(&unique.iter().map(|b| *b).collect()) { None => { - return Err(io::Error::new( - io::ErrorKind::Other, + return Err(Box::new(io::Error::new( + io::ErrorKind::NotConnected, "No connections for submitting jobs.", - )); + ))); } Some(conn) => conn, }; @@ -664,13 +683,16 @@ impl Client { }; Ok(ClientJob::new(handle, rx)) } else { - Err(io::Error::new(io::ErrorKind::Other, "No job created!")) + Err(Box::new(io::Error::new( + io::ErrorKind::NotConnected, + "Receiver exited.", + ))) }; - submit_result + Ok(submit_result?) } /// Sends a GET_STATUS packet and then returns the STATUS_RES in a [JobStatus] - pub async fn get_status(&mut self, handle: &ServerHandle) -> Result { + pub async fn get_status(&mut self, handle: &ServerHandle) -> Result> { let mut payload = BytesMut::with_capacity(handle.handle().len()); payload.extend(handle.handle()); let status_req = new_req(GET_STATUS, payload.freeze()); @@ -686,7 +708,12 @@ impl Client { None } }) { - None => return Err(io::Error::new(ErrorKind::Other, "No connection for job")), + None => { + return Err(Box::new(io::Error::new( + ErrorKind::NotConnected, + "No connection for job", + ))) + } Some(conn) => conn, }; conn.send_packet(status_req).await?; @@ -695,7 +722,10 @@ impl Client { if let Some(status_res) = self.client_data.receivers().status_res_rx.recv().await { Ok(status_res) } else { - Err(io::Error::new(io::ErrorKind::Other, "No status to report!")) + Err(Box::new(io::Error::new( + io::ErrorKind::NotConnected, + "No status to report!", + ))) } } @@ -709,7 +739,7 @@ impl Client { /// /// See examples/worker.rs for more information. /// - pub async fn can_do(mut self, function: &str, func: F) -> Result + pub async fn can_do(mut self, function: &str, func: F) -> Result> where F: FnMut(&mut WorkerJob) -> Result, io::Error> + Send + 'static, { @@ -743,7 +773,7 @@ impl Client { task::spawn_blocking(move || { let rt = tokio::runtime::Builder::new_current_thread() .build() - .unwrap(); + .expect("Tokio builder should not panic"); let res = func_clone.lock().unwrap()(&mut job); match res { Err(_) => { @@ -767,7 +797,7 @@ impl Client { /// Receive and do just one job. Will not return until a job is done or there /// is an error. This is called in a loop by [Client::work]. - pub async fn do_one_job(&mut self) -> Result<(), io::Error> { + pub async fn do_one_job(&mut self) -> Result<(), Box> { let job = self.client_data.receivers().worker_job_rx.try_recv(); let job = match job { Err(TryRecvError::Empty) => { @@ -784,18 +814,18 @@ impl Client { match self.client_data.receivers().worker_job_rx.recv().await { Some(job) => job, None => { - return Err(io::Error::new( - io::ErrorKind::Other, + return Err(Box::new(io::Error::new( + io::ErrorKind::NotConnected, "Worker job tx are all dropped", - )) + ))) } } } Err(TryRecvError::Disconnected) => { - return Err(io::Error::new( - io::ErrorKind::Other, + return Err(Box::new(io::Error::new( + io::ErrorKind::NotConnected, "Worker job tx are all dropped", - )) + ))) } Ok(job) => job, }; @@ -804,13 +834,13 @@ impl Client { .get_jobs_tx_by_func(&Vec::from(job.function())) { None => { - return Err(io::Error::new( - io::ErrorKind::Other, + return Err(Box::new(io::Error::new( + io::ErrorKind::InvalidInput, format!( "Received job for unregistered function: {:?}", job.function() ), - )) + ))) } Some(tx) => tx, }; @@ -827,7 +857,7 @@ impl Client { /// not return unless there is an unexpected error. /// /// See examples/worker.rs for more information on how to use it. - pub async fn work(mut self) -> Result<(), io::Error> { + pub async fn work(mut self) -> Result<(), Box> { loop { self.do_one_job().await?; } @@ -835,7 +865,7 @@ impl Client { /// Gets a single error that might have come from the server. The tuple returned is (code, /// message) - pub async fn error(&mut self) -> Result, io::Error> { + pub async fn error(&mut self) -> Result, &str> { Ok(self.client_data.receivers().error_rx.recv().await) } } diff --git a/rustygear/src/conn.rs b/rustygear/src/conn.rs index 6892404..cc10125 100644 --- a/rustygear/src/conn.rs +++ b/rustygear/src/conn.rs @@ -1,5 +1,6 @@ use std::{ collections::HashMap, + error::Error, fmt::{Debug, Display}, io, slice::{Iter, IterMut}, @@ -82,85 +83,78 @@ impl ConnHandler { self.active = active } - pub async fn send_packet(&self, packet: Packet) -> Result<(), io::Error> { - if let Err(e) = self.sink_tx.send(packet).await { - error!("Receiver dropped"); - return Err(io::Error::new(io::ErrorKind::Other, format!("{}", e))); - } - Ok(()) + pub async fn send_packet(&self, packet: Packet) -> Result<(), Box> { + Ok(self.sink_tx.send(packet).await?) } - pub fn call(&mut self, req: Packet) -> Result { + pub fn call(&mut self, req: Packet) -> Result> { debug!("[{:?}] Got a req {:?}", self.client_id, req); match req.ptype { NOOP => self.handle_noop(), - JOB_CREATED => self.handle_job_created(&req), - NO_JOB => self.handle_no_job(), - JOB_ASSIGN => self.handle_job_assign(&req), - JOB_ASSIGN_UNIQ => self.handle_job_assign_uniq(&req), - ECHO_RES => self.handle_echo_res(&req), - ERROR => self.handle_error(&req), + JOB_CREATED => Ok(self.handle_job_created(&req)), + NO_JOB => Ok(self.handle_no_job()), + JOB_ASSIGN => Ok(self.handle_job_assign(&req)), + JOB_ASSIGN_UNIQ => Ok(self.handle_job_assign_uniq(&req)), + ECHO_RES => Ok(self.handle_echo_res(&req)), + ERROR => Ok(self.handle_error(&req)), STATUS_RES | STATUS_RES_UNIQUE => self.handle_status_res(&req), - OPTION_RES => self.handle_option_res(&req), + OPTION_RES => Ok(self.handle_option_res(&req)), WORK_COMPLETE | WORK_DATA | WORK_STATUS | WORK_WARNING | WORK_FAIL | WORK_EXCEPTION => { self.handle_work_update(&req) } //JOB_ASSIGN_ALL => self.handle_job_assign_all(&req), _ => { error!("Unimplemented: {:?} processing packet", req); - Err(io::Error::new( - io::ErrorKind::Other, + Err(Box::new(io::Error::new( + io::ErrorKind::InvalidData, format!("Invalid packet type {}", req.ptype), - )) + ))) } } } - fn handle_noop(&self) -> Result { + fn handle_noop(&self) -> Result> { Ok(new_req(GRAB_JOB_UNIQ, Bytes::new())) } - fn handle_no_job(&self) -> Result { - Ok(new_req(PRE_SLEEP, Bytes::new())) + fn handle_no_job(&self) -> Packet { + new_req(PRE_SLEEP, Bytes::new()) } - fn handle_echo_res(&mut self, req: &Packet) -> Result { + fn handle_echo_res(&mut self, req: &Packet) -> Packet { info!("Echo response received: {:?}", req.data); let tx = self.client_data.echo_tx(); let data = req.data.clone(); runtime::Handle::current().spawn(async move { tx.send(data).await }); - Ok(no_response()) + no_response() } - fn handle_job_created(&mut self, req: &Packet) -> Result { + fn handle_job_created(&mut self, req: &Packet) -> Packet { info!("Job Created: {:?}", req); let tx = self.client_data.job_created_tx(); let handle = req.data.clone(); let server = self.server.clone(); runtime::Handle::current() .spawn(async move { tx.send(ServerHandle::new(server, handle)).await }); - Ok(no_response()) + no_response() } - fn handle_error(&mut self, req: &Packet) -> Result { + fn handle_error(&mut self, req: &Packet) -> Packet { let mut data = req.data.clone(); let code = next_field(&mut data); let text = next_field(&mut data); let tx = self.client_data.error_tx(); runtime::Handle::current().spawn(async move { tx.send((code, text)).await }); - Ok(no_response()) + no_response() } - fn handle_status_res(&mut self, req: &Packet) -> Result { + fn handle_status_res(&mut self, req: &Packet) -> Result> { let mut req = req.clone(); let mut js = JobStatus { handle: next_field(&mut req.data), known: bytes2bool(&next_field(&mut req.data)), running: bytes2bool(&next_field(&mut req.data)), - numerator: String::from_utf8(next_field(&mut req.data).to_vec()) - .unwrap() - .parse() - .unwrap(), // XXX we can do better + numerator: String::from_utf8(next_field(&mut req.data).to_vec())?.parse()?, denominator: String::from_utf8(next_field(&mut req.data).to_vec()) .unwrap() .parse() @@ -178,16 +172,16 @@ impl ConnHandler { Ok(no_response()) } - fn handle_option_res(&mut self, _req: &Packet) -> Result { + fn handle_option_res(&mut self, _req: &Packet) -> Packet { /* if let Some(option_call) = self.option_call { option_call(req.data) } */ - Ok(no_response()) + no_response() } - fn handle_work_update(&mut self, req: &Packet) -> Result { + fn handle_work_update(&mut self, req: &Packet) -> Result> { let mut data = req.data.clone(); let handle = next_field(&mut data); let payload = next_field(&mut data); @@ -238,7 +232,7 @@ impl ConnHandler { Ok(no_response()) } - fn handle_job_assign(&mut self, req: &Packet) -> Result { + fn handle_job_assign(&mut self, req: &Packet) -> Packet { let mut data = req.data.clone(); let handle = next_field(&mut data); let function = next_field(&mut data); @@ -253,10 +247,10 @@ impl ConnHandler { }; let tx = self.client_data.worker_job_tx(); runtime::Handle::current().spawn(async move { tx.send(job).await }); - Ok(no_response()) + no_response() } - fn handle_job_assign_uniq(&mut self, req: &Packet) -> Result { + fn handle_job_assign_uniq(&mut self, req: &Packet) -> Packet { let mut data = req.data.clone(); let handle = next_field(&mut data); let function = next_field(&mut data); @@ -271,7 +265,7 @@ impl ConnHandler { }; let tx = self.client_data.worker_job_tx(); runtime::Handle::current().spawn(async move { tx.send(job).await }); - Ok(no_response()) + no_response() } } diff --git a/rustygeard/tests/client.rs b/rustygeard/tests/client.rs index f9aa2ac..187ed76 100644 --- a/rustygeard/tests/client.rs +++ b/rustygeard/tests/client.rs @@ -202,7 +202,14 @@ async fn test_client_multi_server() { }, }; }; - assert_eq!(status_error.kind(), ErrorKind::Other); + assert!(status_error.is::()); + assert_eq!( + status_error + .downcast::() + .expect("downcast after is") + .kind(), + ErrorKind::NotConnected + ); } #[tokio::test]