Skip to content

Commit

Permalink
pgsql: trigger raw stream reassembly at tx completion
Browse files Browse the repository at this point in the history
Once we are tracking tx progress per-direction for PGSQL, we can trigger
the raw stream reassembly, for detection purposes, as soon as the
transactions are completed in the given direction.

Task OISF#7000

(cherry picked from commit 2b1ad81)
  • Loading branch information
jufajardini committed Sep 23, 2024
1 parent 8e3d7fd commit 8b293a3
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions rust/src/pgsql/pgsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::parser::{self, ConsolidatedDataRowPacket, PgsqlBEMessage, PgsqlFEMess
use crate::applayer::*;
use crate::conf::*;
use crate::core::{AppProto, Flow, Direction, ALPROTO_FAILED, ALPROTO_UNKNOWN, IPPROTO_TCP};
use crate::core::sc_app_layer_parser_trigger_raw_stream_reassembly;
use nom7::{Err, IResult};
use std;
use std::collections::VecDeque;
Expand Down Expand Up @@ -316,7 +317,7 @@ impl PgsqlState {
}
}

fn parse_request(&mut self, input: &[u8]) -> AppLayerResult {
fn parse_request(&mut self, flow: *const Flow, input: &[u8]) -> AppLayerResult {
// We're not interested in empty requests.
if input.is_empty() {
return AppLayerResult::ok();
Expand Down Expand Up @@ -371,6 +372,10 @@ impl PgsqlState {
/* The server won't send any responses to such requests, so transaction should be over */
tx.tx_res_state = PgsqlTxProgress::TxDone;
}
sc_app_layer_parser_trigger_raw_stream_reassembly(
flow,
Direction::ToServer as i32,
);
}
}
} else {
Expand Down Expand Up @@ -486,7 +491,7 @@ impl PgsqlState {
}
}

fn parse_response(&mut self, input: &[u8], flow: *const Flow) -> AppLayerResult {
fn parse_response(&mut self, flow: *const Flow, input: &[u8]) -> AppLayerResult {
// We're not interested in empty responses.
if input.is_empty() {
return AppLayerResult::ok();
Expand Down Expand Up @@ -538,6 +543,10 @@ impl PgsqlState {
if Self::response_is_complete(state) {
tx.tx_req_state = PgsqlTxProgress::TxDone;
tx.tx_res_state = PgsqlTxProgress::TxDone;
sc_app_layer_parser_trigger_raw_stream_reassembly(
flow,
Direction::ToClient as i32,
);
}
}
}
Expand Down Expand Up @@ -692,7 +701,7 @@ pub extern "C" fn SCPgsqlStateTxFree(state: *mut std::os::raw::c_void, tx_id: u6

#[no_mangle]
pub unsafe extern "C" fn SCPgsqlParseRequest(
_flow: *const Flow, state: *mut std::os::raw::c_void, pstate: *mut std::os::raw::c_void,
flow: *const Flow, state: *mut std::os::raw::c_void, pstate: *mut std::os::raw::c_void,
stream_slice: StreamSlice, _data: *const std::os::raw::c_void,
) -> AppLayerResult {
if stream_slice.is_empty() {
Expand All @@ -710,7 +719,7 @@ pub unsafe extern "C" fn SCPgsqlParseRequest(
if stream_slice.is_gap() {
state_safe.on_request_gap(stream_slice.gap_size());
} else if !stream_slice.is_empty() {
return state_safe.parse_request(stream_slice.as_slice());
return state_safe.parse_request(flow, stream_slice.as_slice());
}
AppLayerResult::ok()
}
Expand All @@ -733,7 +742,7 @@ pub unsafe extern "C" fn SCPgsqlParseResponse(
if stream_slice.is_gap() {
state_safe.on_response_gap(stream_slice.gap_size());
} else if !stream_slice.is_empty() {
return state_safe.parse_response(stream_slice.as_slice(), flow);
return state_safe.parse_response(flow, stream_slice.as_slice());
}
AppLayerResult::ok()
}
Expand Down Expand Up @@ -889,7 +898,7 @@ mod test {
let mut state = PgsqlState::new();
// an SSL Request
let buf: &[u8] = &[0x00, 0x00, 0x00, 0x08, 0x04, 0xd2, 0x16, 0x2f];
state.parse_request(buf);
state.parse_request(std::ptr::null_mut(), buf);
let ok_state = PgsqlStateProgress::SSLRequestReceived;

assert_eq!(state.state_progress, ok_state);
Expand All @@ -903,7 +912,7 @@ mod test {
// An SSL Request
let buf: &[u8] = &[0x00, 0x00, 0x00, 0x08, 0x04, 0xd2, 0x16, 0x2f];

let r = state.parse_request(&buf[0..0]);
let r = state.parse_request(std::ptr::null_mut(), &buf[0..0]);
assert_eq!(
r,
AppLayerResult {
Expand All @@ -913,7 +922,7 @@ mod test {
}
);

let r = state.parse_request(&buf[0..1]);
let r = state.parse_request(std::ptr::null_mut(), &buf[0..1]);
assert_eq!(
r,
AppLayerResult {
Expand All @@ -923,7 +932,7 @@ mod test {
}
);

let r = state.parse_request(&buf[0..2]);
let r = state.parse_request(std::ptr::null_mut(), &buf[0..2]);
assert_eq!(
r,
AppLayerResult {
Expand Down

0 comments on commit 8b293a3

Please sign in to comment.