Skip to content

Commit

Permalink
Add Llmp Compression Feature (AFLplusplus#60), closes AFLplusplus#46
Browse files Browse the repository at this point in the history
* add compression

* modify event/llmp.rs

* rename to LLMP_TAG_COMPRESS

* remove compression code from bolts/llmp.rs

* add compress.rs

* handle compress & decompress in GzipCompress struct, compress if the size is large enough

* add code for benchmark

* remove LLMP_TAG_COMPRESS, use a flag instead

* cargo fmt

* rm test.sh

* passes the test

* comment benchmarks code out

* add recv_buf_with_flag()

* add the llmp_compress feature

* add send_buf, do not compile compression code if it's not used

* fix warning

* merged dev

* add error handling code

* doc for compress.rs

* remove tag from decompress

* rename every flag to flags

* fix some clippy.sh errors

* simplify recv_buf

* delete benchmark printf code

* cargo fmt

* fix doc

Co-authored-by: Dominik Maier <domenukk@gmail.com>
  • Loading branch information
tokatoka and domenukk authored Apr 29, 2021
1 parent 02fcc33 commit 3e730cd
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 14 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ LibAFL offers integrations with popular instrumemntation frameworks. At the mome
+ Frida, in [libafl_frida](./libafl_frida), by s1341 <github@shmarya.net> (Windows support is broken atm, it relies on [this upstream issue](https://github.com/meme/frida-rust/issues/9) to be fixed.)
+ More to come (QEMU-mode, ...)

LibAFL offers integrations with popular instrumemntation frameworks too. At the moment, the supported backends are:

+ SanitizerCoverage, in [libafl_targets](./libafl_targets)
+ Frida, in [libafl_frida](./libafl_frida), by s1341 <github@shmarya.net> (Windows support will be added soon)

## Getting started

Clone the LibAFL repository with
Expand Down
2 changes: 2 additions & 0 deletions libafl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ anymap_debug = ["serde_json"] # uses serde_json to Debug the anymap trait. Disab
derive = ["libafl_derive"] # provide derive(SerdeAny) macro.
llmp_small_maps = [] # reduces initial map size for llmp
llmp_debug = ["backtrace"] # Enables debug output for LLMP
llmp_compress = [] #llmp compression using GZip

[[example]]
name = "llmp_test"
Expand All @@ -58,6 +59,7 @@ static_assertions = "1.1.0"
ctor = "*"
libafl_derive = { version = "*", optional = true, path = "../libafl_derive" }
serde_json = { version = "1.0", optional = true, default-features = false, features = ["alloc"] } # an easy way to debug print SerdeAnyMap
compression = { version = "0.1.5" }
num_enum = "0.5.1"
spin = "0.9.0"

Expand Down
1 change: 1 addition & 0 deletions libafl/examples/llmp_test/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ fn large_msg_loop(port: u16) -> ! {
fn broker_message_hook(
client_id: u32,
tag: llmp::Tag,
_flags: llmp::Flag,
message: &[u8],
) -> Result<llmp::LlmpMsgHookResult, Error> {
match tag {
Expand Down
56 changes: 56 additions & 0 deletions libafl/src/bolts/compress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//! Compression of events passed between a broker and clients.
//! Currently we use the gzip compression algorithm for its fast decompression performance.
#[cfg(feature = "llmp_compress")]
use crate::{
bolts::llmp::{Flag, LLMP_FLAG_COMPRESSED},
Error,
};
use alloc::vec::Vec;
use compression::prelude::*;
use core::fmt::Debug;

#[derive(Debug)]
pub struct GzipCompressor {
threshold: usize,
}

impl GzipCompressor {
/// If the buffer is larger than the threshold value, we compress the buffer.
pub fn new(threshold: usize) -> Self {
GzipCompressor { threshold }
}
}

impl GzipCompressor {
/// Compression.
/// The buffer is compressed with the gzip algo
pub fn compress(&self, buf: &[u8]) -> Result<Option<Vec<u8>>, Error> {
if buf.len() > self.threshold {
//compress if the buffer is large enough
let compressed = buf
.iter()
.cloned()
.encode(&mut GZipEncoder::new(), Action::Finish)
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(compressed))
} else {
Ok(None)
}
}

/// Decompression.
/// Flag is used to indicate if it's compressed or not
pub fn decompress(&self, flags: Flag, buf: &[u8]) -> Result<Option<Vec<u8>>, Error> {
if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
let decompressed: Vec<u8> = buf
.iter()
.cloned()
.decode(&mut GZipDecoder::new())
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(decompressed))
} else {
Ok(None)
}
}
}
81 changes: 71 additions & 10 deletions libafl/src/bolts/llmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ const LLMP_TAG_NEW_SHM_CLIENT: Tag = 0xC11E471;
/// The sender on this map is exiting (if broker exits, clients should exit gracefully);
const LLMP_TAG_EXITING: Tag = 0x13C5171;

pub const LLMP_FLAG_INITIALIZED: Flag = 0x0;
pub const LLMP_FLAG_COMPRESSED: Flag = 0x1;

/// An env var of this value indicates that the set value was a NULL PTR
const _NULL_ENV_STR: &str = "_NULL";

Expand All @@ -124,6 +127,7 @@ static mut GLOBAL_SIGHANDLER_STATE: LlmpBrokerSignalHandler = LlmpBrokerSignalHa

/// TAGs used thorughout llmp
pub type Tag = u32;
pub type Flag = u64;

/// This is for the server the broker will spawn.
/// If an llmp connection is local - use sharedmaps
Expand Down Expand Up @@ -323,6 +327,8 @@ pub struct LlmpMsg {
pub tag: Tag,
/// Sender of this messge
pub sender: u32,
/// flags, currently only used for indicating compression
pub flags: Flag,
/// The message ID, unique per page
pub message_id: u64,
/// Buffer length as specified by the user
Expand Down Expand Up @@ -442,6 +448,13 @@ where
LlmpConnection::IsClient { client } => client.send_buf(tag, buf),
}
}

pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flag) -> Result<(), Error> {
match self {
LlmpConnection::IsBroker { broker } => broker.send_buf_with_flags(tag, flags, buf),
LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf),
}
}
}

