From e4c1d1832803ef3f1fcc5d4131d97c131316965a Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 10:16:05 +0100 Subject: [PATCH 01/11] :bento: Update ipv6 representation in packet --- common/src/lib.rs | 5 ++--- ebpf-ipv6/src/main.rs | 7 +++++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/common/src/lib.rs b/common/src/lib.rs index e58d254..f261296 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -1,5 +1,4 @@ #![no_std] -use network_types::ip::in6_addr; /// BasicFeaturesIpv4 is a struct collection all ipv4 traffic data and is 280 bits in size. #[repr(C)] #[derive(Clone, Copy)] @@ -30,8 +29,8 @@ unsafe impl aya::Pod for BasicFeaturesIpv4 {} #[repr(C)] #[derive(Clone, Copy)] pub struct BasicFeaturesIpv6 { - pub ipv6_destination: in6_addr, - pub ipv6_source: in6_addr, + pub ipv6_destination: u128, + pub ipv6_source: u128, pub port_destination: u16, pub port_source: u16, pub protocol: u8, diff --git a/ebpf-ipv6/src/main.rs b/ebpf-ipv6/src/main.rs index f5de23a..1718f9c 100644 --- a/ebpf-ipv6/src/main.rs +++ b/ebpf-ipv6/src/main.rs @@ -57,8 +57,11 @@ fn try_tc_flow_track(ctx: TcContext) -> Result { } let ipv6hdr: *const Ipv6Hdr = unsafe { ptr_at(&ctx, EthHdr::LEN)? }; - let ipv6_source = unsafe { (*ipv6hdr).src_addr }; - let ipv6_destination = unsafe { (*ipv6hdr).dst_addr }; + let bytes_ipv6_source = unsafe { (*ipv6hdr).src_addr.in6_u.u6_addr8 }; + let bytes_ipv6_destination = unsafe { (*ipv6hdr).dst_addr.in6_u.u6_addr8 }; + + let ipv6_source = u128::from_be_bytes(bytes_ipv6_source); + let ipv6_destination = u128::from_be_bytes(bytes_ipv6_destination); let source_port: u16; let destination_port: u16; From 3c1f4f4f3229144e5a9e30bde173bafe6b59b0bf Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 10:17:13 +0100 Subject: [PATCH 02/11] :sparkles: Add pcap reader that extracts flows --- feature-extraction-tool/Cargo.toml | 2 + feature-extraction-tool/src/args.rs | 21 ++ feature-extraction-tool/src/main.rs | 498 +++++++++++++++++++++++++--- 3 files changed, 479 insertions(+), 42 deletions(-) diff --git a/feature-extraction-tool/Cargo.toml b/feature-extraction-tool/Cargo.toml index 3081533..982a09b 100644 --- a/feature-extraction-tool/Cargo.toml +++ b/feature-extraction-tool/Cargo.toml @@ -27,6 +27,8 @@ bytes = "1" env_logger = "0.11" chrono = "0.4.34" dashmap = "5.5.3" +pcap = "1.3.0" +pnet = "0.34.0" [[bin]] name = "feature-extraction-tool" diff --git a/feature-extraction-tool/src/args.rs b/feature-extraction-tool/src/args.rs index 94778f9..5fbd492 100644 --- a/feature-extraction-tool/src/args.rs +++ b/feature-extraction-tool/src/args.rs @@ -32,6 +32,27 @@ pub enum Commands { /// The relative path to the dataset path: String, }, + + /// Feature extraction from a pcap file + Pcap { + #[clap(value_enum)] + machine_type: GeneratedMachineType, + + #[clap(value_enum)] + flow_type: FlowType, + + /// The relative path to the pcap file + path: String, + }, +} + +#[derive(clap::ValueEnum, Clone, Debug)] +pub enum GeneratedMachineType { + /// The pcap file was generated on a Windows machine + Windows, + + /// The pcap file was generated on a Linux machine + Linux, } #[derive(clap::ValueEnum, Clone, Debug)] diff --git a/feature-extraction-tool/src/main.rs b/feature-extraction-tool/src/main.rs index 0ed2e78..8e7357c 100644 --- a/feature-extraction-tool/src/main.rs +++ b/feature-extraction-tool/src/main.rs @@ -10,7 +10,7 @@ use crate::{ records::{cic_record::CicRecord, print::Print}, utils::utils::{create_flow_id, get_duration}, }; -use 
args::{Cli, Commands, Dataset, FlowType}; +use args::{Cli, Commands, Dataset, FlowType, GeneratedMachineType}; use aya::{ include_bytes_aligned, maps::AsyncPerfEventArray, @@ -34,9 +34,19 @@ use std::{ use tokio::time::{self, Duration}; use tokio::{signal, task}; use utils::utils::BasicFeatures; +use pnet::packet::{ + ethernet::{EthernetPacket, EtherTypes}, + ip::IpNextHeaderProtocols, + ipv4::Ipv4Packet, + ipv6::Ipv6Packet, + tcp::TcpPacket, + Packet, +}; #[tokio::main] async fn main() { + env_logger::init(); + let cli = Cli::parse(); match cli.command { @@ -85,6 +95,18 @@ async fn main() { Commands::Dataset { dataset, path } => { handle_dataset(dataset, &path); } + Commands::Pcap { path, machine_type, flow_type } => { + match (machine_type, flow_type) { + (GeneratedMachineType::Windows, FlowType::BasicFlow) => read_pcap_file_ethernet::(&path), + (GeneratedMachineType::Windows, FlowType::CicFlow) => read_pcap_file_ethernet::(&path), + (GeneratedMachineType::Windows, FlowType::CiddsFlow) => read_pcap_file_ethernet::(&path), + (GeneratedMachineType::Windows, FlowType::NfFlow) => read_pcap_file_ethernet::(&path), + (GeneratedMachineType::Linux, FlowType::BasicFlow) => read_pcap_file_linux_cooked::(&path), + (GeneratedMachineType::Linux, FlowType::CicFlow) => read_pcap_file_linux_cooked::(&path), + (GeneratedMachineType::Linux, FlowType::CiddsFlow) => read_pcap_file_linux_cooked::(&path), + (GeneratedMachineType::Linux, FlowType::NfFlow) => read_pcap_file_linux_cooked::(&path), + } + } } } @@ -96,8 +118,6 @@ async fn handle_realtime( where T: Flow + Send + Sync + 'static, { - env_logger::init(); - // Loading the eBPF program for egress, the macros make sure the correct file is loaded #[cfg(debug_assertions)] let mut bpf_egress_ipv4 = Ebpf::load(include_bytes_aligned!( @@ -327,11 +347,192 @@ where info!("Waiting for Ctrl-C..."); signal::ctrl_c().await?; + + for entry in flow_map_ipv4.iter() { + let flow = entry.value(); + println!("{}", flow.dump()); + } + + for entry in flow_map_ipv6.iter() { + let flow = entry.value(); + println!("{}", flow.dump()); + } + info!("Exiting..."); Ok(()) } +fn handle_dataset(dataset: Dataset, path: &str) { + println!( + "Dataset feature extraction for {:?} from path: {}", + dataset, path + ); + + match dataset { + Dataset::CicIds2017 => { + if path.ends_with(".csv") { + let parser = CsvParser; + + match parser.parse::(path) { + Ok(records) => { + for record in records { + match record { + Ok(record) => { + record.print(); + } + Err(err) => { + eprintln!("Error: {:?}", err); + } + } + } + } + Err(err) => { + eprintln!("Error: {:?}", err); + } + } + } else if path.ends_with(".parquet") { + panic!("This file format is not supported yet..."); + } else { + panic!("This file format is not supported..."); + } + } + _ => { + panic!("This is not implemented yet..."); + } + } +} + +fn read_pcap_file_ethernet(path: &str) +where + T: Flow, +{ + let start = Instant::now(); + let mut amount_of_packets = 0; + + let flow_map_ipv4: Arc> = Arc::new(DashMap::new()); + let flow_map_ipv6: Arc> = Arc::new(DashMap::new()); + + let mut cap = match pcap::Capture::from_file(path) { + Ok(c) => c, + Err(e) => { + log::error!("Error opening file: {:?}", e); + return; + } + }; + + while let Ok(packet) = cap.next_packet() { + amount_of_packets += 1; + if let Some(ethernet) = EthernetPacket::new(packet.data){ + match ethernet.get_ethertype() { + EtherTypes::Ipv4 => { + if let Some(ipv4_packet) = Ipv4Packet::new(ethernet.payload()) { + if let Some(features_ipv4) = 
extract_ipv4_features(&ipv4_packet) { + redirect_packet_ipv4(&features_ipv4, &flow_map_ipv4); + } + } + }, + EtherTypes::Ipv6 => { + if let Some(ipv6_packet) = Ipv6Packet::new(ethernet.payload()) { + if let Some(features_ipv6) = extract_ipv6_features(&ipv6_packet) { + redirect_packet_ipv6(&features_ipv6, &flow_map_ipv6); + } + } + }, + _ => { + log::debug!("Unknown EtherType, consider using Linux cooked capture by setting the machine type to linux"); + } + } + } else { + log::error!("Error parsing packet..."); + } + } + + for entry in flow_map_ipv4.iter() { + let flow = entry.value(); + println!("{}", flow.dump()); + } + + for entry in flow_map_ipv6.iter() { + let flow = entry.value(); + println!("{}", flow.dump()); + } + + let end = Instant::now(); + println!("{} packets were processed in {:?} milliseconds",amount_of_packets, end.duration_since(start).as_millis()); +} + +fn read_pcap_file_linux_cooked(path: &str) +where + T: Flow, +{ + let start = Instant::now(); + let mut amount_of_packets = 0; + + let flow_map_ipv4: Arc> = Arc::new(DashMap::new()); + let flow_map_ipv6: Arc> = Arc::new(DashMap::new()); + + let mut cap = match pcap::Capture::from_file(path) { + Ok(c) => c, + Err(e) => { + log::error!("Error opening file: {:?}", e); + return; + } + }; + + // Define constants for Linux cooked capture EtherTypes + const SLL_IPV4: u16 = 0x0800; + const SLL_IPV6: u16 = 0x86DD; + + while let Ok(packet) = cap.next_packet() { + if packet.data.len() > 14 { + amount_of_packets += 1; + let ethertype = u16::from_be_bytes([packet.data[14], packet.data[15]]); + match ethertype { + SLL_IPV4 => { + if let Some(ipv4_packet) = Ipv4Packet::new(&packet.data[16..]) { + if let Some(features_ipv4) = extract_ipv4_features(&ipv4_packet) { + redirect_packet_ipv4(&features_ipv4, &flow_map_ipv4); + } + } + }, + SLL_IPV6 => { + if let Some(ipv6_packet) = Ipv6Packet::new(&packet.data[16..]) { + if let Some(features_ipv6) = extract_ipv6_features(&ipv6_packet) { + redirect_packet_ipv6(&features_ipv6, &flow_map_ipv6); + } + } + }, + _ => { + log::debug!("Unknown SLL EtherType, consider using Ethernet capture by setting the machine type to windows"); + } + } + } else { + log::error!("Packet too short to be SLL"); + } + } + + for entry in flow_map_ipv4.iter() { + let flow = entry.value(); + println!("{}", flow.dump()); + } + + for entry in flow_map_ipv6.iter() { + let flow = entry.value(); + println!("{}", flow.dump()); + } + + let end = Instant::now(); + println!("{} packets were processed in {:?} milliseconds",amount_of_packets, end.duration_since(start).as_millis()); +} + +/// Processes an ipv4 packet and updates the flow map. +/// +/// # Arguments +/// +/// * `data` - Basic features of the packet. +/// * `flow_map` - Map of flows. +/// * `fwd` - Direction of the packet. fn process_packet_ipv4(data: &BasicFeaturesIpv4, flow_map: &Arc>, fwd: bool) where T: Flow, @@ -403,15 +604,20 @@ where } } +/// Processes an ipv6 packet and updates the flow map. +/// +/// # Arguments +/// +/// * `data` - Basic features of the packet. +/// * `flow_map` - Map of flows. +/// * `fwd` - Direction of the packet. 
fn process_packet_ipv6(data: &BasicFeaturesIpv6, flow_map: &Arc>, fwd: bool) where T: Flow, { let timestamp = Instant::now(); - let destination = std::net::IpAddr::V6(Ipv6Addr::from(unsafe { - data.ipv6_destination.in6_u.u6_addr8 - })); - let source = std::net::IpAddr::V6(Ipv6Addr::from(unsafe { data.ipv6_source.in6_u.u6_addr8 })); + let destination = std::net::IpAddr::V6(Ipv6Addr::from(data.ipv6_destination)); + let source = std::net::IpAddr::V6(Ipv6Addr::from(data.ipv6_source)); let features = BasicFeatures { fin_flag: data.fin_flag, syn_flag: data.syn_flag, @@ -477,47 +683,255 @@ where } } -fn handle_dataset(dataset: Dataset, path: &str) { - println!( - "Dataset feature extraction for {:?} from path: {}", - dataset, path +/// Redirects an ipv4 packet to the correct flow. +/// +/// # Arguments +/// +/// * `features_ipv4` - Basic features of the packet. +/// * `flow_map` - Map of flows. +fn redirect_packet_ipv4(features_ipv4: &BasicFeaturesIpv4, flow_map: &Arc>) +where + T: Flow, +{ + let fwd_flow_id = create_flow_id( + std::net::IpAddr::V4(Ipv4Addr::from(features_ipv4.ipv4_source)), + features_ipv4.port_source, + std::net::IpAddr::V4(Ipv4Addr::from(features_ipv4.ipv4_destination)), + features_ipv4.port_destination, + features_ipv4.protocol, + ); + let bwd_flow_id = create_flow_id( + std::net::IpAddr::V4(Ipv4Addr::from(features_ipv4.ipv4_destination)), + features_ipv4.port_destination, + std::net::IpAddr::V4(Ipv4Addr::from(features_ipv4.ipv4_source)), + features_ipv4.port_source, + features_ipv4.protocol, ); - match dataset { - Dataset::CicIds2017 => { - if path.ends_with(".csv") { - let parser = CsvParser; + if flow_map.contains_key(&fwd_flow_id) { + process_packet_ipv4(&features_ipv4, &flow_map, true); + } else if flow_map.contains_key(&bwd_flow_id) { + process_packet_ipv4(&features_ipv4, &flow_map, false); + } else { + process_packet_ipv4(&features_ipv4, &flow_map, true); + } +} - match parser.parse::(path) { - Ok(records) => { - for record in records { - match record { - Ok(record) => { - record.print(); - } - Err(err) => { - // TODO: Will we output to stderr, drop the record or use default values? - eprintln!("Error: {:?}", err); - } - } - } - } - Err(err) => { - eprintln!("Error: {:?}", err); - } - } - } else if path.ends_with(".pcap") { - panic!("This file format is not supported yet..."); - } else if path.ends_with(".parquet") { - panic!("This file format is not supported yet..."); - } else { - panic!("This file format is not supported..."); - } +/// Redirects an ipv6 packet to the correct flow. +/// +/// # Arguments +/// +/// * `features_ipv6` - Basic features of the packet. +/// * `flow_map` - Map of flows. 
+fn redirect_packet_ipv6(features_ipv6: &BasicFeaturesIpv6, flow_map: &Arc>) +where + T: Flow, +{ + let fwd_flow_id = create_flow_id( + std::net::IpAddr::V6(Ipv6Addr::from(features_ipv6.ipv6_source)), + features_ipv6.port_source, + std::net::IpAddr::V6(Ipv6Addr::from(features_ipv6.ipv6_destination)), + features_ipv6.port_destination, + features_ipv6.protocol, + ); + let bwd_flow_id = create_flow_id( + std::net::IpAddr::V6(Ipv6Addr::from(features_ipv6.ipv6_destination)), + features_ipv6.port_destination, + std::net::IpAddr::V6(Ipv6Addr::from(features_ipv6.ipv6_source)), + features_ipv6.port_source, + features_ipv6.protocol, + ); + + if flow_map.contains_key(&fwd_flow_id) { + process_packet_ipv6(&features_ipv6, &flow_map, true); + } else if flow_map.contains_key(&bwd_flow_id) { + process_packet_ipv6(&features_ipv6, &flow_map, false); + } else { + process_packet_ipv6(&features_ipv6, &flow_map, true); + } +} + +/// Extracts the basic features of an ipv4 packet pnet struct. +/// +/// # Arguments +/// +/// * `ipv4_packet` - Ipv4 packet pnet struct. +/// +/// # Returns +/// +/// * `Option` - Basic features of the packet. +fn extract_ipv4_features(ipv4_packet: &Ipv4Packet) -> Option { + let source_ip = ipv4_packet.get_source(); + let destination_ip = ipv4_packet.get_destination(); + let protocol = ipv4_packet.get_next_level_protocol(); + + let source_port: u16; + let destination_port: u16; + + let mut syn_flag: u8 = 0; + let mut fin_flag: u8 = 0; + let mut rst_flag: u8 = 0; + let mut psh_flag: u8 = 0; + let mut ack_flag: u8 = 0; + let mut urg_flag: u8 = 0; + let mut ece_flag: u8 = 0; + let mut cwe_flag: u8 = 0; + + let data_length: u32; + let header_length: u32; + let length: u32; + + let mut window_size: u16 = 0; + + if protocol.0 == IpNextHeaderProtocols::Tcp.0 { + if let Some(tcp_packet) = TcpPacket::new(ipv4_packet.payload()) { + source_port = tcp_packet.get_source(); + destination_port = tcp_packet.get_destination(); + + syn_flag = (tcp_packet.get_flags() & 0b0000_0010 != 0) as u8; + fin_flag = (tcp_packet.get_flags() & 0b0000_0001 != 0) as u8; + rst_flag = (tcp_packet.get_flags() & 0b0000_0100 != 0) as u8; + psh_flag = (tcp_packet.get_flags() & 0b0000_1000 != 0) as u8; + ack_flag = (tcp_packet.get_flags() & 0b0001_0000 != 0) as u8; + urg_flag = (tcp_packet.get_flags() & 0b0010_0000 != 0) as u8; + ece_flag = (tcp_packet.get_flags() & 0b0100_0000 != 0) as u8; + cwe_flag = (tcp_packet.get_flags() & 0b1000_0000 != 0) as u8; + + data_length = tcp_packet.payload().len() as u32; + header_length = (tcp_packet.get_data_offset() * 4) as u32; + length = tcp_packet.packet().len() as u32; + + window_size = tcp_packet.get_window(); + } else { + return None; } - _ => { - panic!("This is not implemented yet..."); + } else if protocol.0 == IpNextHeaderProtocols::Udp.0 { + if let Some(udp_packet) = pnet::packet::udp::UdpPacket::new(ipv4_packet.payload()) { + source_port = udp_packet.get_source(); + destination_port = udp_packet.get_destination(); + + data_length = udp_packet.payload().len() as u32; + header_length = 8; + length = udp_packet.packet().len() as u32; + } else { + return None; + } + + } else { + return None; + } + + Some(BasicFeaturesIpv4 { + ipv4_source: source_ip.into(), + ipv4_destination: destination_ip.into(), + port_source: source_port, + port_destination: destination_port, + protocol: protocol.0, + fin_flag, + syn_flag, + rst_flag, + psh_flag, + ack_flag, + urg_flag, + ece_flag, + cwe_flag, + data_length, + header_length, + length, + window_size, + }) + +} + +/// Extracts the basic features 
of an ipv6 packet pnet struct. +/// +/// # Arguments +/// +/// * `ipv6_packet` - Ipv6 packet pnet struct. +/// +/// # Returns +/// +/// * `Option` - Basic features of the packet. +fn extract_ipv6_features(ipv6_packet: &Ipv6Packet) -> Option { + let source_ip = ipv6_packet.get_source(); + let destination_ip = ipv6_packet.get_destination(); + let protocol = ipv6_packet.get_next_header(); + + let source_port: u16; + let destination_port: u16; + + let mut syn_flag: u8 = 0; + let mut fin_flag: u8 = 0; + let mut rst_flag: u8 = 0; + let mut psh_flag: u8 = 0; + let mut ack_flag: u8 = 0; + let mut urg_flag: u8 = 0; + let mut ece_flag: u8 = 0; + let mut cwe_flag: u8 = 0; + + let data_length: u32; + let header_length: u32; + let length: u32; + + let mut window_size: u16 = 0; + + if protocol == IpNextHeaderProtocols::Tcp { + if let Some(tcp_packet) = TcpPacket::new(ipv6_packet.payload()) { + source_port = tcp_packet.get_source(); + destination_port = tcp_packet.get_destination(); + + syn_flag = (tcp_packet.get_flags() & 0b0000_0010 != 0) as u8; + fin_flag = (tcp_packet.get_flags() & 0b0000_0001 != 0) as u8; + rst_flag = (tcp_packet.get_flags() & 0b0000_0100 != 0) as u8; + psh_flag = (tcp_packet.get_flags() & 0b0000_1000 != 0) as u8; + ack_flag = (tcp_packet.get_flags() & 0b0001_0000 != 0) as u8; + urg_flag = (tcp_packet.get_flags() & 0b0010_0000 != 0) as u8; + ece_flag = (tcp_packet.get_flags() & 0b0100_0000 != 0) as u8; + cwe_flag = (tcp_packet.get_flags() & 0b1000_0000 != 0) as u8; + + data_length = tcp_packet.payload().len() as u32; + header_length = (tcp_packet.get_data_offset() * 4) as u32; + length = tcp_packet.packet().len() as u32; + + window_size = tcp_packet.get_window(); + } else { + return None; + } + } else if protocol == IpNextHeaderProtocols::Udp { + if let Some(udp_packet) = pnet::packet::udp::UdpPacket::new(ipv6_packet.payload()) { + source_port = udp_packet.get_source(); + destination_port = udp_packet.get_destination(); + + data_length = udp_packet.payload().len() as u32; + header_length = 8; + length = udp_packet.packet().len() as u32; + } else { + return None; } + + } else { + return None; } + + Some(BasicFeaturesIpv6 { + ipv6_source: source_ip.into(), + ipv6_destination: destination_ip.into(), + port_source: source_port, + port_destination: destination_port, + protocol: protocol.0, + fin_flag, + syn_flag, + rst_flag, + psh_flag, + ack_flag, + urg_flag, + ece_flag, + cwe_flag, + data_length, + header_length, + length, + window_size, + }) } #[cfg(test)] From 3e204dc182576c85a64d457aed7ac59c95f7b6f8 Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 10:49:19 +0100 Subject: [PATCH 03/11] :green_heart: Fix missing pcap install --- .github/workflows/rust.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 48517a9..d9c1e53 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -24,7 +24,9 @@ jobs: profile: minimal components: rust-src, llvm-tools-preview override: true - + - name: Install libpcap-dev + run: sudo apt install libpcap-dev + - name: Install bpf-linker run: cargo install bpf-linker From 426212ec5761f7e76fcbc4b1d8a5009dd5d3f2a3 Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 11:20:44 +0100 Subject: [PATCH 04/11] :pencil2: Add extra installation lines --- README.md | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index ec3aba4..6c814ec 100644 --- a/README.md +++ 
b/README.md @@ -2,11 +2,21 @@ This is a feature extraction tool that is capable of exporting multiple kinds of feature and feature sets. The project is written in rust and uses eBPF code to collect the basic network traffic data from the incomming and outgoing packets. The project was made with following goals, it needed to be fast, adaptable and reliable. -![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/matissecallewaert/nids-feature-extraction-tool/rust.yml) ![Website](https://img.shields.io/website?url=https%3A%2F%2Fmatissecallewaert.github.io%2Fnids-feature-extraction-tool&label=Documentation) +![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/matissecallewaert/nids-feature-extraction-tool/rust.yml?logo=github +) ![Website](https://img.shields.io/website?url=https%3A%2F%2Fmatissecallewaert.github.io%2Fnids-feature-extraction-tool&label=Documentation) ![flows](flows.gif) ## How to install: +### Installing libpcap-dev +#### Debian +```sh +sudo apt install libpcap-dev +``` +#### Fedora +```sh +sudo dnf install libpcap-devel +``` ### Installing rust ```bash @@ -22,13 +32,13 @@ rustup toolchain install nightly --component rust-src ### Installing bpf linker -If you are running a linux x86_64 system the install is simple: +If you are running a Linux x86_64 system the installation is simple: ```bash cargo install bpf-linker ``` -If you are running macos or linux on any other architecture, you need to install the newest stable version of LLVM first: +If you are running MacOs or Linux on any other architecture, you need to install the newest stable version of LLVM first: ```bash brew install llvm From 7cfc08ec90537beea6d82bdacb3efcc35f9822ea Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 14:30:14 +0100 Subject: [PATCH 05/11] :heavy_plus_sign: Add lazy_static --- feature-extraction-tool/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/feature-extraction-tool/Cargo.toml b/feature-extraction-tool/Cargo.toml index 982a09b..5e0753a 100644 --- a/feature-extraction-tool/Cargo.toml +++ b/feature-extraction-tool/Cargo.toml @@ -29,6 +29,7 @@ chrono = "0.4.34" dashmap = "5.5.3" pcap = "1.3.0" pnet = "0.34.0" +lazy_static = "1.4.0" [[bin]] name = "feature-extraction-tool" From 81e60aa2bf7cb50f731d909b21e320cb8bd9329c Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 14:33:05 +0100 Subject: [PATCH 06/11] :art: Add formatting --- feature-extraction-tool/src/flows/cic_flow.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/feature-extraction-tool/src/flows/cic_flow.rs b/feature-extraction-tool/src/flows/cic_flow.rs index cda294a..2c9cb71 100644 --- a/feature-extraction-tool/src/flows/cic_flow.rs +++ b/feature-extraction-tool/src/flows/cic_flow.rs @@ -1977,7 +1977,8 @@ mod tests { let mut cic_flow = setup_cic_flow(); cic_flow.basic_flow.first_timestamp = chrono::Utc::now(); - cic_flow.basic_flow.last_timestamp = chrono::Utc::now() + chrono::Duration::try_seconds(5).unwrap(); + cic_flow.basic_flow.last_timestamp = + chrono::Utc::now() + chrono::Duration::try_seconds(5).unwrap(); cic_flow.fwd_pkt_len_tot = 100; cic_flow.bwd_pkt_len_tot = 100; @@ -1990,7 +1991,8 @@ mod tests { let mut cic_flow = setup_cic_flow(); cic_flow.basic_flow.first_timestamp = chrono::Utc::now(); - cic_flow.basic_flow.last_timestamp = chrono::Utc::now() + chrono::Duration::try_seconds(5).unwrap(); + cic_flow.basic_flow.last_timestamp = + chrono::Utc::now() + 
chrono::Duration::try_seconds(5).unwrap(); cic_flow.basic_flow.fwd_packet_count = 5; cic_flow.basic_flow.bwd_packet_count = 5; @@ -2003,7 +2005,8 @@ mod tests { let mut cic_flow = setup_cic_flow(); cic_flow.basic_flow.first_timestamp = chrono::Utc::now(); - cic_flow.basic_flow.last_timestamp = chrono::Utc::now() + chrono::Duration::try_seconds(5).unwrap(); + cic_flow.basic_flow.last_timestamp = + chrono::Utc::now() + chrono::Duration::try_seconds(5).unwrap(); cic_flow.basic_flow.fwd_packet_count = 5; @@ -2015,7 +2018,8 @@ mod tests { let mut cic_flow = setup_cic_flow(); cic_flow.basic_flow.first_timestamp = chrono::Utc::now(); - cic_flow.basic_flow.last_timestamp = chrono::Utc::now() + chrono::Duration::try_seconds(5).unwrap(); + cic_flow.basic_flow.last_timestamp = + chrono::Utc::now() + chrono::Duration::try_seconds(5).unwrap(); cic_flow.basic_flow.bwd_packet_count = 5; From a80a7c195ff76a563205082d45036795bdde1020 Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 14:33:48 +0100 Subject: [PATCH 07/11] :children_crossing: Update runner to only run sudo when using eBPF --- xtask/src/run.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/xtask/src/run.rs b/xtask/src/run.rs index f396b49..9c98ecb 100644 --- a/xtask/src/run.rs +++ b/xtask/src/run.rs @@ -62,13 +62,14 @@ pub fn run(opts: Options) -> Result<(), anyhow::Error> { let profile = if opts.release { "release" } else { "debug" }; let bin_path = format!("target/{profile}/feature-extraction-tool"); - // arguments to pass to the application - let mut run_args: Vec<_> = opts.run_args.iter().map(String::as_str).collect(); + // Determine if 'realtime' is the first argument in run_args + let use_sudo = opts.run_args.get(0).map(|arg| arg == "realtime").unwrap_or(false); // configure args - let mut args: Vec<_> = opts.runner.trim().split_terminator(' ').collect(); + let runner = if use_sudo { opts.runner.trim().split_terminator(' ').collect() } else { Vec::new() }; + let mut args = runner; args.push(bin_path.as_str()); - args.append(&mut run_args); + args.extend(opts.run_args.iter().map(String::as_str)); // directly extend with run_args // run the command let status = Command::new(args.first().expect("No first argument")) From 1f6b76f4ccd91c1968a01b4fe93d0016038ff705 Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 14:35:28 +0100 Subject: [PATCH 08/11] :bricks: Add multiple output possibilities --- feature-extraction-tool/src/args.rs | 30 ++- feature-extraction-tool/src/main.rs | 242 ++++++++++++++------ feature-extraction-tool/src/output/csv.rs | 19 ++ feature-extraction-tool/src/output/mod.rs | 5 + feature-extraction-tool/src/output/print.rs | 5 + 5 files changed, 235 insertions(+), 66 deletions(-) create mode 100644 feature-extraction-tool/src/output/csv.rs create mode 100644 feature-extraction-tool/src/output/mod.rs create mode 100644 feature-extraction-tool/src/output/print.rs diff --git a/feature-extraction-tool/src/args.rs b/feature-extraction-tool/src/args.rs index 5fbd492..7a51f81 100644 --- a/feature-extraction-tool/src/args.rs +++ b/feature-extraction-tool/src/args.rs @@ -1,4 +1,4 @@ -use clap::{Parser, Subcommand}; +use clap::{Args, Parser, Subcommand}; #[derive(Debug, Parser)] #[clap(author, version, about)] @@ -20,6 +20,10 @@ pub enum Commands { /// The maximum lifespan of a flow in seconds lifespan: u64, + /// Output method + #[clap(flatten)] + export_method: Output, + /// The print interval for open flows in seconds, needs to be smaller than the flow 
maximum lifespan interval: Option, }, @@ -43,9 +47,33 @@ pub enum Commands { /// The relative path to the pcap file path: String, + + /// Output method + #[clap(flatten)] + export_method: Output, }, } +#[derive(Args, Debug, Clone)] +pub struct Output { + /// Output method + #[clap(value_enum)] + pub method: ExportMethodType, + + /// File path for output (used if method is File) + #[clap(required_if_eq("method", "Csv"))] + pub export_path: Option, +} + +#[derive(clap::ValueEnum, Clone, Debug)] +pub enum ExportMethodType { + /// The output will be printed to the console + Print, + + /// The output will be written to a CSV file + Csv, +} + #[derive(clap::ValueEnum, Clone, Debug)] pub enum GeneratedMachineType { /// The pcap file was generated on a Windows machine diff --git a/feature-extraction-tool/src/main.rs b/feature-extraction-tool/src/main.rs index 8e7357c..f6ebbe7 100644 --- a/feature-extraction-tool/src/main.rs +++ b/feature-extraction-tool/src/main.rs @@ -1,16 +1,18 @@ mod args; mod flows; +mod output; mod parsers; mod records; mod utils; use crate::{ flows::cic_flow::CicFlow, + output::Export, parsers::csv_parser::CsvParser, records::{cic_record::CicRecord, print::Print}, utils::utils::{create_flow_id, get_duration}, }; -use args::{Cli, Commands, Dataset, FlowType, GeneratedMachineType}; +use args::{Cli, Commands, Dataset, ExportMethodType, FlowType, GeneratedMachineType}; use aya::{ include_bytes_aligned, maps::AsyncPerfEventArray, @@ -25,23 +27,35 @@ use common::{BasicFeaturesIpv4, BasicFeaturesIpv6}; use core::panic; use dashmap::DashMap; use flows::{basic_flow::BasicFlow, cidds_flow::CiddsFlow, flow::Flow, nf_flow::NfFlow}; +use lazy_static::lazy_static; use log::info; -use std::{ - net::{Ipv4Addr, Ipv6Addr}, - sync::Arc, - time::Instant, -}; -use tokio::time::{self, Duration}; -use tokio::{signal, task}; -use utils::utils::BasicFeatures; use pnet::packet::{ - ethernet::{EthernetPacket, EtherTypes}, + ethernet::{EtherTypes, EthernetPacket}, ip::IpNextHeaderProtocols, ipv4::Ipv4Packet, ipv6::Ipv6Packet, tcp::TcpPacket, Packet, }; +use std::{ + fs::{File, OpenOptions}, + io::BufWriter, + ops::{Deref, DerefMut}, +}; +use std::{ + net::{Ipv4Addr, Ipv6Addr}, + sync::{Arc, Mutex}, + time::Instant, +}; +use tokio::time::{self, Duration}; +use tokio::{signal, task}; +use utils::utils::BasicFeatures; + +lazy_static! 
{ + static ref EXPORT_FUNCTION: Arc>> = Arc::new(Mutex::new(None)); + static ref EXPORT_FILE: Arc>>> = Arc::new(Mutex::new(None)); + static ref FLUSH_COUNTER: Arc>> = Arc::new(Mutex::new(Some(0))); +} #[tokio::main] async fn main() { @@ -53,6 +67,7 @@ async fn main() { Commands::Realtime { interface, flow_type, + export_method, lifespan, interval, } => { @@ -62,6 +77,31 @@ async fn main() { } } + match export_method.method { + ExportMethodType::Print => { + let func = output::print::print; + let mut export_func = EXPORT_FUNCTION.lock().unwrap(); + *export_func = Some(func); + } + ExportMethodType::Csv => { + let func = output::csv::export_to_csv; + let mut export_func = EXPORT_FUNCTION.lock().unwrap(); + *export_func = Some(func); + + if let Some(path) = export_method.export_path { + let file = OpenOptions::new() + .create(true) + .append(true) + .open(path) + .unwrap_or_else(|err| { + panic!("Error opening file: {:?}", err); + }); + let mut export_file = EXPORT_FILE.lock().unwrap(); + *export_file = Some(BufWriter::new(file)); + } + } + } + match flow_type { FlowType::BasicFlow => { if let Err(err) = @@ -95,16 +135,62 @@ async fn main() { Commands::Dataset { dataset, path } => { handle_dataset(dataset, &path); } - Commands::Pcap { path, machine_type, flow_type } => { + Commands::Pcap { + path, + machine_type, + flow_type, + export_method, + } => { + match export_method.method { + ExportMethodType::Print => { + let func = output::print::print; + let mut export_func = EXPORT_FUNCTION.lock().unwrap(); + *export_func = Some(func); + } + ExportMethodType::Csv => { + let func = output::csv::export_to_csv; + let mut export_func = EXPORT_FUNCTION.lock().unwrap(); + *export_func = Some(func); + + if let Some(path) = export_method.export_path { + let file = OpenOptions::new() + .create(true) + .append(true) + .open(path) + .unwrap_or_else(|err| { + panic!("Error opening file: {:?}", err); + }); + let mut export_file = EXPORT_FILE.lock().unwrap(); + *export_file = Some(BufWriter::new(file)); + } + } + } + match (machine_type, flow_type) { - (GeneratedMachineType::Windows, FlowType::BasicFlow) => read_pcap_file_ethernet::(&path), - (GeneratedMachineType::Windows, FlowType::CicFlow) => read_pcap_file_ethernet::(&path), - (GeneratedMachineType::Windows, FlowType::CiddsFlow) => read_pcap_file_ethernet::(&path), - (GeneratedMachineType::Windows, FlowType::NfFlow) => read_pcap_file_ethernet::(&path), - (GeneratedMachineType::Linux, FlowType::BasicFlow) => read_pcap_file_linux_cooked::(&path), - (GeneratedMachineType::Linux, FlowType::CicFlow) => read_pcap_file_linux_cooked::(&path), - (GeneratedMachineType::Linux, FlowType::CiddsFlow) => read_pcap_file_linux_cooked::(&path), - (GeneratedMachineType::Linux, FlowType::NfFlow) => read_pcap_file_linux_cooked::(&path), + (GeneratedMachineType::Windows, FlowType::BasicFlow) => { + read_pcap_file_ethernet::(&path) + } + (GeneratedMachineType::Windows, FlowType::CicFlow) => { + read_pcap_file_ethernet::(&path) + } + (GeneratedMachineType::Windows, FlowType::CiddsFlow) => { + read_pcap_file_ethernet::(&path) + } + (GeneratedMachineType::Windows, FlowType::NfFlow) => { + read_pcap_file_ethernet::(&path) + } + (GeneratedMachineType::Linux, FlowType::BasicFlow) => { + read_pcap_file_linux_cooked::(&path) + } + (GeneratedMachineType::Linux, FlowType::CicFlow) => { + read_pcap_file_linux_cooked::(&path) + } + (GeneratedMachineType::Linux, FlowType::CiddsFlow) => { + read_pcap_file_linux_cooked::(&path) + } + (GeneratedMachineType::Linux, FlowType::NfFlow) => { + 
read_pcap_file_linux_cooked::(&path) + } } } } @@ -212,8 +298,9 @@ where let mut buf_egress_ipv4 = flows_egress_ipv4.open(cpu_id, None)?; let flow_map_clone_egress_ipv4 = flow_map_ipv4.clone(); task::spawn(async move { + // 10 buffers with 10240 bytes each, meaning a capacity of 292 packets per buffer (280 bits per packet) let mut buffers = (0..10) - .map(|_| BytesMut::with_capacity(1024)) + .map(|_| BytesMut::with_capacity(10_240)) .collect::>(); loop { @@ -231,7 +318,7 @@ where let flow_map_clone_ingress_ipv4 = flow_map_ipv4.clone(); task::spawn(async move { let mut buffers = (0..10) - .map(|_| BytesMut::with_capacity(1024)) + .map(|_| BytesMut::with_capacity(10_240)) .collect::>(); loop { @@ -248,8 +335,9 @@ where let mut buf_egress_ipv6 = flows_egress_ipv6.open(cpu_id, None)?; let flow_map_clone_egress_ipv6 = flow_map_ipv6.clone(); task::spawn(async move { + // 10 buffers with 10240 bytes each, meaning a capacity of 173 packets per buffer (472 bits per packet) let mut buffers = (0..10) - .map(|_| BytesMut::with_capacity(1024)) + .map(|_| BytesMut::with_capacity(10_240)) .collect::>(); loop { @@ -267,7 +355,7 @@ where let flow_map_clone_ingress_ipv6 = flow_map_ipv6.clone(); task::spawn(async move { let mut buffers = (0..10) - .map(|_| BytesMut::with_capacity(1024)) + .map(|_| BytesMut::with_capacity(10_240)) .collect::>(); loop { @@ -291,11 +379,11 @@ where interval.tick().await; for entry in flow_map_print_ipv4.iter() { let flow = entry.value(); - println!("{}", flow.dump()); + export(&flow.dump()); } for entry in flow_map_print_ipv6.iter() { let flow = entry.value(); - println!("{}", flow.dump()); + export(&flow.dump()); } } }); @@ -316,7 +404,7 @@ where let end = get_duration(flow.get_first_timestamp(), timestamp) / 1_000_000.0; if end >= lifespan as f64 { - println!("{}", flow.dump()); + export(&flow.dump()); keys_to_remove_ipv4.push(entry.key().clone()); } } @@ -328,7 +416,7 @@ where let end = get_duration(flow.get_first_timestamp(), timestamp) / 1_000_000.0; if end >= lifespan as f64 { - println!("{}", flow.dump()); + export(&flow.dump()); keys_to_remove_ipv6.push(entry.key().clone()); } } @@ -350,12 +438,12 @@ where for entry in flow_map_ipv4.iter() { let flow = entry.value(); - println!("{}", flow.dump()); + export(&flow.dump()); } for entry in flow_map_ipv6.iter() { let flow = entry.value(); - println!("{}", flow.dump()); + export(&flow.dump()); } info!("Exiting..."); @@ -363,6 +451,25 @@ where Ok(()) } +fn export(output: &String) { + let export_func = EXPORT_FUNCTION.lock().unwrap(); + + if let Some(function) = export_func.deref() { + let mut export_file_option = EXPORT_FILE.lock().unwrap(); + let mut flush_counter_option = FLUSH_COUNTER.lock().unwrap(); + + if let Some(ref mut flush_counter) = + flush_counter_option.deref_mut() + { + function(&output, export_file_option.deref_mut(), flush_counter); + } else { + log::error!("No export file set...") + } + } else { + log::error!("No export function set...") + } +} + fn handle_dataset(dataset: Dataset, path: &str) { println!( "Dataset feature extraction for {:?} from path: {}", @@ -403,7 +510,7 @@ fn handle_dataset(dataset: Dataset, path: &str) { } } -fn read_pcap_file_ethernet(path: &str) +fn read_pcap_file_ethernet(path: &str) where T: Flow, { @@ -423,7 +530,7 @@ where while let Ok(packet) = cap.next_packet() { amount_of_packets += 1; - if let Some(ethernet) = EthernetPacket::new(packet.data){ + if let Some(ethernet) = EthernetPacket::new(packet.data) { match ethernet.get_ethertype() { EtherTypes::Ipv4 => { if let 
Some(ipv4_packet) = Ipv4Packet::new(ethernet.payload()) { @@ -431,14 +538,14 @@ where redirect_packet_ipv4(&features_ipv4, &flow_map_ipv4); } } - }, + } EtherTypes::Ipv6 => { if let Some(ipv6_packet) = Ipv6Packet::new(ethernet.payload()) { if let Some(features_ipv6) = extract_ipv6_features(&ipv6_packet) { redirect_packet_ipv6(&features_ipv6, &flow_map_ipv6); } } - }, + } _ => { log::debug!("Unknown EtherType, consider using Linux cooked capture by setting the machine type to linux"); } @@ -450,19 +557,23 @@ where for entry in flow_map_ipv4.iter() { let flow = entry.value(); - println!("{}", flow.dump()); + export(&flow.dump()); } for entry in flow_map_ipv6.iter() { let flow = entry.value(); - println!("{}", flow.dump()); + export(&flow.dump()); } let end = Instant::now(); - println!("{} packets were processed in {:?} milliseconds",amount_of_packets, end.duration_since(start).as_millis()); + println!( + "{} packets were processed in {:?} milliseconds", + amount_of_packets, + end.duration_since(start).as_millis() + ); } -fn read_pcap_file_linux_cooked(path: &str) +fn read_pcap_file_linux_cooked(path: &str) where T: Flow, { @@ -495,14 +606,14 @@ where redirect_packet_ipv4(&features_ipv4, &flow_map_ipv4); } } - }, + } SLL_IPV6 => { if let Some(ipv6_packet) = Ipv6Packet::new(&packet.data[16..]) { if let Some(features_ipv6) = extract_ipv6_features(&ipv6_packet) { redirect_packet_ipv6(&features_ipv6, &flow_map_ipv6); } } - }, + } _ => { log::debug!("Unknown SLL EtherType, consider using Ethernet capture by setting the machine type to windows"); } @@ -514,22 +625,26 @@ where for entry in flow_map_ipv4.iter() { let flow = entry.value(); - println!("{}", flow.dump()); + export(&flow.dump()); } for entry in flow_map_ipv6.iter() { let flow = entry.value(); - println!("{}", flow.dump()); + export(&flow.dump()); } let end = Instant::now(); - println!("{} packets were processed in {:?} milliseconds",amount_of_packets, end.duration_since(start).as_millis()); + println!( + "{} packets were processed in {:?} milliseconds", + amount_of_packets, + end.duration_since(start).as_millis() + ); } /// Processes an ipv4 packet and updates the flow map. /// /// # Arguments -/// +/// /// * `data` - Basic features of the packet. /// * `flow_map` - Map of flows. /// * `fwd` - Direction of the packet. @@ -598,16 +713,16 @@ where let end = entry.update_flow(&features, ×tamp, fwd); if end.is_some() { - println!("{}", end.unwrap()); + export(&end.unwrap()); drop(entry); flow_map.remove(&flow_id_remove); } } /// Processes an ipv6 packet and updates the flow map. -/// +/// /// # Arguments -/// +/// /// * `data` - Basic features of the packet. /// * `flow_map` - Map of flows. /// * `fwd` - Direction of the packet. @@ -677,16 +792,16 @@ where let end = entry.update_flow(&features, ×tamp, fwd); if end.is_some() { - println!("{}", end.unwrap()); + export(&end.unwrap()); drop(entry); flow_map.remove(&flow_id_remove); } } /// Redirects an ipv4 packet to the correct flow. -/// +/// /// # Arguments -/// +/// /// * `features_ipv4` - Basic features of the packet. /// * `flow_map` - Map of flows. fn redirect_packet_ipv4(features_ipv4: &BasicFeaturesIpv4, flow_map: &Arc>) @@ -718,9 +833,9 @@ where } /// Redirects an ipv6 packet to the correct flow. -/// +/// /// # Arguments -/// +/// /// * `features_ipv6` - Basic features of the packet. /// * `flow_map` - Map of flows. fn redirect_packet_ipv6(features_ipv6: &BasicFeaturesIpv6, flow_map: &Arc>) @@ -752,19 +867,19 @@ where } /// Extracts the basic features of an ipv4 packet pnet struct. 
-/// +/// /// # Arguments -/// +/// /// * `ipv4_packet` - Ipv4 packet pnet struct. -/// +/// /// # Returns -/// +/// /// * `Option` - Basic features of the packet. fn extract_ipv4_features(ipv4_packet: &Ipv4Packet) -> Option { let source_ip = ipv4_packet.get_source(); let destination_ip = ipv4_packet.get_destination(); let protocol = ipv4_packet.get_next_level_protocol(); - + let source_port: u16; let destination_port: u16; @@ -809,14 +924,13 @@ fn extract_ipv4_features(ipv4_packet: &Ipv4Packet) -> Option if let Some(udp_packet) = pnet::packet::udp::UdpPacket::new(ipv4_packet.payload()) { source_port = udp_packet.get_source(); destination_port = udp_packet.get_destination(); - + data_length = udp_packet.payload().len() as u32; header_length = 8; length = udp_packet.packet().len() as u32; } else { return None; } - } else { return None; } @@ -840,17 +954,16 @@ fn extract_ipv4_features(ipv4_packet: &Ipv4Packet) -> Option length, window_size, }) - } /// Extracts the basic features of an ipv6 packet pnet struct. -/// +/// /// # Arguments -/// +/// /// * `ipv6_packet` - Ipv6 packet pnet struct. -/// +/// /// # Returns -/// +/// /// * `Option` - Basic features of the packet. fn extract_ipv6_features(ipv6_packet: &Ipv6Packet) -> Option { let source_ip = ipv6_packet.get_source(); @@ -901,14 +1014,13 @@ fn extract_ipv6_features(ipv6_packet: &Ipv6Packet) -> Option if let Some(udp_packet) = pnet::packet::udp::UdpPacket::new(ipv6_packet.payload()) { source_port = udp_packet.get_source(); destination_port = udp_packet.get_destination(); - + data_length = udp_packet.payload().len() as u32; header_length = 8; length = udp_packet.packet().len() as u32; } else { return None; } - } else { return None; } diff --git a/feature-extraction-tool/src/output/csv.rs b/feature-extraction-tool/src/output/csv.rs new file mode 100644 index 0000000..77a5ed1 --- /dev/null +++ b/feature-extraction-tool/src/output/csv.rs @@ -0,0 +1,19 @@ +use std::{ + fs::File, + io::{BufWriter, Write}, +}; + +pub fn export_to_csv(output: &str, writer: &mut Option>, flush_counter: &mut u8) { + if let Some(writer) = writer { + writeln!(writer, "{}", output).unwrap(); + + *flush_counter += 1; + + if *flush_counter >= 10 { + let _ = writer.flush(); + *flush_counter = 0; + } + } else { + eprintln!("Error: No writer found for CSV output"); + } +} diff --git a/feature-extraction-tool/src/output/mod.rs b/feature-extraction-tool/src/output/mod.rs new file mode 100644 index 0000000..0cc4e30 --- /dev/null +++ b/feature-extraction-tool/src/output/mod.rs @@ -0,0 +1,5 @@ +use std::{fs::File, io::BufWriter}; + +pub type Export = fn(&str, &mut Option>, &mut u8) -> (); +pub mod csv; +pub mod print; diff --git a/feature-extraction-tool/src/output/print.rs b/feature-extraction-tool/src/output/print.rs new file mode 100644 index 0000000..f5b88e7 --- /dev/null +++ b/feature-extraction-tool/src/output/print.rs @@ -0,0 +1,5 @@ +use std::{fs::File, io::BufWriter}; + +pub fn print(output: &str, _writer: &mut Option>, _flush_counter: &mut u8) { + println!("{}", output); +} From c61c8c9167241030a687fdf124699a15f5a3c44b Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 14:56:48 +0100 Subject: [PATCH 09/11] :bulb: Add comment to export function and relocate --- feature-extraction-tool/src/main.rs | 43 ++++++++++++++++------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/feature-extraction-tool/src/main.rs b/feature-extraction-tool/src/main.rs index f6ebbe7..6929580 100644 --- a/feature-extraction-tool/src/main.rs +++ 
b/feature-extraction-tool/src/main.rs @@ -451,25 +451,6 @@ where Ok(()) } -fn export(output: &String) { - let export_func = EXPORT_FUNCTION.lock().unwrap(); - - if let Some(function) = export_func.deref() { - let mut export_file_option = EXPORT_FILE.lock().unwrap(); - let mut flush_counter_option = FLUSH_COUNTER.lock().unwrap(); - - if let Some(ref mut flush_counter) = - flush_counter_option.deref_mut() - { - function(&output, export_file_option.deref_mut(), flush_counter); - } else { - log::error!("No export file set...") - } - } else { - log::error!("No export function set...") - } -} - fn handle_dataset(dataset: Dataset, path: &str) { println!( "Dataset feature extraction for {:?} from path: {}", @@ -641,6 +622,30 @@ where ); } +/// Export the flow to the set export function. +/// +/// # Arguments +/// +/// * `output` - The output to export. +fn export(output: &String) { + let export_func = EXPORT_FUNCTION.lock().unwrap(); + + if let Some(function) = export_func.deref() { + let mut export_file_option = EXPORT_FILE.lock().unwrap(); + let mut flush_counter_option = FLUSH_COUNTER.lock().unwrap(); + + if let Some(ref mut flush_counter) = + flush_counter_option.deref_mut() + { + function(&output, export_file_option.deref_mut(), flush_counter); + } else { + log::error!("No export file set...") + } + } else { + log::error!("No export function set...") + } +} + /// Processes an ipv4 packet and updates the flow map. /// /// # Arguments From eb1c5c2c2fa2acc480a3ddf5d0217fe81f1a9c68 Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 15:38:23 +0100 Subject: [PATCH 10/11] :rotating_light: Fix linter warning --- xtask/src/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xtask/src/run.rs b/xtask/src/run.rs index 9c98ecb..32f207a 100644 --- a/xtask/src/run.rs +++ b/xtask/src/run.rs @@ -63,7 +63,7 @@ pub fn run(opts: Options) -> Result<(), anyhow::Error> { let bin_path = format!("target/{profile}/feature-extraction-tool"); // Determine if 'realtime' is the first argument in run_args - let use_sudo = opts.run_args.get(0).map(|arg| arg == "realtime").unwrap_or(false); + let use_sudo = opts.run_args.first().map(|arg| arg == "realtime").unwrap_or(false); // configure args let runner = if use_sudo { opts.runner.trim().split_terminator(' ').collect() } else { Vec::new() }; From d7cb6681ba78ba51f5290ff80977f68058200f7d Mon Sep 17 00:00:00 2001 From: Matisse Callewaert Date: Tue, 26 Mar 2024 15:43:39 +0100 Subject: [PATCH 11/11] :test_tube: Fix failing test by removing probability --- feature-extraction-tool/src/flows/cic_flow.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/feature-extraction-tool/src/flows/cic_flow.rs b/feature-extraction-tool/src/flows/cic_flow.rs index 2c9cb71..c183548 100644 --- a/feature-extraction-tool/src/flows/cic_flow.rs +++ b/feature-extraction-tool/src/flows/cic_flow.rs @@ -1967,7 +1967,7 @@ mod tests { #[test] fn test_get_duration() { let start = chrono::Utc::now(); - let end = chrono::Utc::now() + chrono::Duration::try_seconds(5).unwrap(); + let end = start + chrono::Duration::try_seconds(5).unwrap(); assert_eq!(get_duration(start, end), 5_000_000.0); }
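
The series opens (patch 01) by storing IPv6 addresses as `u128` instead of `in6_addr`, and patch 02's userspace path turns that `u128` back into a `std::net::Ipv6Addr` in `process_packet_ipv6`. A minimal standalone sketch of that round trip, using only the Rust standard library (the `src_addr.in6_u.u6_addr8` field mentioned in the comments is the eBPF-side source from patch 01; nothing else from the project is assumed):

```rust
use std::net::Ipv6Addr;

fn main() {
    // 16 raw bytes in network byte order, i.e. what the eBPF program
    // reads from src_addr.in6_u.u6_addr8 in patch 01.
    let wire_bytes: [u8; 16] = Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1).octets();

    // Pack into a u128 the same way the eBPF side does.
    let as_u128 = u128::from_be_bytes(wire_bytes);

    // Userspace reconstructs the address from the u128,
    // as process_packet_ipv6 does after patch 02.
    let addr = Ipv6Addr::from(as_u128);

    assert_eq!(addr.octets(), wire_bytes);
    println!("{addr}"); // 2001:db8::1
}
```

Because `u128::from_be_bytes` treats the first byte as most significant and `Ipv6Addr::from(u128)` interprets the integer in network order, the round trip yields the same textual address on any host, which is what lets `common/src/lib.rs` drop the `in6_addr` dependency.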