Skip to content

Commit

Permalink
Make parser async & Use with Arc wit Dlt parser & Adjustments on plug…
Browse files Browse the repository at this point in the history
…in hosts

- Changing parser trait to async caused an error in the DLT parser
  because it returns type has a reference in it. Because of that we got
  a compiler error about that the results doesn't implements Send trait
  enough.
- This Error is possible to be a bug in rust that would be fixed in the
  future. See issues:
  - rust-lang/rust#64552
  - rust-lang/rust#96865
- For now I replaced the references with Arcs in the results of
  DLT-Parser

- implements for parser Plugin hosts now awaits on the async call from
  the plugin instead of using `futures::executer::block_on()`. However,
  this change didn't improve the performance of the plugins
  • Loading branch information
AmmarAbouZor committed Jun 12, 2024
1 parent 0e5b24a commit 44b9535
Show file tree
Hide file tree
Showing 14 changed files with 225 additions and 218 deletions.
1 change: 1 addition & 0 deletions application/apps/indexer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions application/apps/indexer/parsers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ someip-payload = { git = "https://github.com/esrlabs/someip-payload" }

[dev-dependencies]
stringreader = "0.1.1"
tokio = { version = "1.24", features = ["full"] }
19 changes: 10 additions & 9 deletions application/apps/indexer/parsers/src/dlt/fmt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use serde::ser::{Serialize, SerializeStruct, Serializer};
use std::{
fmt::{self, Formatter},
str,
sync::Arc,
};

const DLT_COLUMN_SENTINAL: char = '\u{0004}';
Expand Down Expand Up @@ -194,13 +195,13 @@ impl From<Option<&String>> for FormatOptions {
}

/// A dlt message that can be formatted with optional FIBEX data support
pub struct FormattableMessage<'a> {
pub struct FormattableMessage {
pub message: Message,
pub fibex_metadata: Option<&'a FibexMetadata>,
pub options: Option<&'a FormatOptions>,
pub fibex_metadata: Option<Arc<FibexMetadata>>,
pub options: Option<Arc<FormatOptions>>,
}

