From 3e730cd1f720dc72e0ced0f2cc53342811d5cc35 Mon Sep 17 00:00:00 2001 From: Toka Date: Thu, 29 Apr 2021 19:36:12 +0900 Subject: [PATCH] Add Llmp Compression Feature (#60), closes #46 * add compression * modify event/llmp.rs * rename to LLMP_TAG_COMPRESS * remove compression code from bolts/llmp.rs * add compress.rs * handle compress & decompress in GzipCompress struct, compress if the size is large enough * add code for benchmark * remove LLMP_TAG_COMPRESS, use a flag instead * cargo fmt * rm test.sh * passes the test * comment benchmarks code out * add recv_buf_with_flag() * add the llmp_compress feature * add send_buf, do not compile compression code if it's not used * fix warning * merged dev * add error handling code * doc for compress.rs * remove tag from decompress * rename every flag to flags * fix some clippy.sh errors * simplify recv_buf * delete benchmark printf code * cargo fmt * fix doc Co-authored-by: Dominik Maier --- README.md | 5 ++ libafl/Cargo.toml | 2 + libafl/examples/llmp_test/main.rs | 1 + libafl/src/bolts/compress.rs | 56 +++++++++++++++++++++ libafl/src/bolts/llmp.rs | 81 +++++++++++++++++++++++++++---- libafl/src/bolts/mod.rs | 4 ++ libafl/src/events/llmp.rs | 60 +++++++++++++++++++++-- libafl/src/lib.rs | 12 +++++ 8 files changed, 207 insertions(+), 14 deletions(-) create mode 100644 libafl/src/bolts/compress.rs diff --git a/README.md b/README.md index 1735986603f..1c9e7a0a79e 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,11 @@ LibAFL offers integrations with popular instrumemntation frameworks. At the mome + Frida, in [libafl_frida](./libafl_frida), by s1341 (Windows support is broken atm, it relies on [this upstream issue](https://github.com/meme/frida-rust/issues/9) to be fixed.) + More to come (QEMU-mode, ...) +LibAFL offers integrations with popular instrumemntation frameworks too. At the moment, the supported backends are: + ++ SanitizerCoverage, in [libafl_targets](./libafl_targets) ++ Frida, in [libafl_frida](./libafl_frida), by s1341 (Windows support will be added soon) + ## Getting started Clone the LibAFL repository with diff --git a/libafl/Cargo.toml b/libafl/Cargo.toml index b6aeebda977..321e50ecd36 100644 --- a/libafl/Cargo.toml +++ b/libafl/Cargo.toml @@ -40,6 +40,7 @@ anymap_debug = ["serde_json"] # uses serde_json to Debug the anymap trait. Disab derive = ["libafl_derive"] # provide derive(SerdeAny) macro. llmp_small_maps = [] # reduces initial map size for llmp llmp_debug = ["backtrace"] # Enables debug output for LLMP +llmp_compress = [] #llmp compression using GZip [[example]] name = "llmp_test" @@ -58,6 +59,7 @@ static_assertions = "1.1.0" ctor = "*" libafl_derive = { version = "*", optional = true, path = "../libafl_derive" } serde_json = { version = "1.0", optional = true, default-features = false, features = ["alloc"] } # an easy way to debug print SerdeAnyMap +compression = { version = "0.1.5" } num_enum = "0.5.1" spin = "0.9.0" diff --git a/libafl/examples/llmp_test/main.rs b/libafl/examples/llmp_test/main.rs index 5356514eeca..9b6cde1ecbd 100644 --- a/libafl/examples/llmp_test/main.rs +++ b/libafl/examples/llmp_test/main.rs @@ -83,6 +83,7 @@ fn large_msg_loop(port: u16) -> ! { fn broker_message_hook( client_id: u32, tag: llmp::Tag, + _flags: llmp::Flag, message: &[u8], ) -> Result { match tag { diff --git a/libafl/src/bolts/compress.rs b/libafl/src/bolts/compress.rs new file mode 100644 index 00000000000..e2cdb689f8f --- /dev/null +++ b/libafl/src/bolts/compress.rs @@ -0,0 +1,56 @@ +//! Compression of events passed between a broker and clients. +//! Currently we use the gzip compression algorithm for its fast decompression performance. + +#[cfg(feature = "llmp_compress")] +use crate::{ + bolts::llmp::{Flag, LLMP_FLAG_COMPRESSED}, + Error, +}; +use alloc::vec::Vec; +use compression::prelude::*; +use core::fmt::Debug; + +#[derive(Debug)] +pub struct GzipCompressor { + threshold: usize, +} + +impl GzipCompressor { + /// If the buffer is larger than the threshold value, we compress the buffer. + pub fn new(threshold: usize) -> Self { + GzipCompressor { threshold } + } +} + +impl GzipCompressor { + /// Compression. + /// The buffer is compressed with the gzip algo + pub fn compress(&self, buf: &[u8]) -> Result>, Error> { + if buf.len() > self.threshold { + //compress if the buffer is large enough + let compressed = buf + .iter() + .cloned() + .encode(&mut GZipEncoder::new(), Action::Finish) + .collect::, _>>()?; + Ok(Some(compressed)) + } else { + Ok(None) + } + } + + /// Decompression. + /// Flag is used to indicate if it's compressed or not + pub fn decompress(&self, flags: Flag, buf: &[u8]) -> Result>, Error> { + if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + let decompressed: Vec = buf + .iter() + .cloned() + .decode(&mut GZipDecoder::new()) + .collect::, _>>()?; + Ok(Some(decompressed)) + } else { + Ok(None) + } + } +} diff --git a/libafl/src/bolts/llmp.rs b/libafl/src/bolts/llmp.rs index 3c550ee04f3..565f6c769c4 100644 --- a/libafl/src/bolts/llmp.rs +++ b/libafl/src/bolts/llmp.rs @@ -104,6 +104,9 @@ const LLMP_TAG_NEW_SHM_CLIENT: Tag = 0xC11E471; /// The sender on this map is exiting (if broker exits, clients should exit gracefully); const LLMP_TAG_EXITING: Tag = 0x13C5171; +pub const LLMP_FLAG_INITIALIZED: Flag = 0x0; +pub const LLMP_FLAG_COMPRESSED: Flag = 0x1; + /// An env var of this value indicates that the set value was a NULL PTR const _NULL_ENV_STR: &str = "_NULL"; @@ -124,6 +127,7 @@ static mut GLOBAL_SIGHANDLER_STATE: LlmpBrokerSignalHandler = LlmpBrokerSignalHa /// TAGs used thorughout llmp pub type Tag = u32; +pub type Flag = u64; /// This is for the server the broker will spawn. /// If an llmp connection is local - use sharedmaps @@ -323,6 +327,8 @@ pub struct LlmpMsg { pub tag: Tag, /// Sender of this messge pub sender: u32, + /// flags, currently only used for indicating compression + pub flags: Flag, /// The message ID, unique per page pub message_id: u64, /// Buffer length as specified by the user @@ -442,6 +448,13 @@ where LlmpConnection::IsClient { client } => client.send_buf(tag, buf), } } + + pub fn send_buf_with_flags(&mut self, tag: Tag, buf: &[u8], flags: Flag) -> Result<(), Error> { + match self { + LlmpConnection::IsBroker { broker } => broker.send_buf_with_flags(tag, flags, buf), + LlmpConnection::IsClient { client } => client.send_buf_with_flags(tag, flags, buf), + } + } } /// Contents of the share mem pages, used by llmp internally @@ -898,6 +911,30 @@ where unsafe { let msg = self.alloc_next(buf.len())?; (*msg).tag = tag; + (*msg).flags = LLMP_FLAG_INITIALIZED; + buf.as_ptr() + .copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len()); + self.send(msg) + } + } + + pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> { + // Make sure we don't reuse already allocated tags + if tag == LLMP_TAG_NEW_SHM_CLIENT + || tag == LLMP_TAG_END_OF_PAGE + || tag == LLMP_TAG_UNINITIALIZED + || tag == LLMP_TAG_UNSET + { + return Err(Error::Unknown(format!( + "Reserved tag supplied to send_buf ({:#X})", + tag + ))); + } + + unsafe { + let msg = self.alloc_next(buf.len())?; + (*msg).tag = tag; + (*msg).flags = flags; buf.as_ptr() .copy_to_nonoverlapping((*msg).buf.as_mut_ptr(), buf.len()); self.send(msg) @@ -1114,12 +1151,22 @@ where /// Returns the next message, tag, buf, if avaliable, else None #[allow(clippy::type_complexity)] #[inline] - pub fn recv_buf(&mut self) -> Result, Error> { + pub fn recv_buf(&mut self) -> Result, Error> { + if let Some((sender, tag, _flags, buf)) = self.recv_buf_with_flags()? { + Ok(Some((sender, tag, buf))) + } else { + Ok(None) + } + } + + #[inline] + pub fn recv_buf_with_flags(&mut self) -> Result, Error> { unsafe { Ok(match self.recv()? { Some(msg) => Some(( (*msg).sender, (*msg).tag, + (*msg).flags, (*msg).as_slice(&mut self.current_recv_map)?, )), None => None, @@ -1129,7 +1176,7 @@ where /// Returns the next sender, tag, buf, looping until it becomes available #[inline] - pub fn recv_buf_blocking(&mut self) -> Result<(u32, u32, &[u8]), Error> { + pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> { unsafe { let msg = self.recv_blocking()?; Ok(( @@ -1424,7 +1471,7 @@ where #[inline] pub fn once(&mut self, on_new_msg: &mut F) -> Result<(), Error> where - F: FnMut(u32, Tag, &[u8]) -> Result, + F: FnMut(u32, Tag, Flag, &[u8]) -> Result, { compiler_fence(Ordering::SeqCst); for i in 0..self.llmp_clients.len() { @@ -1455,7 +1502,7 @@ where /// 5 millis of sleep can't hurt to keep busywait not at 100% pub fn loop_forever(&mut self, on_new_msg: &mut F, sleep_time: Option) where - F: FnMut(u32, Tag, &[u8]) -> Result, + F: FnMut(u32, Tag, Flag, &[u8]) -> Result, { #[cfg(unix)] if let Err(_e) = unsafe { setup_signal_handler(&mut GLOBAL_SIGHANDLER_STATE) } { @@ -1492,6 +1539,10 @@ where self.llmp_out.send_buf(tag, buf) } + pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> { + self.llmp_out.send_buf_with_flags(tag, flags, buf) + } + #[cfg(feature = "std")] /// Launches a thread using a tcp listener socket, on which new clients may connect to this broker /// Does so on the given port. @@ -1595,7 +1646,7 @@ where #[inline] unsafe fn handle_new_msgs(&mut self, client_id: u32, on_new_msg: &mut F) -> Result<(), Error> where - F: FnMut(u32, Tag, &[u8]) -> Result, + F: FnMut(u32, Tag, Flag, &[u8]) -> Result, { let mut next_id = self.llmp_clients.len() as u32; @@ -1662,7 +1713,9 @@ where let map = &mut self.llmp_clients[client_id as usize].current_recv_map; let msg_buf = (*msg).as_slice(map)?; - if let LlmpMsgHookResult::Handled = (on_new_msg)(client_id, (*msg).tag, msg_buf)? { + if let LlmpMsgHookResult::Handled = + (on_new_msg)(client_id, (*msg).tag, (*msg).flags, msg_buf)? + { should_forward_msg = false }; if should_forward_msg { @@ -1827,6 +1880,10 @@ where self.sender.send_buf(tag, buf) } + pub fn send_buf_with_flags(&mut self, tag: Tag, flags: Flag, buf: &[u8]) -> Result<(), Error> { + self.sender.send_buf_with_flags(tag, flags, buf) + } + /// Informs the broker about a new client in town, with the given map id pub fn send_client_added_msg( &mut self, @@ -1876,16 +1933,20 @@ where /// Returns the next message, tag, buf, if avaliable, else None #[allow(clippy::type_complexity)] #[inline] - pub fn recv_buf(&mut self) -> Result, Error> { + pub fn recv_buf(&mut self) -> Result, Error> { self.receiver.recv_buf() } /// Receives a buf from the broker, looping until a messages becomes avaliable #[inline] - pub fn recv_buf_blocking(&mut self) -> Result<(u32, u32, &[u8]), Error> { + pub fn recv_buf_blocking(&mut self) -> Result<(u32, Tag, &[u8]), Error> { self.receiver.recv_buf_blocking() } + pub fn recv_buf_with_flags(&mut self) -> Result, Error> { + self.receiver.recv_buf_with_flags() + } + #[cfg(feature = "std")] /// Creates a new LlmpClient, reading the map id and len from env pub fn create_using_env(mut shmem_provider: SP, env_var: &str) -> Result { @@ -1952,7 +2013,7 @@ mod tests { // Give the (background) tcp thread a few millis to post the message sleep(Duration::from_millis(100)); broker - .once(&mut |_sender_id, _tag, _msg| Ok(ForwardToClients)) + .once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients)) .unwrap(); let tag: Tag = 0x1337; @@ -1975,7 +2036,7 @@ mod tests { // Forward stuff to clients broker - .once(&mut |_sender_id, _tag, _msg| Ok(ForwardToClients)) + .once(&mut |_sender_id, _tag, _flags, _msg| Ok(ForwardToClients)) .unwrap(); let (_sender_id, tag2, arr2) = client.recv_buf_blocking().unwrap(); assert_eq!(tag, tag2); diff --git a/libafl/src/bolts/mod.rs b/libafl/src/bolts/mod.rs index 86c761710bb..87db34aa7c1 100644 --- a/libafl/src/bolts/mod.rs +++ b/libafl/src/bolts/mod.rs @@ -1,6 +1,10 @@ //! Bolts are no conceptual fuzzing elements, but they keep libafl-based fuzzers together. pub mod bindings; + +#[cfg(feature = "llmp_compress")] +pub mod compress; + pub mod llmp; pub mod os; pub mod ownedref; diff --git a/libafl/src/events/llmp.rs b/libafl/src/events/llmp.rs index 934001f0aa2..62caf9d0de0 100644 --- a/libafl/src/events/llmp.rs +++ b/libafl/src/events/llmp.rs @@ -15,7 +15,7 @@ use crate::bolts::{ use crate::{ bolts::{ - llmp::{self, LlmpClientDescription, LlmpSender, Tag}, + llmp::{self, Flag, LlmpClientDescription, LlmpSender, Tag}, shmem::ShMemProvider, }, corpus::CorpusScheduler, @@ -29,6 +29,12 @@ use crate::{ Error, }; +#[cfg(feature = "llmp_compress")] +use crate::bolts::{ + compress::GzipCompressor, + llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, +}; + #[cfg(all(feature = "std", windows))] use crate::utils::startable_self; @@ -45,7 +51,6 @@ const _LLMP_TAG_EVENT_TO_BROKER: llmp::Tag = 0x2B80438; /// Handle in both /// const LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741; - const _LLMP_TAG_RESTART: llmp::Tag = 0x8357A87; const _LLMP_TAG_NO_RESTART: llmp::Tag = 0x57A7EE71; @@ -60,9 +65,15 @@ where { stats: Option, llmp: llmp::LlmpConnection, + #[cfg(feature = "llmp_compress")] + compressor: GzipCompressor, + phantom: PhantomData<(I, S)>, } +#[cfg(feature = "llmp_compress")] +const COMPRESS_THRESHOLD: usize = 1024; + impl Drop for LlmpEventManager where I: Input, @@ -91,6 +102,8 @@ where Ok(Self { stats: Some(stats), llmp: llmp::LlmpConnection::on_port(shmem_provider, port)?, + #[cfg(feature = "llmp_compress")] + compressor: GzipCompressor::new(COMPRESS_THRESHOLD), phantom: PhantomData, }) } @@ -103,6 +116,8 @@ where llmp: llmp::LlmpConnection::IsClient { client: LlmpClient::on_existing_from_env(shmem_provider, env_name)?, }, + #[cfg(feature = "llmp_compress")] + compressor: GzipCompressor::new(COMPRESS_THRESHOLD), // Inserting a nop-stats element here so rust won't complain. // In any case, the client won't currently use it. phantom: PhantomData, @@ -125,6 +140,8 @@ where shmem_provider, description, )?, + #[cfg(feature = "llmp_compress")] + compressor: GzipCompressor::new(COMPRESS_THRESHOLD), // Inserting a nop-stats element here so rust won't complain. // In any case, the client won't currently use it. phantom: PhantomData, @@ -152,9 +169,17 @@ where match &mut self.llmp { llmp::LlmpConnection::IsBroker { broker } => { let stats = self.stats.as_mut().unwrap(); + #[cfg(feature = "llmp_compress")] + let compressor = &self.compressor; broker.loop_forever( - &mut |sender_id: u32, tag: Tag, msg: &[u8]| { + &mut |sender_id: u32, tag: Tag, _flags: Flag, msg: &[u8]| { if tag == LLMP_TAG_EVENT_TO_BOTH { + #[cfg(feature = "llmp_compress")] + let event: Event = match compressor.decompress(_flags, msg)? { + Some(decompressed) => postcard::from_bytes(&decompressed)?, + None => postcard::from_bytes(msg)?, + }; + #[cfg(not(feature = "llmp_compress"))] let event: Event = postcard::from_bytes(msg)?; match Self::handle_in_broker(stats, sender_id, &event)? { BrokerEventResult::Forward => { @@ -310,10 +335,16 @@ where let mut events = vec![]; match &mut self.llmp { llmp::LlmpConnection::IsClient { client } => { - while let Some((sender_id, tag, msg)) = client.recv_buf()? { + while let Some((sender_id, tag, _flags, msg)) = client.recv_buf_with_flags()? { if tag == _LLMP_TAG_EVENT_TO_BROKER { panic!("EVENT_TO_BROKER parcel should not have arrived in the client!"); } + #[cfg(feature = "llmp_compress")] + let event: Event = match self.compressor.decompress(_flags, msg)? { + Some(decompressed) => postcard::from_bytes(&decompressed)?, + None => postcard::from_bytes(msg)?, + }; + #[cfg(not(feature = "llmp_compress"))] let event: Event = postcard::from_bytes(msg)?; events.push((sender_id, event)); } @@ -330,6 +361,27 @@ where Ok(count) } + #[cfg(feature = "llmp_compress")] + fn fire(&mut self, _state: &mut S, event: Event) -> Result<(), Error> { + let serialized = postcard::to_allocvec(&event)?; + let flags: Flag = LLMP_FLAG_INITIALIZED; + + match self.compressor.compress(&serialized)? { + Some(comp_buf) => { + self.llmp.send_buf_with_flags( + LLMP_TAG_EVENT_TO_BOTH, + &comp_buf, + flags | LLMP_FLAG_COMPRESSED, + )?; + } + None => { + self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; + } + } + Ok(()) + } + + #[cfg(not(feature = "llmp_compress"))] fn fire(&mut self, _state: &mut S, event: Event) -> Result<(), Error> { let serialized = postcard::to_allocvec(&event)?; self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; diff --git a/libafl/src/lib.rs b/libafl/src/lib.rs index 6ea3ad222de..b08b258916b 100644 --- a/libafl/src/lib.rs +++ b/libafl/src/lib.rs @@ -50,6 +50,9 @@ use std::{env::VarError, io, num::ParseIntError, string::FromUtf8Error}; pub enum Error { /// Serialization error Serialize(String), + /// Compression error + #[cfg(feature = "llmp_compress")] + Compression(String), /// File related error #[cfg(feature = "std")] File(io::Error), @@ -77,6 +80,8 @@ impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Self::Serialize(s) => write!(f, "Error in Serialization: `{0}`", &s), + #[cfg(feature = "llmp_compress")] + Self::Compression(s) => write!(f, "Error in Compression: `{0}`", &s), #[cfg(feature = "std")] Self::File(err) => write!(f, "File IO failed: {:?}", &err), Self::EmptyOptional(s) => write!(f, "Optional value `{0}` was not set", &s), @@ -101,6 +106,13 @@ impl From for Error { } } +#[cfg(feature = "llmp_compress")] +impl From for Error { + fn from(err: compression::prelude::CompressionError) -> Self { + Self::Compression(format!("{:?}", err)) + } +} + /// Stringify the json serializer error #[cfg(feature = "std")] impl From for Error {