From 6389e8866af00e72e36b7e4d91d42da7c908f7a9 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Fri, 18 Dec 2020 08:11:02 +0300 Subject: [PATCH] enhancement(remap transform): add `parse_aws_alb_log` function (#5489) Signed-off-by: Kirill Fomichev --- .github/CODEOWNERS | 2 + Cargo.lock | 1 + docs/reference/remap/parse_aws_alb_log.cue | 76 ++++++ lib/remap-functions/Cargo.toml | 3 + lib/remap-functions/src/lib.rs | 6 + lib/remap-functions/src/parse_aws_alb_log.rs | 269 +++++++++++++++++++ tests/behavior/transforms/remap.toml | 46 ++++ 7 files changed, 403 insertions(+) create mode 100644 docs/reference/remap/parse_aws_alb_log.cue create mode 100644 lib/remap-functions/src/parse_aws_alb_log.rs diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 619dffa254f95..a8f295b70cf1b 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -75,6 +75,7 @@ /docs/reference/remap/log.cue @FungusHumungus /docs/reference/remap/merge.cue @FungusHumungus /docs/reference/remap/parse_grok.cue @FungusHumungus +/docs/reference/remap/parse_aws_alb_log.cue @fanatid /distribution/ @hoverbear @jamtur01 /distribution/docker/ @vector-kubernetes @@ -101,6 +102,7 @@ /lib/remap-functions/src/ipv6_to_ipv4.rs @FungusHumungus /lib/remap-functions/src/log.rs @FungusHumungus /lib/remap-functions/src/merge.rs @FungusHumungus +/src/remap/function/parse_aws_alb_log.rs @fanatid /lib/remap-functions/src/parse_grok.rs @FungusHumungus /proto/ @lukesteensen diff --git a/Cargo.lock b/Cargo.lock index c7a7b478db8bb..62f8c974fcb52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5086,6 +5086,7 @@ dependencies = [ "hex", "lazy_static", "md-5 0.9.1", + "nom 6.0.1", "regex", "remap-lang", "rust_decimal", diff --git a/docs/reference/remap/parse_aws_alb_log.cue b/docs/reference/remap/parse_aws_alb_log.cue new file mode 100644 index 0000000000000..2ef81f8d2dc68 --- /dev/null +++ b/docs/reference/remap/parse_aws_alb_log.cue @@ -0,0 +1,76 @@ +package metadata + +remap: functions: parse_aws_alb_log: { + arguments: [ + { + name: "value" + description: "Access log of the Application Load Balancer." + required: true + type: ["string"] + }, + ] + return: ["map"] + category: "parse" + description: #""" + Parses a Elastic Load Balancer Access log into it's constituent components. + """# + examples: [ + { + title: "Success" + input: { + log: #"http 2018-11-30T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 192.168.131.39:2817 - 0.000 0.001 0.000 200 200 34 366 "GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" 0 2018-11-30T22:22:48.364000Z "forward" "-" "-" "-" "-" "-" "-""# + } + source: #""" + .parsed = parse_aws_alb_log(.log) + """# + output: { + log: #"http 2018-11-30T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 192.168.131.39:2817 - 0.000 0.001 0.000 200 200 34 366 "GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" 0 2018-11-30T22:22:48.364000Z "forward" "-" "-" "-" "-" "-" "-""# + parsed: { + "type": "http" + "timestamp": "2018-11-30T22:23:00.186641Z" + "elb": "app/my-loadbalancer/50dc6c495c0c9188" + "client_host": "192.168.131.39:2817" + "target_host": null + "request_processing_time": 0.0 + "target_processing_time": 0.001 + "response_processing_time": 0.0 + "elb_status_code": "200" + "target_status_code": "200" + "received_bytes": 34 + "sent_bytes": 366 + "request_method": "GET" + "request_url": "http://www.example.com:80/" + "request_protocol": "HTTP/1.1" + "user_agent": "curl/7.46.0" + "ssl_cipher": null + "ssl_protocol": null + "target_group_arn": "arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067" + "trace_id": "Root=1-58337364-23a8c76965a2ef7629b185e3" + "domain_name": null + "chosen_cert_arn": null + "matched_rule_priority": "0" + "request_creation_time": "2018-11-30T22:22:48.364000Z" + "actions_executed": "forward" + "redirect_url": null + "error_reason": null + "target_port_list": [] + "target_status_code_list": [] + "classification": null + "classification_reason": null + } + } + }, + { + title: "Error" + input: { + log: "I am not a log" + } + source: #""" + .parsed = parse_aws_alb_log(.log) + """# + output: { + error: remap.errors.ParseError + } + }, + ] +} diff --git a/lib/remap-functions/Cargo.toml b/lib/remap-functions/Cargo.toml index 626dbba0c4a7b..cf2d3fcd86e3a 100644 --- a/lib/remap-functions/Cargo.toml +++ b/lib/remap-functions/Cargo.toml @@ -15,6 +15,7 @@ grok = { version = "1", optional = true } hex = { version = "0.4", optional = true } lazy_static = { version = "1", optional = true } md-5 = { version = "0.9", optional = true } +nom = { version = "6.0.1", optional = true } regex = { version = "1", optional = true } rust_decimal = { version = "1", optional = true } serde_json = { version = "1", optional = true } @@ -57,6 +58,7 @@ default = [ "now", "ok", "only_fields", + "parse_aws_alb_log", "parse_duration", "parse_grok", "parse_json", @@ -108,6 +110,7 @@ merge = [] now = [] ok = [] only_fields = [] +parse_aws_alb_log = ["nom"] parse_duration = [] parse_grok = ["grok"] parse_json = ["serde_json"] diff --git a/lib/remap-functions/src/lib.rs b/lib/remap-functions/src/lib.rs index 3be39bba52154..381a91fa33ce9 100644 --- a/lib/remap-functions/src/lib.rs +++ b/lib/remap-functions/src/lib.rs @@ -46,6 +46,8 @@ mod now; mod ok; #[cfg(feature = "only_fields")] mod only_fields; +#[cfg(feature = "parse_aws_alb_log")] +mod parse_aws_alb_log; #[cfg(feature = "parse_duration")] mod parse_duration; #[cfg(feature = "parse_grok")] @@ -147,6 +149,8 @@ pub use now::Now; pub use ok::OK; #[cfg(feature = "only_fields")] pub use only_fields::OnlyFields; +#[cfg(feature = "parse_aws_alb_log")] +pub use parse_aws_alb_log::ParseAwsAlbLog; #[cfg(feature = "parse_duration")] pub use parse_duration::ParseDuration; #[cfg(feature = "parse_grok")] @@ -248,6 +252,8 @@ pub fn all() -> Vec> { Box::new(OK), #[cfg(feature = "only_fields")] Box::new(OnlyFields), + #[cfg(feature = "parse_aws_alb_log")] + Box::new(ParseAwsAlbLog), #[cfg(feature = "parse_duration")] Box::new(ParseDuration), #[cfg(feature = "parse_grok")] diff --git a/lib/remap-functions/src/parse_aws_alb_log.rs b/lib/remap-functions/src/parse_aws_alb_log.rs new file mode 100644 index 0000000000000..b66fbda199165 --- /dev/null +++ b/lib/remap-functions/src/parse_aws_alb_log.rs @@ -0,0 +1,269 @@ +use nom::{ + branch::alt, + bytes::complete::{tag, take_while1}, + character::complete::char, + combinator::map_res, + sequence::{delimited, preceded}, + IResult, +}; +use remap::prelude::*; +use std::collections::BTreeMap; + +#[derive(Clone, Copy, Debug)] +pub struct ParseAwsAlbLog; + +impl Function for ParseAwsAlbLog { + fn identifier(&self) -> &'static str { + "parse_aws_alb_log" + } + + fn parameters(&self) -> &'static [Parameter] { + &[Parameter { + keyword: "value", + accepts: |v| matches!(v, Value::Bytes(_)), + required: true, + }] + } + + fn compile(&self, mut arguments: ArgumentList) -> Result> { + let value = arguments.required("value")?.boxed(); + + Ok(Box::new(ParseAwsAlbLogFn::new(value))) + } +} + +#[derive(Debug, Clone)] +struct ParseAwsAlbLogFn { + value: Box, +} + +impl ParseAwsAlbLogFn { + fn new(value: Box) -> Self { + Self { value } + } +} + +impl Expression for ParseAwsAlbLogFn { + fn execute(&self, state: &mut state::Program, object: &mut dyn Object) -> Result { + let bytes = self.value.execute(state, object)?.try_bytes()?; + + parse_log(&String::from_utf8_lossy(&bytes)) + } + + fn type_def(&self, state: &state::Compiler) -> TypeDef { + self.value + .type_def(state) + .fallible_unless(value::Kind::Bytes) + .into_fallible(true) // Log parsing error + .with_constraint(value::Kind::Map) + } +} + +fn parse_log(mut input: &str) -> Result { + let mut log = BTreeMap::new(); + + macro_rules! get_value { + ($name:expr, $parser:expr) => {{ + let result: IResult<&str, _, (&str, nom::error::ErrorKind)> = $parser(input); + match result { + Ok((rest, value)) => { + input = rest; + value + } + Err(error) => { + return Err(format!("failed to get field `{}`: {}", $name, error).into()) + } + } + }}; + } + macro_rules! field_raw { + ($name:expr, $parser:expr) => { + log.insert( + $name.into(), + match get_value!($name, $parser).into() { + Value::Bytes(bytes) if bytes == &"-" => Value::Null, + value => value, + }, + ) + }; + } + macro_rules! field { + ($name:expr, $($pattern:pat)|+) => { + field_raw!($name, preceded(char(' '), take_while1(|c| matches!(c, $($pattern)|+)))) + }; + } + macro_rules! field_parse { + ($name:expr, $($pattern:pat)|+, $type:ty) => { + field_raw!($name, map_res(preceded(char(' '), take_while1(|c| matches!(c, $($pattern)|+))), |s: &str| s.parse::<$type>())) + }; + } + + field_raw!("type", take_while1(|c| matches!(c, 'a'..='z' | '0'..='9'))); + field!("timestamp", '0'..='9' | '.' | '-' | ':' | 'T' | 'Z'); + field_raw!("elb", take_anything); + field!("client_host", '0'..='9' | '.' | ':' | '-'); + field!("target_host", '0'..='9' | '.' | ':' | '-'); + field_parse!("request_processing_time", '0'..='9' | '.' | '-', f64); + field_parse!("target_processing_time", '0'..='9' | '.' | '-', f64); + field_parse!("response_processing_time", '0'..='9' | '.' | '-', f64); + field!("elb_status_code", '0'..='9' | '-'); + field!("target_status_code", '0'..='9' | '-'); + field_parse!("received_bytes", '0'..='9' | '-', i64); + field_parse!("sent_bytes", '0'..='9' | '-', i64); + let request = get_value!("request", take_quoted1); + let mut iter = request.splitn(2, ' '); + log.insert("request_method".to_owned(), iter.next().unwrap().into()); // split always have at least 1 item + match iter.next() { + Some(value) => { + let mut iter = value.rsplitn(2, ' '); + log.insert("request_protocol".into(), iter.next().unwrap().into()); // same as previous one + match iter.next() { + Some(value) => log.insert("request_url".into(), value.into()), + None => return Err("failed to get field `request_url`".into()), + } + } + None => return Err("failed to get field `request_url`".into()), + }; + field_raw!("user_agent", take_quoted1); + field_raw!("ssl_cipher", take_anything); + field_raw!("ssl_protocol", take_anything); + field_raw!("target_group_arn", take_anything); + field_raw!("trace_id", take_quoted1); + field_raw!("domain_name", take_quoted1); + field_raw!("chosen_cert_arn", take_quoted1); + field!("matched_rule_priority", '0'..='9' | '-'); + field!( + "request_creation_time", + '0'..='9' | '.' | '-' | ':' | 'T' | 'Z' + ); + field_raw!("actions_executed", take_quoted1); + field_raw!("redirect_url", take_quoted1); + field_raw!("error_reason", take_quoted1); + field_raw!( + "target_port_list", + take_list(|c| matches!(c, '0'..='9' | '.' | ':' | '-')) + ); + field_raw!( + "target_status_code_list", + take_list(|c| matches!(c, '0'..='9')) + ); + field_raw!("classification", take_quoted1); + field_raw!("classification_reason", take_quoted1); + + match input.is_empty() { + true => Ok(log.into()), + false => Err(format!(r#"Log should be fully consumed: "{}""#, input).into()), + } +} + +type SResult<'a, O> = IResult<&'a str, O, (&'a str, nom::error::ErrorKind)>; + +fn take_anything(input: &str) -> SResult<&str> { + preceded(char(' '), take_while1(|c| c != ' '))(input) +} + +fn take_quoted1(input: &str) -> SResult { + delimited(tag(" \""), until_quote, char('"'))(input) +} + +fn until_quote(input: &str) -> SResult { + let mut ret = String::new(); + let mut skip_delimiter = false; + for (i, ch) in input.char_indices() { + if ch == '\\' && !skip_delimiter { + skip_delimiter = true; + } else if ch == '"' && !skip_delimiter { + return Ok((&input[i..], ret)); + } else { + ret.push(ch); + skip_delimiter = false; + } + } + Err(nom::Err::Incomplete(nom::Needed::Unknown)) +} + +fn take_list(cond: impl Fn(char) -> bool) -> impl FnOnce(&str) -> SResult> { + move |input: &str| { + alt(( + map_res(tag(r#" "-""#), |_| { + Ok::<_, std::convert::Infallible>(vec![]) + }), + map_res(preceded(char(' '), take_while1(cond)), |v: &str| { + Ok::<_, std::convert::Infallible>(vec![v]) + }), + ))(input) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + remap::test_type_def![ + value_string { + expr: |_| ParseAwsAlbLogFn { value: Literal::from("foo").boxed() }, + def: TypeDef { fallible: true, kind: value::Kind::Map, ..Default::default() }, + } + + value_optional { + expr: |_| ParseAwsAlbLogFn { value: Box::new(Noop) }, + def: TypeDef { fallible: true, kind: value::Kind::Map, ..Default::default() }, + } + ]; + + #[test] + fn parse_aws_alb_log() { + let logs = vec![ + r#"http 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 +192.168.131.39:2817 10.0.0.1:80 0.000 0.001 0.000 200 200 34 366 +"GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - +arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 +"Root=1-58337262-36d228ad5d99923122bbe354" "-" "-" +0 2018-07-02T22:22:48.364000Z "forward" "-" "-" 10.0.0.1:80 200 "-" "-""#, + r#"https 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 +192.168.131.39:2817 10.0.0.1:80 0.086 0.048 0.037 200 200 0 57 +"GET https://www.example.com:443/ HTTP/1.1" "curl/7.46.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 +arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 +"Root=1-58337281-1d84f3d73c47ec4e58577259" "www.example.com" "arn:aws:acm:us-east-2:123456789012:certificate/12345678-1234-1234-1234-123456789012" +1 2018-07-02T22:22:48.364000Z "authenticate,forward" "-" "-" 10.0.0.1:80 200 "-" "-""#, + r#"h2 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 +10.0.1.252:48160 10.0.0.66:9000 0.000 0.002 0.000 200 200 5 257 +"GET https://10.0.2.105:773/ HTTP/2.0" "curl/7.46.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 +arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 +"Root=1-58337327-72bd00b0343d75b906739c42" "-" "-" +1 2018-07-02T22:22:48.364000Z "redirect" "https://example.com:80/" "-" 10.0.0.66:9000 200 "-" "-""#, + r#"ws 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 +10.0.0.140:40914 10.0.1.192:8010 0.001 0.003 0.000 101 101 218 587 +"GET http://10.0.0.30:80/ HTTP/1.1" "-" - - +arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 +"Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" +1 2018-07-02T22:22:48.364000Z "forward" "-" "-" 10.0.1.192:8010 101 "-" "-""#, + r#"wss 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 +10.0.0.140:44244 10.0.0.171:8010 0.000 0.001 0.000 101 101 218 786 +"GET https://10.0.0.30:443/ HTTP/1.1" "-" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 +arn:aws:elasticloadbalancing:us-west-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 +"Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" +1 2018-07-02T22:22:48.364000Z "forward" "-" "-" 10.0.0.171:8010 101 "-" "-""#, + r#"http 2018-11-30T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 +192.168.131.39:2817 - 0.000 0.001 0.000 200 200 34 366 +"GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - +arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 +"Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" +0 2018-11-30T22:22:48.364000Z "forward" "-" "-" "-" "-" "-" "-""#, + r#"http 2018-11-30T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 +192.168.131.39:2817 - 0.000 0.001 0.000 502 - 34 366 +"GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - +arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 +"Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" +0 2018-11-30T22:22:48.364000Z "forward" "-" "LambdaInvalidResponse" "-" "-" "-" "-""#, + ]; + let logs = logs + .into_iter() + .map(|s| s.replace('\n', " ")) + .collect::>(); + + for log in logs { + assert!(parse_log(&log).is_ok()) + } + } +} diff --git a/tests/behavior/transforms/remap.toml b/tests/behavior/transforms/remap.toml index a21c20ea72478..7708a3572de45 100644 --- a/tests/behavior/transforms/remap.toml +++ b/tests/behavior/transforms/remap.toml @@ -1253,3 +1253,49 @@ "b.equals" = "nope" "pass.equals" = true "fail.equals" = false + +[transforms.remap_function_parse_aws_alb_log] + inputs = [] + type = "remap" + source = """ + .parts = parse_aws_alb_log(.log) + """ +[[tests]] + name = "remap_function_parse_aws_alb_log" + [tests.input] + insert_at = "remap_function_parse_aws_alb_log" + type = "log" + [tests.input.log_fields] + log = 'http 2018-11-30T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 192.168.131.39:2817 - 0.000 0.001 0.000 200 200 34 366 "GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" 0 2018-11-30T22:22:48.364000Z "forward" "-" "-" "-" "-" "-" "-"' + [[tests.outputs]] + extract_from = "remap_function_parse_aws_alb_log" + [[tests.outputs.conditions]] + "parts.type.equals" = "http" + "parts.timestamp.equals" = "2018-11-30T22:23:00.186641Z" + "parts.elb.equals" = "app/my-loadbalancer/50dc6c495c0c9188" + "parts.client_host.equals" = "192.168.131.39:2817" + "parts.target_host.equals" = "" + "parts.request_processing_time.equals" = 0.0 + "parts.target_processing_time.equals" = 0.001 + "parts.response_processing_time.equals" = 0.0 + "parts.elb_status_code.equals" = "200" + "parts.target_status_code.equals" = "200" + "parts.received_bytes.equals" = 34 + "parts.sent_bytes.equals" = 366 + "parts.request_method.equals" = "GET" + "parts.request_url.equals" = "http://www.example.com:80/" + "parts.request_protocol.equals" = "HTTP/1.1" + "parts.user_agent.equals" = "curl/7.46.0" + "parts.ssl_cipher.equals" = "" + "parts.ssl_protocol.equals" = "" + "parts.target_group_arn.equals" = "arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067" + "parts.trace_id.equals" = "Root=1-58337364-23a8c76965a2ef7629b185e3" + "parts.domain_name.equals" = "" + "parts.chosen_cert_arn.equals" = "" + "parts.matched_rule_priority.equals" = "0" + "parts.request_creation_time.equals" = "2018-11-30T22:22:48.364000Z" + "parts.actions_executed.equals" = "forward" + "parts.redirect_url.equals" = "" + "parts.error_reason.equals" = "" + "parts.classification.equals" = "" + "parts.classification_reason.equals" = ""