From 96d8063ad37872b2ac4bfff7a0dc98777c2049f1 Mon Sep 17 00:00:00 2001 From: Artur Malchanau Date: Thu, 10 Aug 2023 21:54:45 +0300 Subject: [PATCH] enhancement(elasticsearch sink): Allow empty data_stream fields (#18193) enhancement(elasticsearch sink): Allow empty data_stream fields Add support for ES datastream with empty namespace or dataset. closes: #17883 Signed-off-by: Artur Malchanau --- src/sinks/elasticsearch/config.rs | 9 +- src/sinks/elasticsearch/tests.rs | 167 ++++++++++++++++++++++++++++-- 2 files changed, 169 insertions(+), 7 deletions(-) diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index d84153fb2a899c..ca52fdde0899c5 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -460,7 +460,14 @@ impl DataStreamConfig { .or_else(|| self.namespace(log))?; (dtype, dataset, namespace) }; - Some(format!("{}-{}-{}", dtype, dataset, namespace)) + + let name = [dtype, dataset, namespace] + .into_iter() + .filter(|s| !s.is_empty()) + .collect::>() + .join("-"); + + Some(name) } } diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index a9cc8576a68431..f2672228abfe3f 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -65,10 +65,25 @@ async fn sets_create_action_when_configured() { assert_eq!(encoded.len(), encoded_size); } -fn data_stream_body() -> BTreeMap { +fn data_stream_body( + dtype: Option, + dataset: Option, + namespace: Option, +) -> BTreeMap { let mut ds = BTreeMap::::new(); - ds.insert("type".into(), Value::from("synthetics")); - ds.insert("dataset".into(), Value::from("testing")); + + if let Some(dtype) = dtype { + ds.insert("type".into(), Value::from(dtype)); + } + + if let Some(dataset) = dataset { + ds.insert("dataset".into(), Value::from(dataset)); + } + + if let Some(namespace) = namespace { + ds.insert("namespace".into(), Value::from(namespace)); + } + ds } @@ -100,7 +115,14 @@ async fn 
encode_datastream_mode() { .single() .expect("invalid timestamp"), ); - log.insert("data_stream", data_stream_body()); + log.insert( + "data_stream", + data_stream_body( + Some("synthetics".to_string()), + Some("testing".to_string()), + None, + ), + ); let mut encoded = vec![]; let (encoded_size, _json_size) = es @@ -143,7 +165,14 @@ async fn encode_datastream_mode_no_routing() { let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); let mut log = LogEvent::from("hello there"); - log.insert("data_stream", data_stream_body()); + log.insert( + "data_stream", + data_stream_body( + Some("synthetics".to_string()), + Some("testing".to_string()), + None, + ), + ); log.insert( ( lookup::PathPrefix::Event, @@ -287,7 +316,14 @@ async fn encode_datastream_mode_no_sync() { let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); let mut log = LogEvent::from("hello there"); - log.insert("data_stream", data_stream_body()); + log.insert( + "data_stream", + data_stream_body( + Some("synthetics".to_string()), + Some("testing".to_string()), + None, + ), + ); log.insert( ( lookup::PathPrefix::Event, @@ -389,3 +425,122 @@ async fn allows_using_only_fields() { assert_eq!(std::str::from_utf8(&encoded).unwrap(), expected); assert_eq!(encoded.len(), encoded_size); } + +#[tokio::test] +async fn datastream_index_name() { + #[derive(Clone, Debug)] + struct TestCase { + dtype: Option, + namespace: Option, + dataset: Option, + want: String, + } + + let config = ElasticsearchConfig { + bulk: BulkConfig { + index: parse_template("vector"), + ..Default::default() + }, + endpoints: vec![String::from("https://example.com")], + mode: ElasticsearchMode::DataStream, + api_version: ElasticsearchApiVersion::V6, + ..Default::default() + }; + let es = ElasticsearchCommon::parse_single(&config).await.unwrap(); + + let test_cases = [ + TestCase { + dtype: Some("type".to_string()), + dataset: Some("dataset".to_string()), + namespace: Some("namespace".to_string()), + want: 
"type-dataset-namespace".to_string(), + }, + TestCase { + dtype: Some("type".to_string()), + dataset: Some("".to_string()), + namespace: Some("namespace".to_string()), + want: "type-namespace".to_string(), + }, + TestCase { + dtype: Some("type".to_string()), + dataset: None, + namespace: Some("namespace".to_string()), + want: "type-generic-namespace".to_string(), + }, + TestCase { + dtype: Some("type".to_string()), + dataset: Some("".to_string()), + namespace: Some("".to_string()), + want: "type".to_string(), + }, + TestCase { + dtype: Some("type".to_string()), + dataset: None, + namespace: None, + want: "type-generic-default".to_string(), + }, + TestCase { + dtype: Some("".to_string()), + dataset: Some("".to_string()), + namespace: Some("".to_string()), + want: "".to_string(), + }, + TestCase { + dtype: None, + dataset: None, + namespace: None, + want: "logs-generic-default".to_string(), + }, + TestCase { + dtype: Some("".to_string()), + dataset: Some("dataset".to_string()), + namespace: Some("namespace".to_string()), + want: "dataset-namespace".to_string(), + }, + TestCase { + dtype: None, + dataset: Some("dataset".to_string()), + namespace: Some("namespace".to_string()), + want: "logs-dataset-namespace".to_string(), + }, + TestCase { + dtype: Some("".to_string()), + dataset: Some("".to_string()), + namespace: Some("namespace".to_string()), + want: "namespace".to_string(), + }, + TestCase { + dtype: None, + dataset: None, + namespace: Some("namespace".to_string()), + want: "logs-generic-namespace".to_string(), + }, + TestCase { + dtype: Some("".to_string()), + dataset: Some("dataset".to_string()), + namespace: Some("".to_string()), + want: "dataset".to_string(), + }, + TestCase { + dtype: None, + dataset: Some("dataset".to_string()), + namespace: None, + want: "logs-dataset-default".to_string(), + }, + ]; + + for test_case in test_cases { + let mut log = LogEvent::from("hello there"); + log.insert( + "data_stream", + data_stream_body( + test_case.dtype.clone(), + 
test_case.dataset.clone(), + test_case.namespace.clone(), + ), + ); + + let processed_event = process_log(log, &es.mode, &None, &config.encoding).unwrap(); + assert_eq!(processed_event.index, test_case.want, "{test_case:?}"); + } +}