diff --git a/libafl/src/events/centralized.rs b/libafl/src/events/centralized.rs index 2381aa6960..515e991144 100644 --- a/libafl/src/events/centralized.rs +++ b/libafl/src/events/centralized.rs @@ -86,10 +86,19 @@ where /// /// The port must not be bound yet to have a broker. #[cfg(feature = "std")] - pub fn on_port(shmem_provider: SP, port: u16) -> Result { + pub fn on_port( + shmem_provider: SP, + port: u16, + client_timeout: Option, + ) -> Result { Ok(Self { // TODO switch to false after solving the bug - llmp: LlmpBroker::with_keep_pages_attach_to_tcp(shmem_provider, port, true)?, + llmp: LlmpBroker::with_keep_pages_attach_to_tcp( + shmem_provider, + port, + true, + client_timeout, + )?, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), phantom: PhantomData, diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs index 18181c46b8..75f250e9fb 100644 --- a/libafl/src/events/launcher.rs +++ b/libafl/src/events/launcher.rs @@ -24,7 +24,7 @@ use std::net::SocketAddr; #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] use std::process::Stdio; #[cfg(all(unix, feature = "std", feature = "fork"))] -use std::{fs::File, os::unix::io::AsRawFd}; +use std::{fs::File, os::unix::io::AsRawFd, time::Duration}; #[cfg(all(feature = "std", any(windows, not(feature = "fork"))))] use libafl_bolts::os::startable_self; @@ -496,10 +496,9 @@ where S: State + HasExecutions, SP: ShMemProvider + 'static, { - /// Launch the broker and the clients and fuzz #[allow(clippy::similar_names)] #[allow(clippy::too_many_lines)] - pub fn launch(&mut self) -> Result<(), Error> { + fn launch_internal(&mut self, client_timeout: Option) -> Result<(), Error> { if self.cores.ids.is_empty() { return Err(Error::illegal_argument( "No cores to spawn on given, cannot launch anything.", @@ -544,6 +543,7 @@ where CentralizedLlmpEventBroker::on_port( self.shmem_provider.clone(), self.centralized_broker_port, + client_timeout, )?; broker.broker_loop()?; } @@ -643,4 +643,14 @@ where Ok(()) } + + /// Launch the broker and the clients and fuzz + pub fn launch(&mut self) -> Result<(), Error> { + self.launch_internal(None) + } + + /// Launch the broker and the clients and fuzz with a given timeout for the clients + pub fn launch_with_client_timeout(&mut self, client_timeout: Duration) -> Result<(), Error> { + self.launch_internal(Some(client_timeout)) + } } diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index 808a56e9c9..5cc02ad97c 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -109,10 +109,15 @@ where /// /// The port must not be bound yet to have a broker. #[cfg(feature = "std")] - pub fn on_port(shmem_provider: SP, monitor: MT, port: u16) -> Result { + pub fn on_port( + shmem_provider: SP, + monitor: MT, + port: u16, + client_timeout: Option, + ) -> Result { Ok(Self { monitor, - llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port)?, + llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?, #[cfg(feature = "llmp_compression")] compressor: GzipCompressor::new(COMPRESS_THRESHOLD), phantom: PhantomData, @@ -1172,8 +1177,10 @@ where false } - /// Launch the restarting manager - pub fn launch(&mut self) -> Result<(Option, LlmpRestartingEventManager), Error> { + fn launch_internal( + &mut self, + client_timeout: Option, + ) -> Result<(Option, LlmpRestartingEventManager), Error> { // We start ourself as child process to actually fuzz let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER) .is_err() @@ -1195,8 +1202,11 @@ where // We get here if we are on Unix, or we are a broker on Windows (or without forks). let (mgr, core_id) = match self.kind { ManagerKind::Any => { - let connection = - LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?; + let connection = LlmpConnection::on_port( + self.shmem_provider.clone(), + self.broker_port, + client_timeout, + )?; match connection { LlmpConnection::IsBroker { broker } => { let event_broker = LlmpEventBroker::::new( @@ -1224,6 +1234,7 @@ where self.shmem_provider.clone(), self.monitor.take().unwrap(), self.broker_port, + client_timeout, )?; broker_things(event_broker, self.remote_broker_addr)?; @@ -1386,6 +1397,19 @@ where Ok((state, mgr)) } + + /// Launch the restarting manager + pub fn launch(&mut self) -> Result<(Option, LlmpRestartingEventManager), Error> { + self.launch_internal(None) + } + + /// Launch the restarting manager with a custom client timeout + pub fn launch_with_client_timeout( + &mut self, + client_timeout: Duration, + ) -> Result<(Option, LlmpRestartingEventManager), Error> { + self.launch_internal(Some(client_timeout)) + } } /// A manager-like llmp client that converts between input types diff --git a/libafl_bolts/examples/llmp_test/main.rs b/libafl_bolts/examples/llmp_test/main.rs index 5c193c436c..f247c5f31b 100644 --- a/libafl_bolts/examples/llmp_test/main.rs +++ b/libafl_bolts/examples/llmp_test/main.rs @@ -155,7 +155,7 @@ fn main() -> Result<(), Box> { match mode.as_str() { "broker" => { - let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?; + let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?, None)?; broker.launch_tcp_listener_on(port)?; // Exit when we got at least _n_ nodes, and all of them quit. broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap()); @@ -166,7 +166,7 @@ fn main() -> Result<(), Box> { ); } "b2b" => { - let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?; + let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?, None)?; broker.launch_tcp_listener_on(b2b_port)?; // connect back to the main broker. broker.connect_b2b(("127.0.0.1", port))?; diff --git a/libafl_bolts/src/llmp.rs b/libafl_bolts/src/llmp.rs index 00d0cd8a8e..e858395b2c 100644 --- a/libafl_bolts/src/llmp.rs +++ b/libafl_bolts/src/llmp.rs @@ -107,9 +107,9 @@ use crate::{ ClientId, Error, }; -/// The timeout after which a client will be considered stale, and removed. +/// The default timeout in seconds after which a client will be considered stale, and removed. #[cfg(feature = "std")] -const CLIENT_TIMEOUT: Duration = Duration::from_secs(60 * 5); +const DEFAULT_CLIENT_TIMEOUT_SECS: u64 = 60 * 5; /// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`] /// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages. @@ -699,13 +699,17 @@ where { #[cfg(feature = "std")] /// Creates either a broker, if the tcp port is not bound, or a client, connected to this port. - pub fn on_port(shmem_provider: SP, port: u16) -> Result { + pub fn on_port( + shmem_provider: SP, + port: u16, + client_timeout: Option, + ) -> Result { match tcp_bind(port) { Ok(listener) => { // We got the port. We are the broker! :) log::info!("We're the broker"); - let mut broker = LlmpBroker::new(shmem_provider)?; + let mut broker = LlmpBroker::new(shmem_provider, client_timeout)?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; Ok(LlmpConnection::IsBroker { broker }) } @@ -725,9 +729,13 @@ where /// Creates a new broker on the given port #[cfg(feature = "std")] - pub fn broker_on_port(shmem_provider: SP, port: u16) -> Result { + pub fn broker_on_port( + shmem_provider: SP, + port: u16, + client_timeout: Option, + ) -> Result { Ok(LlmpConnection::IsBroker { - broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port)?, + broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?, }) } @@ -1961,6 +1969,9 @@ where clients_to_remove: Vec, /// The ShMemProvider to use shmem_provider: SP, + #[cfg(feature = "std")] + /// The timeout after which a client will be considered stale, and removed. + client_timeout: Duration, } /// A signal handler for the [`LlmpBroker`]. @@ -2009,16 +2020,28 @@ where SP: ShMemProvider + 'static, { /// Create and initialize a new [`LlmpBroker`] - pub fn new(shmem_provider: SP) -> Result { + pub fn new( + shmem_provider: SP, + #[cfg(feature = "std")] client_timeout: Option, + ) -> Result { // Broker never cleans up the pages so that new // clients may join at any time - Self::with_keep_pages(shmem_provider, true) + #[cfg(feature = "std")] + { + Self::with_keep_pages(shmem_provider, true, client_timeout) + } + + #[cfg(not(feature = "std"))] + { + Self::with_keep_pages(shmem_provider, true) + } } /// Create and initialize a new [`LlmpBroker`] telling if it has to keep pages forever pub fn with_keep_pages( mut shmem_provider: SP, keep_pages_forever: bool, + #[cfg(feature = "std")] client_timeout: Option, ) -> Result { Ok(LlmpBroker { llmp_out: LlmpSender { @@ -2039,6 +2062,12 @@ where listeners: vec![], exit_cleanly_after: None, num_clients_total: 0, + #[cfg(feature = "std")] + client_timeout: if let Some(to) = client_timeout { + to + } else { + Duration::from_secs(DEFAULT_CLIENT_TIMEOUT_SECS) + }, }) } @@ -2058,8 +2087,12 @@ where /// Create a new [`LlmpBroker`] attaching to a TCP port #[cfg(feature = "std")] - pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result { - Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true) + pub fn create_attach_to_tcp( + shmem_provider: SP, + port: u16, + client_timeout: Option, + ) -> Result { + Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true, client_timeout) } /// Create a new [`LlmpBroker`] attaching to a TCP port and telling if it has to keep pages forever @@ -2068,10 +2101,15 @@ where shmem_provider: SP, port: u16, keep_pages_forever: bool, + client_timeout: Option, ) -> Result { match tcp_bind(port) { Ok(listener) => { - let mut broker = LlmpBroker::with_keep_pages(shmem_provider, keep_pages_forever)?; + let mut broker = LlmpBroker::with_keep_pages( + shmem_provider, + keep_pages_forever, + client_timeout, + )?; let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?; Ok(broker) } @@ -2233,7 +2271,7 @@ where if !has_messages && !self.listeners.iter().any(|&x| x == client_id) { let last_msg_time = self.llmp_clients[i].last_msg_time; if last_msg_time < current_time - && current_time - last_msg_time > CLIENT_TIMEOUT + && current_time - last_msg_time > self.client_timeout { self.clients_to_remove.push(i); #[cfg(feature = "llmp_debug")] @@ -3257,13 +3295,15 @@ mod tests { pub fn test_llmp_connection() { #[allow(unused_variables)] let shmem_provider = StdShMemProvider::new().unwrap(); - let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() { + let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337, None).unwrap() + { IsClient { client: _ } => panic!("Could not bind to port as broker"), IsBroker { broker } => broker, }; // Add the first client (2nd, actually, because of the tcp listener client) - let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() { + let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337, None).unwrap() + { IsBroker { broker: _ } => panic!("Second connect should be a client!"), IsClient { client } => client, };