Skip to content

Commit

Permalink
Better server error support
Browse files Browse the repository at this point in the history
Rustygeard was unwrapping and panicing on basic string problems and
generally not sending good errors. There are more of these, but this
starts the process of both being able to send them, and receive them in
the client. The server error codes are not defined in the Gearman
protocol, they seem to be server-specific, so we needed to define a few
here.
  • Loading branch information
SpamapS committed Jun 21, 2024
1 parent e04762f commit 1800762
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 19 deletions.
22 changes: 20 additions & 2 deletions rustygear/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,15 @@ impl Client {
self
}

/// Configures the client ID for this client
///
/// It's right and valid to set client IDs to something other than what UTF8 allows
/// on other implementations, and for testing purposes on rustygear.
pub fn set_client_id_bytes(mut self, client_id: &[u8]) -> Self {
self.client_id = Some(Bytes::copy_from_slice(client_id));
self
}

/// Returns a Vec of references to strings corresponding to only active servers
pub fn active_servers(&self) -> Vec<Hostname> {
// Active servers will have a writer and a reader
Expand Down Expand Up @@ -877,7 +886,16 @@ impl Client {

/// Gets a single error that might have come from the server. The tuple returned is (code,
/// message)
pub async fn error(&mut self) -> Result<Option<(Bytes, Bytes)>, &str> {
Ok(self.client_data.receivers().error_rx.recv().await)
pub async fn error(&mut self) -> Option<(Bytes, Bytes)> {
match self.client_data.receivers().error_rx.try_recv() {
Ok(content) => Some(content),
Err(e) => match e {
TryRecvError::Empty => None,
TryRecvError::Disconnected => {
warn!("Error Channel read whlie disconnected.");
None
}
},
}
}
}
6 changes: 6 additions & 0 deletions rustygear/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ impl ConnHandler {
let code = next_field(&mut data);
let text = next_field(&mut data);
let tx = self.client_data.error_tx();
warn!(
"We got an error from [{}]. [{}]({})",
self.server(),
String::from_utf8_lossy(&code),
String::from_utf8_lossy(&text)
);
runtime::Handle::current().spawn(async move { tx.send((code, text)).await });
no_response()
}
Expand Down
1 change: 1 addition & 0 deletions rustygear/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod clientdata;
pub mod codec;
pub mod conn;
pub mod constants;
pub mod error;
pub mod job;
pub mod util;
pub mod wrappedstream;
13 changes: 12 additions & 1 deletion rustygear/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
use crate::codec::{Packet, PacketMagic};
use crate::constants::*;
use bytes::{Buf, Bytes};
use crate::error::RustygearServerError;
use bytes::{Buf, BufMut, Bytes, BytesMut};

pub fn bytes2bool(input: &Bytes) -> bool {
if input.len() != 1 {
Expand Down Expand Up @@ -45,6 +46,16 @@ pub fn new_req(ptype: u32, data: Bytes) -> Packet {
}
}

pub fn new_err(err: RustygearServerError, message: Bytes) -> Packet {
let code = format!("{}", err as i32);
let code = code.bytes();
let mut data = BytesMut::with_capacity(code.len() + message.len() + 1);
data.extend(code);
data.put_u8(b'\0');
data.extend(message);
new_res(ERROR, data.freeze())
}

pub fn next_field(buf: &mut Bytes) -> Bytes {
match buf[..].iter().position(|b| *b == b'\0') {
Some(null_pos) => {
Expand Down
5 changes: 3 additions & 2 deletions rustygeard/src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ pub fn admin_command_workers(workers: WorkersByConnId) -> Packet {
// actually makes the workers command more useful as it lets us see
// where in the roundrobin each worker is
let mut worker = worker.lock().unwrap();
let client_id = String::from_utf8(worker.client_id.to_vec()).unwrap();
response.extend(format!("{} {} {} :", conn_id, worker.peer_addr, client_id).bytes());
response.extend(format!("{} {} ", conn_id, worker.peer_addr).bytes());
response.extend(worker.client_id.as_bytes());
response.extend(b" :");
for func in worker.functions.iter() {
response.put_u8(b' ');
response.extend(func);
Expand Down
26 changes: 19 additions & 7 deletions rustygeard/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::{BTreeMap, HashMap};
use std::io;
use std::io::{self};
use std::net::SocketAddr;
use std::ops::Drop;
use std::pin::Pin;
Expand All @@ -9,6 +9,7 @@ use std::sync::{Arc, Mutex};
use core::task::{Context, Poll};

use futures::Future;
use rustygear::error::RustygearServerError;
use tokio::runtime;
use tokio::sync::mpsc::Sender;
use tower_service::Service;
Expand All @@ -18,7 +19,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use rustygear::codec::{Packet, PacketMagic};
use rustygear::constants::*;
use rustygear::job::Job;
use rustygear::util::{new_res, next_field, no_response};
use rustygear::util::{new_err, new_res, next_field, no_response};

use crate::admin;
use crate::queues::{HandleJobStorage, JobQueuePriority, SharedJobStorage};
Expand Down Expand Up @@ -122,7 +123,7 @@ impl GearmanService {
GearmanService {
conn_id: conn_id,
queues: queues,
worker: Arc::new(Mutex::new(Worker::new(peer_addr, Bytes::from("-")))),
worker: Arc::new(Mutex::new(Worker::new(peer_addr, String::from("-")))),
workers: workers,
job_count: job_count,
senders_by_conn_id: senders_by_conn_id,
Expand Down Expand Up @@ -354,10 +355,21 @@ impl GearmanService {
}

fn handle_set_client_id(&self, packet: &Packet) -> Result<Packet, io::Error> {
let d = packet.data.clone();
let mut worker = self.worker.lock().unwrap();
worker.client_id = d;
Ok(no_response())
let d: Vec<u8> = packet.data.clone().into();
match String::from_utf8(d) {
Ok(s) => {
let mut worker = self.worker.lock().unwrap();
worker.client_id = s;
Ok(no_response())
}
Err(e) => {
warn!("Received bad clientID: {}", e);
Ok(new_err(
RustygearServerError::InvalidClientId,
Bytes::from("ClientID must be valid UTF-8"),
))
}
}
}

fn handle_get_status(&self, packet: &Packet) -> Result<Packet, io::Error> {
Expand Down
4 changes: 2 additions & 2 deletions rustygeard/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,12 @@ impl Workers {
pub struct Worker {
pub peer_addr: SocketAddr,
pub functions: WrappingHashSet<Bytes>,
pub client_id: Bytes,
pub client_id: String,
jobs: HashMap<Bytes, Arc<Job>>,
}

impl Worker {
pub fn new(peer_addr: SocketAddr, client_id: Bytes) -> Worker {
pub fn new(peer_addr: SocketAddr, client_id: String) -> Worker {
Worker {
peer_addr: peer_addr,
functions: WrappingHashSet::new(),
Expand Down
8 changes: 4 additions & 4 deletions rustygeard/tests/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn admin_command_status_1job() {
Bytes::new(),
Bytes::from("h"),
);
let mut w = Worker::new("127.0.0.1:37337".parse().unwrap(), Bytes::from("client1"));
let mut w = Worker::new("127.0.0.1:37337".parse().unwrap(), String::from("client1"));
w.can_do(Bytes::from("f"));
let mut storage = SharedJobStorage::new_job_storage();
let mut workers = SharedWorkers::new_workers();
Expand Down Expand Up @@ -59,7 +59,7 @@ fn admin_command_workers_with2() {
let mut wbci = workers_by_conn_id.lock().unwrap();
let worker = Arc::new(Mutex::new(Worker::new(
"127.0.0.1:37337".parse().unwrap(),
Bytes::from("hacker1"),
String::from("hacker1"),
)));
{
let mut worker = worker.lock().unwrap();
Expand All @@ -68,7 +68,7 @@ fn admin_command_workers_with2() {
wbci.insert(10, worker);
let worker = Arc::new(Mutex::new(Worker::new(
"127.0.0.1:33333".parse().unwrap(),
Bytes::from("-"),
String::from("-"),
)));
wbci.insert(11, worker);
}
Expand Down Expand Up @@ -100,7 +100,7 @@ fn admin_command_priority_status_priority_jobs() {
storage.add_job(Arc::new(job), priority, None);
}
}
let mut w = Worker::new("127.0.0.1:37337".parse().unwrap(), Bytes::from("client1"));
let mut w = Worker::new("127.0.0.1:37337".parse().unwrap(), String::from("client1"));
w.can_do(Bytes::from("func"));
let mut workers = SharedWorkers::new_workers();
workers.sleep(&mut w, 1);
Expand Down
27 changes: 26 additions & 1 deletion rustygeard/tests/errors.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use rustygear::{
client::{Client, WorkUpdate, WorkerJob},
constants::WORK_STATUS,
Expand Down Expand Up @@ -52,3 +52,28 @@ async fn test_worker_sends_bad_work_status() {
}
));
}

#[tokio::test]
async fn test_client_sends_nulled_client_id() {
let server = start_test_server().unwrap();
let mut client = Client::new()
.add_server(&server.addr().to_string())
.set_client_id_bytes(b"b4F8\xF8after")
.connect()
.await
.expect("Should connect to server");
client
.echo(b"wait for client error")
.await
.expect("ECHO should go through");
assert_eq!(
client
.error()
.await
.expect("We should get an error immediately"),
(
Bytes::copy_from_slice(b"2"),
Bytes::copy_from_slice(b"ClientID must be valid UTF-8")
)
);
}

0 comments on commit 1800762

Please sign in to comment.