Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address comments from #2302 #2322

Merged
merged 6 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions libafl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ handle_sigpipe = []

#! ## Additional Components

## Enables `TcpEventManager`, a simple EventManager proxying everything via TCP. This uses `tokio`.
tcp_manager = ["tokio", "std"]

## Enables compression for the TCP manager
tcp_compression = ["tcp_manager", "libafl_bolts/gzip"]

## Enable multi-machine support
multi_machine = ["tokio", "std", "enumflags2", "ahash/std"]

Expand Down
12 changes: 8 additions & 4 deletions libafl/src/events/broker_hooks/centralized_multi_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use libafl_bolts::{
shmem::ShMemProvider,
ClientId, Error,
};
use log::debug;
use tokio::{
net::ToSocketAddrs,
runtime::Runtime,
Expand Down Expand Up @@ -121,7 +120,12 @@ where
I: Input + Send + Sync + 'static,
{
/// Should not be created alone. Use [`TcpMultiMachineBuilder`] instead.
pub(crate) fn new(
///
/// # Safety
/// For [`Self::on_new_message`], this struct assumes that the `msg` parameter
/// (or rather, the memory it points to), lives sufficiently long
/// for an async background task to process it.
pub(crate) unsafe fn new(
shared_state: Arc<RwLock<TcpMultiMachineState<A>>>,
rt: Arc<Runtime>,
) -> Self {
Expand Down Expand Up @@ -200,7 +204,7 @@ where
// TODO: do not copy here
state_wr_lock.add_past_msg(msg);

debug!("Sending msg...");
log::debug!("Sending msg...");

state_wr_lock
.send_interesting_event_to_nodes(&mm_msg)
Expand Down Expand Up @@ -239,7 +243,7 @@ where
.receive_new_messages_from_nodes(&mut incoming_msgs)
.await?;

debug!("received {} new incoming msg(s)", incoming_msgs.len());
log::debug!("received {} new incoming msg(s)", incoming_msgs.len());

let msgs_to_forward: Result<Vec<(Tag, Flags, Vec<u8>)>, Error> = incoming_msgs
.into_iter()
Expand Down
15 changes: 7 additions & 8 deletions libafl/src/events/centralized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use libafl_bolts::{
tuples::Handle,
ClientId,
};
use log::debug;
use serde::{Deserialize, Serialize};

use super::NopEventManager;
Expand Down Expand Up @@ -551,7 +550,7 @@ where
};
let event: Event<<<Self as UsesState>::State as UsesInput>::Input> =
postcard::from_bytes(event_bytes)?;
debug!("Processor received message {}", event.name_detailed());
log::debug!("Processor received message {}", event.name_detailed());
self.handle_in_main(fuzzer, executor, state, client_id, event)?;
count += 1;
}
Expand All @@ -574,7 +573,7 @@ where
Z: ExecutionProcessor<E::Observers, State = <Self as UsesState>::State>
+ EvaluatorObservers<E::Observers>,
{
debug!("handle_in_main!");
log::debug!("handle_in_main!");

let event_name = event.name_detailed();

Expand All @@ -591,7 +590,7 @@ where
#[cfg(feature = "multi_machine")]
node_id,
} => {
debug!(
log::debug!(
"Received {} from {client_id:?} ({client_config:?}, forward {forward_id:?})",
event_name
);
Expand All @@ -604,7 +603,7 @@ where
{
state.scalability_monitor_mut().testcase_with_observers += 1;
}
debug!(
log::debug!(
"[{}] Running fuzzer with event {}",
process::id(),
event_name
Expand All @@ -622,7 +621,7 @@ where
{
state.scalability_monitor_mut().testcase_without_observers += 1;
}
debug!(
log::debug!(
"[{}] Running fuzzer with event {}",
process::id(),
event_name
Expand Down Expand Up @@ -652,15 +651,15 @@ where

self.hooks.on_fire_all(state, client_id, &event)?;

debug!(
log::debug!(
"[{}] Adding received Testcase {} as item #{item}...",
process::id(),
event_name
);

self.inner.fire(state, event)?;
} else {
debug!("[{}] {} was discarded...)", process::id(), event_name);
log::debug!("[{}] {} was discarded...)", process::id(), event_name);
}
}
_ => {
Expand Down
28 changes: 17 additions & 11 deletions libafl/src/events/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ use libafl_bolts::{
shmem::ShMemProvider,
tuples::{tuple_list, Handle},
};
#[cfg(all(unix, feature = "std", feature = "fork"))]
use log::debug;
#[cfg(feature = "std")]
use typed_builder::TypedBuilder;

Expand Down Expand Up @@ -657,7 +655,7 @@ where
let num_cores = core_ids.len();
let mut handles = vec![];

debug!("spawning on cores: {:?}", self.cores);
log::debug!("spawning on cores: {:?}", self.cores);

self.opened_stdout_file = self
.stdout_file
Expand Down Expand Up @@ -700,7 +698,7 @@ where

if index == 1 {
// Main client
debug!("Running main client on PID {}", std::process::id());
log::debug!("Running main client on PID {}", std::process::id());
let (state, mgr) =
main_inner_mgr_builder.take().unwrap()(self, *bind_to)?;

Expand All @@ -721,7 +719,7 @@ where
self.main_run_client.take().unwrap()(state, c_mgr, *bind_to)
} else {
// Secondary clients
debug!("Running secondary client on PID {}", std::process::id());
log::debug!("Running secondary client on PID {}", std::process::id());
let (state, mgr) =
secondary_inner_mgr_builder.take().unwrap()(self, *bind_to)?;

Expand All @@ -744,11 +742,19 @@ where

#[cfg(feature = "multi_machine")]
// Create this after forks, to avoid problems with tokio runtime
let (multi_machine_sender_hook, multi_machine_receiver_hook) =
TcpMultiMachineBuilder::build::<
SocketAddr,
<<EM as UsesState>::State as UsesInput>::Input,
>(self.multi_machine_node_descriptor.clone())?;

// # Safety
// The `multi_machine_receiver_hook` needs messages to outlive the receiver.
// The underlying memory region for incoming messages lives longer than the async thread processing them.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rmalmain Are we sure this is the case? Is there any race condition when memory gets cleared up before the async thread kicks in?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

afaik the messages live long enough. i'll double check this

let TcpMultiMachine {
sender: multi_machine_sender_hook,
receiver: multi_machine_receiver_hook,
} = unsafe {
TcpMultiMachineBuilder()
.build::<SocketAddr, <<EM as UsesState>::State as UsesInput>::Input>(
self.multi_machine_node_descriptor.clone(),
)?
};

let mut brokers = Brokers::new();

Expand Down Expand Up @@ -812,7 +818,7 @@ where
brokers.add(Box::new(broker));
}

debug!(
log::debug!(
"Brokers have been initialized on port {}.",
std::process::id()
);
Expand Down
13 changes: 6 additions & 7 deletions libafl/src/events/llmp/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use libafl_bolts::{
llmp::{recv_tcp_msg, send_tcp_msg, TcpRequest, TcpResponse},
IP_LOCALHOST,
};
use log::debug;
use serde::{Deserialize, Serialize};

#[cfg(feature = "llmp_compression")]
Expand Down Expand Up @@ -370,7 +369,7 @@ where
Ok(_) => (),
Err(e) => log::error!("Failed to send tcp message {:#?}", e),
}
debug!("Asking he broker to be disconnected");
log::debug!("Asking he broker to be disconnected");
Ok(())
}

Expand Down Expand Up @@ -423,11 +422,11 @@ where
..
} => {
#[cfg(feature = "std")]
debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id());
log::debug!("[{}] Received new Testcase {evt_name} from {client_id:?} ({client_config:?}, forward {forward_id:?})", std::process::id());

if self.always_interesting {
let item = fuzzer.add_input(state, executor, self, input)?;
debug!("Added received Testcase as item #{item}");
log::debug!("Added received Testcase as item #{item}");
} else {
let res = if client_config.match_with(&self.configuration)
&& observers_buf.is_some()
Expand Down Expand Up @@ -455,9 +454,9 @@ where
)?
};
if let Some(item) = res.1 {
debug!("Added received Testcase {evt_name} as item #{item}");
log::debug!("Added received Testcase {evt_name} as item #{item}");
} else {
debug!("Testcase {evt_name} was discarded");
log::debug!("Testcase {evt_name} was discarded");
}
}
}
Expand Down Expand Up @@ -620,7 +619,7 @@ where
msg
};
let event: Event<S::Input> = postcard::from_bytes(event_bytes)?;
debug!("Received event in normal llmp {}", event.name_detailed());
log::debug!("Received event in normal llmp {}", event.name_detailed());
self.handle_in_client(fuzzer, executor, state, client_id, event)?;
count += 1;
}
Expand Down
5 changes: 2 additions & 3 deletions libafl/src/events/llmp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use libafl_bolts::{
shmem::{NopShMemProvider, ShMemProvider},
ClientId,
};
use log::debug;
use serde::Deserialize;

use crate::{
Expand Down Expand Up @@ -303,7 +302,7 @@ where
Event::NewTestcase {
input, forward_id, ..
} => {
debug!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})");
log::debug!("Received new Testcase to convert from {client_id:?} (forward {forward_id:?}, forward {forward_id:?})");

let Some(converter) = self.converter_back.as_mut() else {
return Ok(());
Expand Down Expand Up @@ -377,7 +376,7 @@ where
};

let event: Event<DI> = postcard::from_bytes(event_bytes)?;
debug!("Processor received message {}", event.name_detailed());
log::debug!("Processor received message {}", event.name_detailed());
self.handle_in_client(fuzzer, executor, state, manager, client_id, event)?;
count += 1;
}
Expand Down
4 changes: 1 addition & 3 deletions libafl/src/events/llmp/restarting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use libafl_bolts::{
use libafl_bolts::{
llmp::LlmpConnection, os::CTRL_C_EXIT, shmem::StdShMemProvider, staterestore::StateRestorer,
};
#[cfg(all(unix, feature = "fork"))]
use log::debug;
use serde::{Deserialize, Serialize};
#[cfg(feature = "std")]
use typed_builder::TypedBuilder;
Expand Down Expand Up @@ -564,7 +562,7 @@ where
handle.status()
}
ForkResult::Child => {
debug!(
log::debug!(
"{} has been forked into {}",
std::os::unix::process::parent_id(),
std::process::id()
Expand Down
3 changes: 3 additions & 0 deletions libafl/src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub mod launcher;
#[allow(clippy::ignored_unit_patterns)]
pub mod llmp;
pub use llmp::*;
#[cfg(feature = "tcp_manager")]
#[allow(clippy::ignored_unit_patterns)]
pub mod tcp;

pub mod broker_hooks;
use alloc::{
Expand Down
34 changes: 21 additions & 13 deletions libafl/src/events/multi_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,27 +154,35 @@ pub struct NodeDescriptor<A> {
pub flags: BitFlags<NodePolicy>, // The policy for shared messages between nodes.
}

/// A set of multi-machine `broker_hooks`.
#[derive(Debug)]
pub struct TcpMultiMachine<A, I> {
/// The sender hooks
pub sender: TcpMultiMachineLlmpSenderHook<A, I>,
/// The receiver hooks
pub recevier: TcpMultiMachineLlmpReceiverHook<A, I>,
}

/// A Multi-machine `broker_hooks` builder.
#[derive(Debug)]
pub struct TcpMultiMachineBuilder {
_private: (),
}

impl TcpMultiMachineBuilder {
/// Build a new couple [`TcpMultiMachineLlmpSenderHook`] / [`TcpMultiMachineLlmpReceiverHook`] from a [`NodeDescriptor`].
/// Build a new [`TcpMultiMachineHooks`] containing a sender and a receiver from a [`NodeDescriptor`].
/// Everything is initialized and ready to be used.
/// Beware, the hooks should run in the same process as the one this function is called.
/// This is because we spawn a tokio runtime underneath.
/// Check `<https://github.com/tokio-rs/tokio/issues/4301>` for more details.
pub fn build<A, I>(
///
/// # Safety
/// The returned [`TcpMultiMachineLlmpReceiverHook`] assumes that the `msg` parameter
/// passed to the `on_new_message` method (or rather, the memory it points to),
/// lives sufficiently long for an async background task to process it.
pub unsafe fn build<A, I>(
node_descriptor: NodeDescriptor<A>,
) -> Result<
(
TcpMultiMachineLlmpSenderHook<A, I>,
TcpMultiMachineLlmpReceiverHook<A, I>,
),
Error,
>
) -> Result<TcpMultiMachineHooks<A, I>, Error>
where
A: Clone + Display + ToSocketAddrs + Send + Sync + 'static,
I: Input + Send + Sync + 'static,
Expand All @@ -197,10 +205,10 @@ impl TcpMultiMachineBuilder {
TcpMultiMachineState::init::<I>(&state.clone(), &rt.clone())?;
}

Ok((
TcpMultiMachineLlmpSenderHook::new(state.clone(), rt.clone()),
TcpMultiMachineLlmpReceiverHook::new(state, rt),
))
Ok(TcpMultiMachineBuilder {
sender: TcpMultiMachineLlmpSenderHook::new(state.clone(), rt.clone()),
receiver: TcpMultiMachineLlmpReceiverHook::new(state, rt),
})
}
}

Expand Down
Loading
Loading