Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable LLMP client timeout #1838

Merged
merged 4 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions libafl/src/events/centralized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,19 @@ where
///
/// The port must not be bound yet to have a broker.
#[cfg(feature = "std")]
pub fn on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
pub fn on_port(
shmem_provider: SP,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
Ok(Self {
// TODO switch to false after solving the bug
llmp: LlmpBroker::with_keep_pages_attach_to_tcp(shmem_provider, port, true)?,
llmp: LlmpBroker::with_keep_pages_attach_to_tcp(
shmem_provider,
port,
true,
client_timeout,
)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData,
Expand Down
16 changes: 13 additions & 3 deletions libafl/src/events/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::net::SocketAddr;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use std::process::Stdio;
#[cfg(all(unix, feature = "std", feature = "fork"))]
use std::{fs::File, os::unix::io::AsRawFd};
use std::{fs::File, os::unix::io::AsRawFd, time::Duration};

#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use libafl_bolts::os::startable_self;
Expand Down Expand Up @@ -496,10 +496,9 @@ where
S: State + HasExecutions,
SP: ShMemProvider + 'static,
{
/// Launch the broker and the clients and fuzz
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not keep that docstring in any case?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's an internal function, I moved the old documentation to the public function.
Maybe we want documentation for both?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can never have enough documentation I guess

#[allow(clippy::similar_names)]
#[allow(clippy::too_many_lines)]
pub fn launch(&mut self) -> Result<(), Error> {
fn launch_internal(&mut self, client_timeout: Option<Duration>) -> Result<(), Error> {
if self.cores.ids.is_empty() {
return Err(Error::illegal_argument(
"No cores to spawn on given, cannot launch anything.",
Expand Down Expand Up @@ -544,6 +543,7 @@ where
CentralizedLlmpEventBroker::on_port(
self.shmem_provider.clone(),
self.centralized_broker_port,
client_timeout,
)?;
broker.broker_loop()?;
}
Expand Down Expand Up @@ -643,4 +643,14 @@ where

Ok(())
}

/// Launch the broker and the clients and fuzz
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document that this has the default timeout, if that's what we want(?)

Copy link
Member

@domenukk domenukk Feb 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we want one api for with, and one for without timeouts?

Also we probably should document better which of the three(or so) different timeouts this is :D

pub fn launch(&mut self) -> Result<(), Error> {
self.launch_internal(None)
}

/// Launch the broker and the clients and fuzz with a given timeout for the clients
pub fn launch_with_client_timeout(&mut self, client_timeout: Duration) -> Result<(), Error> {
self.launch_internal(Some(client_timeout))
}
}
36 changes: 30 additions & 6 deletions libafl/src/events/llmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,15 @@ where
///
/// The port must not be bound yet to have a broker.
#[cfg(feature = "std")]
pub fn on_port(shmem_provider: SP, monitor: MT, port: u16) -> Result<Self, Error> {
pub fn on_port(
shmem_provider: SP,
monitor: MT,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
Ok(Self {
monitor,
llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port)?,
llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData,
Expand Down Expand Up @@ -1172,8 +1177,10 @@ where
false
}

/// Launch the restarting manager
pub fn launch(&mut self) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
fn launch_internal(
&mut self,
client_timeout: Option<Duration>,
) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
// We start ourself as child process to actually fuzz
let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER)
.is_err()
Expand All @@ -1195,8 +1202,11 @@ where
// We get here if we are on Unix, or we are a broker on Windows (or without forks).
let (mgr, core_id) = match self.kind {
ManagerKind::Any => {
let connection =
LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?;
let connection = LlmpConnection::on_port(
self.shmem_provider.clone(),
self.broker_port,
client_timeout,
)?;
match connection {
LlmpConnection::IsBroker { broker } => {
let event_broker = LlmpEventBroker::<S::Input, MT, SP>::new(
Expand Down Expand Up @@ -1224,6 +1234,7 @@ where
self.shmem_provider.clone(),
self.monitor.take().unwrap(),
self.broker_port,
client_timeout,
)?;

broker_things(event_broker, self.remote_broker_addr)?;
Expand Down Expand Up @@ -1386,6 +1397,19 @@ where

Ok((state, mgr))
}

/// Launch the restarting manager
pub fn launch(&mut self) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
self.launch_internal(None)
}

/// Launch the restarting manager with a custom client timeout
pub fn launch_with_client_timeout(
&mut self,
client_timeout: Duration,
) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
self.launch_internal(Some(client_timeout))
}
}

/// A manager-like llmp client that converts between input types
Expand Down
4 changes: 2 additions & 2 deletions libafl_bolts/examples/llmp_test/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

match mode.as_str() {
"broker" => {
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?;
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?, None)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not have a timeout here?

broker.launch_tcp_listener_on(port)?;
// Exit when we got at least _n_ nodes, and all of them quit.
broker.set_exit_cleanly_after(NonZeroUsize::new(1_usize).unwrap());
Expand All @@ -166,7 +166,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
);
}
"b2b" => {
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?)?;
let mut broker = llmp::LlmpBroker::new(StdShMemProvider::new()?, None)?;
broker.launch_tcp_listener_on(b2b_port)?;
// connect back to the main broker.
broker.connect_b2b(("127.0.0.1", port))?;
Expand Down
68 changes: 54 additions & 14 deletions libafl_bolts/src/llmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ use crate::{
ClientId, Error,
};

/// The timeout after which a client will be considered stale, and removed.
/// The default timeout in seconds after which a client will be considered stale, and removed.
#[cfg(feature = "std")]
const CLIENT_TIMEOUT: Duration = Duration::from_secs(60 * 5);
const DEFAULT_CLIENT_TIMEOUT_SECS: u64 = 60 * 5;

/// The max number of pages a [`client`] may have mapped that were not yet read by the [`broker`]
/// Usually, this value should not exceed `1`, else the broker cannot keep up with the amount of incoming messages.
Expand Down Expand Up @@ -699,13 +699,17 @@ where
{
#[cfg(feature = "std")]
/// Creates either a broker, if the tcp port is not bound, or a client, connected to this port.
pub fn on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
pub fn on_port(
shmem_provider: SP,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
match tcp_bind(port) {
Ok(listener) => {
// We got the port. We are the broker! :)
log::info!("We're the broker");

let mut broker = LlmpBroker::new(shmem_provider)?;
let mut broker = LlmpBroker::new(shmem_provider, client_timeout)?;
let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
Ok(LlmpConnection::IsBroker { broker })
}
Expand All @@ -725,9 +729,13 @@ where

/// Creates a new broker on the given port
#[cfg(feature = "std")]
pub fn broker_on_port(shmem_provider: SP, port: u16) -> Result<Self, Error> {
pub fn broker_on_port(
shmem_provider: SP,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
Ok(LlmpConnection::IsBroker {
broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port)?,
broker: LlmpBroker::create_attach_to_tcp(shmem_provider, port, client_timeout)?,
})
}

Expand Down Expand Up @@ -1961,6 +1969,9 @@ where
clients_to_remove: Vec<usize>,
/// The ShMemProvider to use
shmem_provider: SP,
#[cfg(feature = "std")]
/// The timeout after which a client will be considered stale, and removed.
client_timeout: Duration,
}

/// A signal handler for the [`LlmpBroker`].
Expand Down Expand Up @@ -2009,16 +2020,28 @@ where
SP: ShMemProvider + 'static,
{
/// Create and initialize a new [`LlmpBroker`]
pub fn new(shmem_provider: SP) -> Result<Self, Error> {
pub fn new(
shmem_provider: SP,
#[cfg(feature = "std")] client_timeout: Option<Duration>,
) -> Result<Self, Error> {
// Broker never cleans up the pages so that new
// clients may join at any time
Self::with_keep_pages(shmem_provider, true)
#[cfg(feature = "std")]
{
Self::with_keep_pages(shmem_provider, true, client_timeout)
}

#[cfg(not(feature = "std"))]
{
Self::with_keep_pages(shmem_provider, true)
}
}

/// Create and initialize a new [`LlmpBroker`] telling if it has to keep pages forever
pub fn with_keep_pages(
mut shmem_provider: SP,
keep_pages_forever: bool,
#[cfg(feature = "std")] client_timeout: Option<Duration>,
) -> Result<Self, Error> {
Ok(LlmpBroker {
llmp_out: LlmpSender {
Expand All @@ -2039,6 +2062,12 @@ where
listeners: vec![],
exit_cleanly_after: None,
num_clients_total: 0,
#[cfg(feature = "std")]
client_timeout: if let Some(to) = client_timeout {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure this should default to a time (and not just be no-timeout if None)?
Especially since no_std doesn't have that timeout to begin with..

to
} else {
Duration::from_secs(DEFAULT_CLIENT_TIMEOUT_SECS)
},
})
}

Expand All @@ -2058,8 +2087,12 @@ where

/// Create a new [`LlmpBroker`] attaching to a TCP port
#[cfg(feature = "std")]
pub fn create_attach_to_tcp(shmem_provider: SP, port: u16) -> Result<Self, Error> {
Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true)
pub fn create_attach_to_tcp(
shmem_provider: SP,
port: u16,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
Self::with_keep_pages_attach_to_tcp(shmem_provider, port, true, client_timeout)
}

/// Create a new [`LlmpBroker`] attaching to a TCP port and telling if it has to keep pages forever
Expand All @@ -2068,10 +2101,15 @@ where
shmem_provider: SP,
port: u16,
keep_pages_forever: bool,
client_timeout: Option<Duration>,
) -> Result<Self, Error> {
match tcp_bind(port) {
Ok(listener) => {
let mut broker = LlmpBroker::with_keep_pages(shmem_provider, keep_pages_forever)?;
let mut broker = LlmpBroker::with_keep_pages(
shmem_provider,
keep_pages_forever,
client_timeout,
)?;
let _listener_thread = broker.launch_listener(Listener::Tcp(listener))?;
Ok(broker)
}
Expand Down Expand Up @@ -2233,7 +2271,7 @@ where
if !has_messages && !self.listeners.iter().any(|&x| x == client_id) {
let last_msg_time = self.llmp_clients[i].last_msg_time;
if last_msg_time < current_time
&& current_time - last_msg_time > CLIENT_TIMEOUT
&& current_time - last_msg_time > self.client_timeout
{
self.clients_to_remove.push(i);
#[cfg(feature = "llmp_debug")]
Expand Down Expand Up @@ -3257,13 +3295,15 @@ mod tests {
pub fn test_llmp_connection() {
#[allow(unused_variables)]
let shmem_provider = StdShMemProvider::new().unwrap();
let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() {
let mut broker = match LlmpConnection::on_port(shmem_provider.clone(), 1337, None).unwrap()
{
IsClient { client: _ } => panic!("Could not bind to port as broker"),
IsBroker { broker } => broker,
};

// Add the first client (2nd, actually, because of the tcp listener client)
let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337).unwrap() {
let mut client = match LlmpConnection::on_port(shmem_provider.clone(), 1337, None).unwrap()
{
IsBroker { broker: _ } => panic!("Second connect should be a client!"),
IsClient { client } => client,
};
Expand Down
Loading