Skip to content

Commit

Permalink
BREAKING: Refactor error handling
Browse files Browse the repository at this point in the history
Errors have been largely returned as io::Error for everything up until
now. But there are other errors which are panics now, that should be
results. Rather than bend over backward implementing froms for all of
them, we'll make the errors dynamic.

This is expected to break client code if client code was set up to
inspect the errors returning. As such, we are also changing the
ErrorKind for some of the io::Errors we're returning.
  • Loading branch information
SpamapS committed Jun 17, 2024
1 parent 242b02b commit 53c5677
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 79 deletions.
110 changes: 70 additions & 40 deletions rustygear/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use core::fmt;
#[cfg(feature = "tls")]
use std::convert::TryFrom;
use std::error::Error;
use std::fmt::Display;
/*
* Copyright 2020 Clint Byrum
Expand Down Expand Up @@ -170,11 +171,14 @@ impl ClientJob {
/// Should only return when the worker has sent data or completed the job.
///
/// Use this in clients to wait for a response on a job that was submitted. This will return an error if used on a background job.
pub async fn response(&mut self) -> Result<WorkUpdate, io::Error> {
pub async fn response(&mut self) -> Result<WorkUpdate, Box<dyn Error>> {
if let Some(workupdate) = self.response_rx.recv().await {
Ok(workupdate)
} else {
Err(io::Error::new(io::ErrorKind::Other, "Nothing to receive."))
Err(Box::new(io::Error::new(
io::ErrorKind::NotConnected,
"Nothing to receive.",
)))
}
}
}
Expand Down Expand Up @@ -223,12 +227,21 @@ impl WorkerJob {
payload.put_u8(b'\0');
payload.extend(denominator.as_bytes());
let packet = new_res(WORK_STATUS, payload.freeze());
self.send_packet(packet).await
self.send_packet(packet).await.map_err(|e| {
if e.is::<std::io::Error>() {
*e.downcast::<std::io::Error>().expect("downcast after is")
} else {
std::io::Error::new(io::ErrorKind::Other, e.to_string())
}
})
}

async fn send_packet(&mut self, packet: Packet) -> Result<(), io::Error> {
async fn send_packet(&mut self, packet: Packet) -> Result<(), Box<dyn Error>> {
match self.sink_tx.send(packet).await {
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Connection closed")),
Err(_) => Err(Box::new(io::Error::new(
io::ErrorKind::NotConnected,
"Connection closed",
))),
Ok(_) => Ok(()),
}
}
Expand All @@ -237,7 +250,7 @@ impl WorkerJob {
///
/// This method is typically called by the [Client::work] method upon return
/// of an error from the assigned closure.
pub async fn work_fail(&mut self) -> Result<(), io::Error> {
pub async fn work_fail(&mut self) -> Result<(), Box<dyn Error>> {
let packet = new_res(WORK_FAIL, self.handle.clone());
self.send_packet(packet).await
}
Expand All @@ -246,7 +259,7 @@ impl WorkerJob {
///
/// This method is typically called by the [Client::work] method upon return of
/// the assigned closure.
pub async fn work_complete(&mut self, response: Vec<u8>) -> Result<(), io::Error> {
pub async fn work_complete(&mut self, response: Vec<u8>) -> Result<(), Box<dyn Error>> {
let mut payload = BytesMut::with_capacity(self.handle.len() + 1 + self.payload.len());
payload.extend(self.handle.clone());
payload.put_u8(b'\0');
Expand Down Expand Up @@ -437,13 +450,15 @@ impl Client {
while let Some(frame) = stream.next().await {
trace!("Frame read: {:?}", frame);
let response = match frame {
Err(e) => Err(e),
Err(e) => Err(e.to_string()),
Ok(frame) => {
let handler = handler.clone();
debug!("Locking handler");
let mut handler = handler;
debug!("Locked handler");
handler.call(frame)
handler
.call(frame)
.map_err(|e| e.to_string())
}
};
match response {
Expand Down Expand Up @@ -544,18 +559,18 @@ impl Client {
/// Sends an ECHO_REQ to the first server, a good way to confirm the connection is alive
///
/// Returns an error if there aren't any connected servers, or no ECHO_RES comes back
pub async fn echo(&mut self, payload: &[u8]) -> Result<(), io::Error> {
pub async fn echo(&mut self, payload: &[u8]) -> Result<(), Box<dyn Error>> {
let packet = new_req(ECHO_REQ, Bytes::copy_from_slice(payload));
{
let conns = self
.conns
.lock()
.expect("All lock holders should not panic");
if conns.len() < 1 {
return Err(io::Error::new(
io::ErrorKind::Other,
return Err(Box::new(io::Error::new(
io::ErrorKind::NotConnected,
"No connections for echo!",
));
)));
}
conns
.get(0)
Expand All @@ -573,7 +588,11 @@ impl Client {

/// Submits a foreground job. The see [ClientJob::response] for how to see the response from the
/// worker. The unique ID will be generated using [Uuid::new_v4]
pub async fn submit(&mut self, function: &str, payload: &[u8]) -> Result<ClientJob, io::Error> {
pub async fn submit(
&mut self,
function: &str,
payload: &[u8],
) -> Result<ClientJob, Box<dyn Error>> {
self.direct_submit(SUBMIT_JOB, function, payload, None)
.await
}
Expand All @@ -584,7 +603,7 @@ impl Client {
function: &str,
unique: &[u8],
payload: &[u8],
) -> Result<ClientJob, io::Error> {
) -> Result<ClientJob, Box<dyn Error>> {
self.direct_submit(SUBMIT_JOB, function, payload, Some(unique))
.await
}
Expand All @@ -595,7 +614,7 @@ impl Client {
&mut self,
function: &str,
payload: &[u8],
) -> Result<ClientJob, io::Error> {
) -> Result<ClientJob, Box<dyn Error>> {
self.direct_submit(SUBMIT_JOB_BG, function, payload, None)
.await
}
Expand All @@ -607,7 +626,7 @@ impl Client {
function: &str,
unique: &[u8],
payload: &[u8],
) -> Result<ClientJob, io::Error> {
) -> Result<ClientJob, Box<dyn Error>> {
self.direct_submit(SUBMIT_JOB_BG, function, payload, Some(unique))
.await
}
Expand All @@ -618,7 +637,7 @@ impl Client {
function: &str,
payload: &[u8],
unique: Option<&[u8]>,
) -> Result<ClientJob, io::Error> {
) -> Result<ClientJob, Box<dyn Error>> {
let mut uuid_unique = BytesMut::new();
let unique: &[u8] = match unique {
None => {
Expand All @@ -641,10 +660,10 @@ impl Client {
.expect("All lock holders should not panic");
let conn = match conns.get_hashed_conn(&unique.iter().map(|b| *b).collect()) {
None => {
return Err(io::Error::new(
io::ErrorKind::Other,
return Err(Box::new(io::Error::new(
io::ErrorKind::NotConnected,
"No connections for submitting jobs.",
));
)));
}
Some(conn) => conn,
};
Expand All @@ -664,13 +683,16 @@ impl Client {
};
Ok(ClientJob::new(handle, rx))
} else {
Err(io::Error::new(io::ErrorKind::Other, "No job created!"))
Err(Box::new(io::Error::new(
io::ErrorKind::NotConnected,
"Receiver exited.",
)))
};
submit_result
Ok(submit_result?)
}

/// Sends a GET_STATUS packet and then returns the STATUS_RES in a [JobStatus]
pub async fn get_status(&mut self, handle: &ServerHandle) -> Result<JobStatus, io::Error> {
pub async fn get_status(&mut self, handle: &ServerHandle) -> Result<JobStatus, Box<dyn Error>> {
let mut payload = BytesMut::with_capacity(handle.handle().len());
payload.extend(handle.handle());
let status_req = new_req(GET_STATUS, payload.freeze());
Expand All @@ -686,7 +708,12 @@ impl Client {
None
}
}) {
None => return Err(io::Error::new(ErrorKind::Other, "No connection for job")),
None => {
return Err(Box::new(io::Error::new(
ErrorKind::NotConnected,
"No connection for job",
)))
}
Some(conn) => conn,
};
conn.send_packet(status_req).await?;
Expand All @@ -695,7 +722,10 @@ impl Client {
if let Some(status_res) = self.client_data.receivers().status_res_rx.recv().await {
Ok(status_res)
} else {
Err(io::Error::new(io::ErrorKind::Other, "No status to report!"))
Err(Box::new(io::Error::new(
io::ErrorKind::NotConnected,
"No status to report!",
)))
}
}

Expand All @@ -709,7 +739,7 @@ impl Client {
///
/// See examples/worker.rs for more information.
///
pub async fn can_do<F>(mut self, function: &str, func: F) -> Result<Self, io::Error>
pub async fn can_do<F>(mut self, function: &str, func: F) -> Result<Self, Box<dyn Error>>
where
F: FnMut(&mut WorkerJob) -> Result<Vec<u8>, io::Error> + Send + 'static,
{
Expand Down Expand Up @@ -743,7 +773,7 @@ impl Client {
task::spawn_blocking(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
.expect("Tokio builder should not panic");
let res = func_clone.lock().unwrap()(&mut job);
match res {
Err(_) => {
Expand All @@ -767,7 +797,7 @@ impl Client {

/// Receive and do just one job. Will not return until a job is done or there
/// is an error. This is called in a loop by [Client::work].
pub async fn do_one_job(&mut self) -> Result<(), io::Error> {
pub async fn do_one_job(&mut self) -> Result<(), Box<dyn Error>> {
let job = self.client_data.receivers().worker_job_rx.try_recv();
let job = match job {
Err(TryRecvError::Empty) => {
Expand All @@ -784,18 +814,18 @@ impl Client {
match self.client_data.receivers().worker_job_rx.recv().await {
Some(job) => job,
None => {
return Err(io::Error::new(
io::ErrorKind::Other,
return Err(Box::new(io::Error::new(
io::ErrorKind::NotConnected,
"Worker job tx are all dropped",
))
)))
}
}
}
Err(TryRecvError::Disconnected) => {
return Err(io::Error::new(
io::ErrorKind::Other,
return Err(Box::new(io::Error::new(
io::ErrorKind::NotConnected,
"Worker job tx are all dropped",
))
)))
}
Ok(job) => job,
};
Expand All @@ -804,13 +834,13 @@ impl Client {
.get_jobs_tx_by_func(&Vec::from(job.function()))
{
None => {
return Err(io::Error::new(
io::ErrorKind::Other,
return Err(Box::new(io::Error::new(
io::ErrorKind::InvalidInput,
format!(
"Received job for unregistered function: {:?}",
job.function()
),
))
)))
}
Some(tx) => tx,
};
Expand All @@ -827,15 +857,15 @@ impl Client {
/// not return unless there is an unexpected error.
///
/// See examples/worker.rs for more information on how to use it.
pub async fn work(mut self) -> Result<(), io::Error> {
pub async fn work(mut self) -> Result<(), Box<dyn Error>> {
loop {
self.do_one_job().await?;
}
}

/// Gets a single error that might have come from the server. The tuple returned is (code,
/// message)
pub async fn error(&mut self) -> Result<Option<(Bytes, Bytes)>, io::Error> {
pub async fn error(&mut self) -> Result<Option<(Bytes, Bytes)>, &str> {
Ok(self.client_data.receivers().error_rx.recv().await)
}
}
Loading

0 comments on commit 53c5677

Please sign in to comment.