/// Contents of the share mem pages, used by llmp internally
Expand Down Expand Up @@ -898,6 +911,30 @@ where
unsafe {
let msg = self.alloc_next(buf.len())?;
(*msg).tag = tag;
(*msg).flags = LLMP_FLAG_INITIALIZED;
buf.as_ptr()
.copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
self.send(msg)
}
}

pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> {
// Make sure we don't reuse already allocated tags
if tag == LLMP_TAG_NEW_SHM_CLIENT
|| tag == LLMP_TAG_END_OF_PAGE
|| tag == LLMP_TAG_UNINITIALIZED
|| tag == LLMP_TAG_UNSET
{
return Err(Error::Unknown(format!(
"Reserved tag supplied to send_buf ({:#X})",
tag
)));
}

unsafe {
let msg = self.alloc_next(buf.len())?;
(*msg).tag = tag;
(*msg).flags = flags;
buf.as_ptr()
.copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len());
self.send(msg)
Expand Down Expand Up @@ -1114,12 +1151,22 @@ where
/// Returns the next message, tag, buf, if avaliable, else None
#[allow(clippy::type_complexity)]
#[inline]
pub fn recv_buf(&mut self) -> Result<Option<(u32, u32, &[u8])>, Error> {
pub fn recv_buf(&mut self) -> Result<Option<(u32, Tag, &[u8])>, Error> {
if let Some((sender, tag, _flags, buf)) = self.recv_buf_with_flags()? {
Ok(Some((sender, tag, buf)))
} else {
Ok(None)
}
}

#[inline]
pub fn recv_buf_with_flags(&mut self) -> Result<Option<(u32, Tag, Flag, &[u8])>, Error> {
unsafe {
Ok(match self.recv()? {
Some(msg) => Some((
(*msg).sender,
(*msg).tag,
(*msg).flags,
(*msg).as_slice(&mut self.current_recv_map)?,
)),
None => None,
Expand All @@ -1129,7 +1176,7 @@ where

/// Returns the next sender, tag, buf, looping until it becomes available
#[inline]
pub fn recv_buf_blocking(&mut self) -> Result<(u32, u32, &[u8]), Error> {
pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> {
unsafe {
let msg = self.recv_blocking()?;
Ok((
Expand Down Expand Up @@ -1424,7 +1471,7 @@ where
#[inline]
pub fn once<F>(&mut self, on_new_msg: &mut F) -> Result<(), Error>
where
F: FnMut(u32, Tag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
F: FnMut(u32, Tag, Flag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
compiler_fence(Ordering::SeqCst);
for i in 0..self.llmp_clients.len() {
Expand Down Expand Up @@ -1455,7 +1502,7 @@ where
/// 5 millis of sleep can't hurt to keep busywait not at 100%
pub fn loop_forever<F>(&mut self, on_new_msg: &mut F, sleep_time: Option<Duration>)
where
F: FnMut(u32, Tag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
F: FnMut(u32, Tag, Flag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
#[cfg(unix)]
if let Err(_e) = unsafe { setup_signal_handler(&mut GLOBAL_SIGHANDLER_STATE) } {
Expand Down Expand Up @@ -1492,6 +1539,10 @@ where
self.llmp_out.send_buf(tag, buf)
}

pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> {
self.llmp_out.send_buf_with_flags(tag, flags, buf)
}

#[cfg(feature = "std")]
/// Launches a thread using a tcp listener socket, on which new clients may connect to this broker
/// Does so on the given port.
Expand Down Expand Up @@ -1595,7 +1646,7 @@ where
#[inline]
unsafe fn handle_new_msgs<F>(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error>
where
F: FnMut(u32, Tag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
F: FnMut(u32, Tag, Flag, &[u8]) -> Result<LlmpMsgHookResult, Error>,
{
let mut next_id = self.llmp_clients.len() as u32;

Expand Down Expand Up @@ -1662,7 +1713,9 @@ where

let map = &mut self.llmp_clients[client_id as usize].current_recv_map;
let msg_buf = (*msg).as_slice(map)?;
if let LlmpMsgHookResult::Handled = (on_new_msg)(client_id, (*msg).tag, msg_buf)? {
if let LlmpMsgHookResult::Handled =
(on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)?
{
should_forward_msg = false
};
if should_forward_msg {
Expand Down Expand Up @@ -1827,6 +1880,10 @@ where
self.sender.send_buf(tag, buf)
}

pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> {
self.sender.send_buf_with_flags(tag, flags, buf)
}

/// Informs the broker about a new client in town, with the given map id
pub fn send_client_added_msg(
&mut self,
Expand Down Expand Up @@ -1876,16 +1933,20 @@ where
/// Returns the next message, tag, buf, if avaliable, else None
#[allow(clippy::type_complexity)]
#[inline]
pub fn recv_buf(&mut self) -> Result<Option<(u32, u32, &[u8])>, Error> {
pub fn recv_buf(&mut self) -> Result<Option<(u32, Tag, &[u8])>, Error> {
self.receiver.recv_buf()
}

/// Receives a buf from the broker, looping until a messages becomes avaliable
#[inline]
pub fn recv_buf_blocking(&mut self) -> Result<(u32, u32, &[u8]), Error> {
pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> {
self.receiver.recv_buf_blocking()
}

pub fn recv_buf_with_flags(&mut self) -> Result<Option<(u32, Tag, Flag, &[u8])>, Error> {
self.receiver.recv_buf_with_flags()
}

#[cfg(feature = "std")]
/// Creates a new LlmpClient, reading the map id and len from env
pub fn create_using_env(mut shmem_provider: SP, env_var: &str) -> Result<Self, Error> {
Expand Down Expand Up @@ -1952,7 +2013,7 @@ mod tests {
// Give the (background) tcp thread a few millis to post the message
sleep(Duration::from_millis(100));
broker
.once(&mut |_sender_id, _tag, _msg| Ok(ForwardToClients))
.once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients))
.unwrap();

let tag: Tag = 0x1337;
Expand All @@ -1975,7 +2036,7 @@ mod tests {

// Forward stuff to clients
broker
.once(&mut |_sender_id, _tag, _msg| Ok(ForwardToClients))
.once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients))
.unwrap();
let (_sender_id, tag2, arr2) = client.recv_buf_blocking().unwrap();
assert_eq!(tag, tag2);
Expand Down
4 changes: 4 additions & 0 deletions libafl/src/bolts/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
//! Bolts are no conceptual fuzzing elements, but they keep libafl-based fuzzers together.
pub mod bindings;

#[cfg(feature = "llmp_compress")]
pub mod compress;

pub mod llmp;
pub mod os;
pub mod ownedref;
Expand Down
Loading

0 comments on commit 3e730cd

Please sign in to comment.