impl<'a> Serialize for FormattableMessage<'a> {
impl Serialize for FormattableMessage {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
Expand Down Expand Up @@ -286,7 +287,7 @@ impl<'a> Serialize for FormattableMessage<'a> {
}
}

impl<'a> From<Message> for FormattableMessage<'a> {
impl From<Message> for FormattableMessage {
fn from(message: Message) -> Self {
FormattableMessage {
message,
Expand Down Expand Up @@ -319,7 +320,7 @@ impl<'a> PrintableMessage<'a> {
}
}

impl<'a> FormattableMessage<'a> {
impl FormattableMessage {
pub fn printable_parts<'b>(
&'b self,
ext_h_app_id: &'b str,
Expand Down Expand Up @@ -462,7 +463,7 @@ impl<'a> FormattableMessage<'a> {
}

fn info_from_metadata<'b>(&'b self, id: u32, data: &[u8]) -> Option<NonVerboseInfo<'b>> {
let fibex = self.fibex_metadata?;
let fibex = self.fibex_metadata.as_ref()?;
let md = extract_metadata(fibex, id, self.message.extended_header.as_ref())?;
let msg_type: Option<MessageType> = message_type(&self.message, md.message_info.as_deref());
let app_id = md.application_id.as_deref().or_else(|| {
Expand Down Expand Up @@ -511,7 +512,7 @@ impl<'a> FormattableMessage<'a> {
}
}

impl<'a> fmt::Display for FormattableMessage<'a> {
impl fmt::Display for FormattableMessage {
/// will format dlt Message with those fields:
/// ********* storage-header ********
/// date-time
Expand All @@ -530,7 +531,7 @@ impl<'a> fmt::Display for FormattableMessage<'a> {
/// payload
fn fmt(&self, f: &mut Formatter) -> Result<(), fmt::Error> {
if let Some(h) = &self.message.storage_header {
let tz = self.options.map(|o| o.tz);
let tz = self.options.as_ref().map(|o| o.tz);
match tz {
Some(Some(tz)) => {
write_tz_string(f, &h.timestamp, &tz)?;
Expand Down
36 changes: 18 additions & 18 deletions application/apps/indexer/parsers/src/dlt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ use dlt_core::{
parse::{dlt_consume_msg, dlt_message},
};
use serde::Serialize;
use std::{io::Write, ops::Range};
use std::{io::Write, ops::Range, sync::Arc};

use self::{attachment::FtScanner, fmt::FormatOptions};

impl LogMessage for FormattableMessage<'_> {
impl LogMessage for FormattableMessage {
fn to_writer<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
let bytes = self.message.as_bytes();
let len = bytes.len();
Expand Down Expand Up @@ -66,10 +66,10 @@ impl LogMessage for RawMessage {

#[derive(Default)]
//TODO AAZ: This is the parser that should be replaced
pub struct DltParser<'m> {
pub struct DltParser {
pub filter_config: Option<ProcessedDltFilterConfig>,
pub fibex_metadata: Option<&'m FibexMetadata>,
pub fmt_options: Option<&'m FormatOptions>,
pub fibex_metadata: Option<Arc<FibexMetadata>>,
pub fmt_options: Option<Arc<FormatOptions>>,
pub with_storage_header: bool,
ft_scanner: FtScanner,
offset: usize,
Expand Down Expand Up @@ -98,30 +98,30 @@ impl DltRangeParser {
}
}

impl<'m> DltParser<'m> {
impl DltParser {
pub fn new(
filter_config: Option<ProcessedDltFilterConfig>,
fibex_metadata: Option<&'m FibexMetadata>,
fmt_options: Option<&'m FormatOptions>,
fibex_metadata: Option<FibexMetadata>,
fmt_options: Option<FormatOptions>,
with_storage_header: bool,
) -> Self {
Self {
filter_config,
fibex_metadata,
fibex_metadata: fibex_metadata.map(Arc::new),
with_storage_header,
fmt_options,
fmt_options: fmt_options.map(Arc::new),
ft_scanner: FtScanner::new(),
offset: 0,
}
}
}

impl<'m> Parser<FormattableMessage<'m>> for DltParser<'m> {
fn parse<'b>(
impl Parser<FormattableMessage> for DltParser {
async fn parse<'a>(
&mut self,
input: &'b [u8],
input: &'a [u8],
timestamp: Option<u64>,
) -> Result<(&'b [u8], Option<ParseYield<FormattableMessage<'m>>>), Error> {
) -> Result<(&'a [u8], Option<ParseYield<FormattableMessage>>), Error> {
match dlt_message(input, self.filter_config.as_ref(), self.with_storage_header)
.map_err(|e| Error::Parse(format!("{e}")))?
{
Expand All @@ -142,8 +142,8 @@ impl<'m> Parser<FormattableMessage<'m>> for DltParser<'m> {

let msg = FormattableMessage {
message: msg_with_storage_header,
fibex_metadata: self.fibex_metadata,
options: self.fmt_options,
fibex_metadata: self.fibex_metadata.clone(),
options: self.fmt_options.clone(),
};
self.offset += input.len() - rest.len();
Ok((
Expand All @@ -160,7 +160,7 @@ impl<'m> Parser<FormattableMessage<'m>> for DltParser<'m> {
}

impl Parser<RangeMessage> for DltRangeParser {
fn parse<'b>(
async fn parse<'b>(
&mut self,
input: &'b [u8],
_timestamp: Option<u64>,
Expand All @@ -180,7 +180,7 @@ impl Parser<RangeMessage> for DltRangeParser {
}

impl Parser<RawMessage> for DltRawParser {
fn parse<'b>(
async fn parse<'b>(
&mut self,
input: &'b [u8],
_timestamp: Option<u64>,
Expand Down
4 changes: 2 additions & 2 deletions application/apps/indexer/parsers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum ParseYield<T> {
MessageAndAttachment((T, Attachment)),
}

impl<T> From<T> for ParseYield<T> {
impl<T: Send + Unpin> From<T> for ParseYield<T> {
fn from(item: T) -> Self {
Self::Message(item)
}
Expand All @@ -44,7 +44,7 @@ pub trait Parser<T> {
&mut self,
input: &'a [u8],
timestamp: Option<u64>,
) -> Result<(&'a [u8], Option<ParseYield<T>>), Error>;
) -> impl std::future::Future<Output = Result<(&'a [u8], Option<ParseYield<T>>), Error>> + Send;
}

#[derive(Debug, Clone, Serialize)]
Expand Down
50 changes: 25 additions & 25 deletions application/apps/indexer/parsers/src/someip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ unsafe impl Send for SomeipParser {}
unsafe impl Sync for SomeipParser {}

impl Parser<SomeipLogMessage> for SomeipParser {
fn parse<'a>(
async fn parse<'a>(
&mut self,
input: &'a [u8],
timestamp: Option<u64>,
Expand Down Expand Up @@ -397,8 +397,8 @@ mod test {
FibexParser::parse(vec![reader]).expect("parse failed")
}

#[test]
fn parse_cookie_client() {
#[tokio::test]
async fn parse_cookie_client() {
let input: &[u8] = &[
0xFF, 0xFF, 0x00, 0x00, // serviceId(u16), methodId(u16)
0x00, 0x00, 0x00, 0x08, // length(u32)
Expand All @@ -407,7 +407,7 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser.parse(input, None).await.unwrap();

assert!(output.is_empty());

Expand All @@ -418,8 +418,8 @@ mod test {
}
}

#[test]
fn parse_cookie_server() {
#[tokio::test]
async fn parse_cookie_server() {
let input: &[u8] = &[
0xFF, 0xFF, 0x80, 0x00, // serviceId(u16), methodId(u16)
0x00, 0x00, 0x00, 0x08, // length(u32)
Expand All @@ -428,7 +428,7 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser.parse(input, None).await.unwrap();

assert!(output.is_empty());

Expand All @@ -439,8 +439,8 @@ mod test {
}
}

#[test]
fn parse_empty_rpc_message_no_model() {
#[tokio::test]
async fn parse_empty_rpc_message_no_model() {
let input: &[u8] = &[
0x01, 0x03, 0x80, 0x04, // serviceId(u16), methodId(u16)
0x00, 0x00, 0x00, 0x08, // length(u32)
Expand All @@ -449,7 +449,7 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser.parse(input, None).await.unwrap();

assert!(output.is_empty());

Expand All @@ -462,8 +462,8 @@ mod test {
}
}

#[test]
fn parse_empty_rpc_message() {
#[tokio::test]
async fn parse_empty_rpc_message() {
let input: &[u8] = &[
0x01, 0x03, 0x80, 0x04, // serviceId(u16), methodId(u16)
0x00, 0x00, 0x00, 0x08, // length(u32)
Expand All @@ -474,7 +474,7 @@ mod test {
let model = test_model();

let mut parser = SomeipParser { model: Some(model) };
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser.parse(input, None).await.unwrap();

assert!(output.is_empty());

Expand All @@ -487,8 +487,8 @@ mod test {
}
}

#[test]
fn parse_rpc_message_no_model() {
#[tokio::test]
async fn parse_rpc_message_no_model() {
let input: &[u8] = &[
0x01, 0x03, 0x80, 0x05, // serviceId(u16), methodId(u16)
0x00, 0x00, 0x00, 0x0A, // length(u32)
Expand All @@ -498,7 +498,7 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser.parse(input, None).await.unwrap();

assert!(output.is_empty());

Expand All @@ -511,8 +511,8 @@ mod test {
}
}

#[test]
fn parse_rpc_message() {
#[tokio::test]
async fn parse_rpc_message() {
let input: &[u8] = &[
0x01, 0x03, 0x80, 0x05, // serviceId(u16), methodId(u16)
0x00, 0x00, 0x00, 0x0A, // length(u32)
Expand All @@ -524,7 +524,7 @@ mod test {
let model = test_model();

let mut parser = SomeipParser { model: Some(model) };
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser.parse(input, None).await.unwrap();

assert!(output.is_empty());

Expand All @@ -537,8 +537,8 @@ mod test {
}
}

#[test]
fn parse_empty_sd_message() {
#[tokio::test]
async fn parse_empty_sd_message() {
let input: &[u8] = &[
0xFF, 0xFF, 0x81, 0x00, // serviceId(u16), methodId(u16)
0x00, 0x00, 0x00, 0x14, // length(u32)
Expand All @@ -550,7 +550,7 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser.parse(input, None).await.unwrap();

assert!(output.is_empty());

Expand All @@ -563,8 +563,8 @@ mod test {
}
}

#[test]
fn parse_sd_message() {
#[tokio::test]
async fn parse_sd_message() {
let input: &[u8] = &[
0xFF, 0xFF, 0x81, 0x00, // serviceId(u16), methodId(u16)
0x00, 0x00, 0x00, 0x40, // length(u32)
Expand Down Expand Up @@ -592,7 +592,7 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser.parse(input, None).await.unwrap();

assert!(output.is_empty());

Expand Down
Loading

0 comments on commit 44b9535

Please sign in to comment.